tutoriales.com

Desarrollando Microservicios Reactivos con Spring WebFlux y RSocket en Java

Este tutorial te guiará en la construcción de microservicios reactivos utilizando Spring WebFlux y RSocket en Java. Exploraremos los fundamentos de la programación reactiva, implementaremos APIs basadas en WebFlux y aprenderemos a establecer comunicaciones bidireccionales eficientes con RSocket para sistemas distribuidos de alto rendimiento.

Avanzado25 min de lectura4 views
Reportar error

🚀 Introducción a la Programación Reactiva y Spring WebFlux

En el mundo actual de aplicaciones distribuidas y de alto rendimiento, la programación reactiva ha emergido como un paradigma crucial. Ofrece una forma eficiente de manejar flujos de datos asíncronos y no bloqueantes, lo que es esencial para construir sistemas escalables y responsivos.

¿Qué es la Programación Reactiva?

La programación reactiva se centra en el manejo de flujos de datos y la propagación del cambio. Esto significa que cuando un dato cambia, los componentes que dependen de él reaccionan automáticamente. Se basa en el concepto de observables y suscriptores, donde un observable emite una secuencia de eventos y un suscriptor reacciona a esos eventos.

📌 Nota: Los principios de la programación reactiva están formalizados por el *Reactive Manifesto*, que enfatiza la capacidad de respuesta, la resiliencia, la elasticidad y la orientación a mensajes.

Spring WebFlux: El Marco Reactivo de Spring

Spring WebFlux es parte del ecosistema Spring Framework 5 y proporciona soporte para construir aplicaciones web reactivas. A diferencia de Spring MVC (que está basado en el paradigma servlet bloqueante), WebFlux utiliza un stack completamente no bloqueante, lo que lo hace ideal para microservicios y APIs que necesitan manejar un gran número de concurrencias con un uso eficiente de recursos.

90% Escalabilidad
85% Eficiencia

🛠️ Configurando tu Proyecto Spring WebFlux

Antes de sumergirnos en el código, necesitamos configurar un nuevo proyecto Spring Boot con WebFlux.

Creando el Proyecto con Spring Initializr

Usaremos Spring Initializr para generar la estructura base de nuestro proyecto. Ve a start.spring.io y selecciona las siguientes dependencias:

  • Project: Maven Project
  • Language: Java
  • Spring Boot: 3.x.x (la versión más reciente estable)
  • Group: com.example
  • Artifact: reactive-service
  • Name: reactive-service
  • Package name: com.example.reactiveservice
  • Java: 17 o superior

Dependencias:

  • Spring Reactive Web (para WebFlux)
  • Spring RSocket (para RSocket)
  • Lombok (opcional, para reducir boilerplate)

Una vez configurado, haz clic en GENERATE para descargar el proyecto.

Estructura Básica del Proyecto

Descomprime el archivo y ábrelo con tu IDE favorito (IntelliJ IDEA, VS Code, Eclipse). Deberías ver una estructura similar a esta:

reactive-service
├── .mvn
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── reactiveservice
│   │   │               └── ReactiveServiceApplication.java
│   │   └── resources
│   │       ├── application.properties
│   │       └── static
│   └── test
├── pom.xml
└── README.md

📖 Desarrollando una API Reactiva con Spring WebFlux

Vamos a crear una API simple para gestionar una lista de productos. Usaremos Flux y Mono, los tipos reactivos fundamentales de Project Reactor (la librería que usa Spring WebFlux).

Modelando el Dominio: Clase Product

Primero, definamos nuestro modelo Product.

package com.example.reactiveservice.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
    private String id;
    private String name;
    private double price;
}
💡 Consejo: `Flux` representa un flujo de 0 a N elementos, mientras que `Mono` representa un flujo de 0 o 1 elemento.

Creando un ProductRepository Reactivo (In-memory)

Para simplificar, usaremos un repositorio en memoria que devuelva Flux y Mono.

package com.example.reactiveservice.repository;

import com.example.reactiveservice.model.Product;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Repository
public class ProductRepository {

    private final Map<String, Product> products = new HashMap<>();

    @PostConstruct
    public void init() {
        products.put("1", new Product("1", "Laptop Dell", 1200.00));
        products.put("2", new Product("2", "Mouse Logitech", 25.50));
        products.put("3", new Product("3", "Teclado Mecánico", 80.00));
    }

    public Flux<Product> findAll() {
        // Simula latencia para demostrar el no-bloqueo
        return Flux.fromIterable(products.values())
                .delayElements(Duration.ofMillis(100)); 
    }

    public Mono<Product> findById(String id) {
        return Mono.justOrEmpty(products.get(id))
                .delayElement(Duration.ofMillis(50));
    }

    public Mono<Product> save(Product product) {
        if (product.getId() == null || product.getId().isEmpty()) {
            product.setId(UUID.randomUUID().toString());
        }
        products.put(product.getId(), product);
        return Mono.just(product);
    }

    public Mono<Void> deleteById(String id) {
        products.remove(id);
        return Mono.empty();
    }
}

Implementando un ProductHandler (Functional Endpoints)

WebFlux soporta dos estilos de desarrollo: Annotation-based (similar a Spring MVC) y Functional Endpoints. Usaremos Functional Endpoints para ilustrar el estilo reactivo puro.

package com.example.reactiveservice.handler;

import com.example.reactiveservice.model.Product;
import com.example.reactiveservice.repository.ProductRepository;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import static org.springframework.web.reactive.function.BodyInserters.fromValue;

@Component
public class ProductHandler {

    private final ProductRepository productRepository;

    public ProductHandler(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    public Mono<ServerResponse> getAllProducts(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(productRepository.findAll(), Product.class);
    }

    public Mono<ServerResponse> getProductById(ServerRequest request) {
        String id = request.pathVariable("id");
        return productRepository.findById(id)
                .flatMap(product -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(fromValue(product)))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createProduct(ServerRequest request) {
        return request.bodyToMono(Product.class)
                .flatMap(productRepository::save)
                .flatMap(savedProduct -> ServerResponse.created(request.uri())
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(fromValue(savedProduct)));
    }

    public Mono<ServerResponse> deleteProduct(ServerRequest request) {
        String id = request.pathVariable("id");
        return productRepository.deleteById(id)
                .then(ServerResponse.noContent().build());
    }
}

Definiendo las Rutas (Functional Endpoints)

Ahora, enlazamos los handlers con las rutas HTTP.

package com.example.reactiveservice.router;

import com.example.reactiveservice.handler.ProductHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class ProductRouter {

    @Bean
    public RouterFunction<ServerResponse> productRoutes(ProductHandler handler) {
        return route(GET("/products"), handler::getAllProducts)
                .andRoute(GET("/products/{id}"), handler::getProductById)
                .andRoute(POST("/products"), handler::createProduct)
                .andRoute(DELETE("/products/{id}"), handler::deleteProduct);
    }
}

Probando la API Reactiva

Inicia tu aplicación Spring Boot. Puedes probar la API usando curl o Postman.

  • Obtener todos los productos:
curl -v http://localhost:8080/products
Verás un flujo de datos que se entregan progresivamente debido al `delayElements` en el repositorio.
  • Obtener un producto por ID:
curl -v http://localhost:8080/products/1
  • Crear un nuevo producto (POST):
curl -v -X POST -H "Content-Type: application/json" -d '{"name":"Webcam HD", "price":50.00}' http://localhost:8080/products
🔥 Importante: La ventaja de WebFlux es su capacidad para manejar un gran número de solicitudes concurrentes con un número reducido de hilos, ya que no bloquea la ejecución mientras espera I/O.

⚡ Introducción a RSocket en Microservicios

RSocket es un protocolo de capa de aplicación que facilita la comunicación reactiva entre servicios. Fue diseñado para la comunicación de microservicios, proporcionando modelos de interacción flexibles y manejo de backpressure.

¿Por qué RSocket?

Mientras que HTTP/1.1 y HTTP/2 son excelentes para el modelo request-response, RSocket va más allá al ofrecer:

  • Multiplexación: Múltiples solicitudes y respuestas sobre una única conexión TCP.
  • Reanudación de sesión: Capacidad para reanudar sesiones después de interrupciones de red.
  • Backpressure: Mecanismo para que el consumidor indique al productor cuántos datos puede manejar, evitando sobrecargas.
  • Modelos de interacción:
    • Request/Response: Similar a HTTP.
    • Request/Stream: Una solicitud, múltiples respuestas (ideal para datos en tiempo real).
    • Fire-and-Forget: No espera respuesta.
    • Channel: Comunicación bidireccional de flujo a flujo (stream-to-stream).
Comparativa: HTTP vs RSocket HTTP (Request/Response) Cliente Servidor Conexión 1 Conexión 2 Req Res RSocket (Multiplexado) CONEXIÓN ÚNICA TCP/WebSocket Cliente Servidor Request-Response Request-Stream Fire-and-Forget Channel (Bidireccional)

RSocket con Spring Boot

Spring Boot proporciona una excelente integración con RSocket, permitiéndonos exponer y consumir servicios RSocket fácilmente.


📡 Implementando un Servidor RSocket con Spring

Vamos a extender nuestro servicio reactivo para que también actúe como un servidor RSocket, ofreciendo un servicio de notificación de productos.

Creando el ProductRSocketController

Utilizaremos la anotación @MessageMapping para definir los endpoints RSocket.

package com.example.reactiveservice.rsocket;

import com.example.reactiveservice.model.Product;
import com.example.reactiveservice.repository.ProductRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@Slf4j
@Controller
public class ProductRSocketController {

    private final ProductRepository productRepository;

    public ProductRSocketController(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    // Request-Response
    @MessageMapping("get.product")
    public Mono<Product> getProductById(String id) {
        log.info("RSocket: Solicitud de producto por ID: {}", id);
        return productRepository.findById(id);
    }

    // Request-Stream
    @MessageMapping("product.stream")
    public Flux<Product> streamAllProducts() {
        log.info("RSocket: Solicitud de stream de todos los productos");
        return productRepository.findAll()
                .delayElements(Duration.ofSeconds(1))
                .onBackpressureDrop(); // Manejo de backpressure simple
    }

    // Fire-and-Forget
    @MessageMapping("product.notify")
    public Mono<Void> productNotification(Product product) {
        log.info("RSocket: Notificación de producto recibida: {}", product);
        // Aquí podríamos, por ejemplo, enviar la notificación a otro servicio o a un log
        return Mono.empty();
    }

    // Channel (bidireccional, cliente envía y servidor envía)
    @MessageMapping("product.channel")
    public Flux<Product> productChannel(Flux<String> productIds) {
        log.info("RSocket: Solicitud de canal de productos");
        return productIds
                .delayElements(Duration.ofMillis(500)) // Simula procesamiento del cliente
                .doOnNext(id -> log.info("RSocket: Procesando ID de cliente: {}", id))
                .flatMap(productRepository::findById)
                .delayElements(Duration.ofSeconds(2)); // Simula stream del servidor
    }
}

Configurando el Servidor RSocket en application.properties

Para que nuestro servicio escuche las conexiones RSocket, necesitamos configurar un puerto RSocket. Esto se hace en src/main/resources/application.properties.

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
💡 Consejo: RSocket puede operar sobre TCP, WebSockets y otros transportes. TCP es común para la comunicación entre servicios.

🤝 Construyendo un Cliente RSocket

Para probar nuestro servidor RSocket, crearemos un cliente simple que se conecte y utilice los diferentes modelos de interacción.

Creando un Proyecto Cliente RSocket

Puedes crear un nuevo proyecto Spring Boot (o añadirlo como un módulo a tu proyecto existente) con la dependencia Spring RSocket.

Dependencias del cliente:

  • Spring RSocket
  • Spring Reactive Web (opcional, si quieres usar WebClient)
  • Lombok (opcional)

Configuración del Cliente RSocketRequester

Necesitamos un RSocketRequester para interactuar con el servidor.

package com.example.reactiveclient;

import com.example.reactiveservice.model.Product;
import io.rsocket.SocketAcceptor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@Slf4j
@SpringBootApplication
public class ReactiveClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveClientApplication.class, args);
    }

    @Bean
    public RSocketRequester getRSocketRequester(RSocketRequester.Builder builder, RSocketStrategies rSocketStrategies) {
        // Un cliente RSocket también puede recibir solicitudes si implementa un MessageMapping en un @Controller
        // Aquí configuramos un SocketAcceptor vacío para este ejemplo simple.
        SocketAcceptor responder = rSocketStrategies.socketAcceptor().accept(new Object() {});

        return builder
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                .rsocketConnector(connector -> connector.acceptor(responder))
                .tcp("localhost", 7000);
    }

    @Bean
    public CommandLineRunner run(RSocketRequester requester) {
        return args -> {
            log.info("--- Cliente RSocket Iniciado ---");

            // 1. Request-Response
            requester.route("get.product")
                    .data("1")
                    .retrieveMono(Product.class)
                    .doOnNext(product -> log.info("Received Product (RR): {}", product))
                    .subscribe();

            // 2. Request-Stream
            log.info("Solicitando stream de productos...");
            requester.route("product.stream")
                    .retrieveFlux(Product.class)
                    .take(5) // Tomar solo 5 elementos para el ejemplo
                    .delayElements(Duration.ofSeconds(1)) // Simula procesamiento lento del cliente
                    .doOnNext(product -> log.info("Received Product (RS): {}", product))
                    .doFinally(signalType -> log.info("Stream de productos finalizado (RS)"))
                    .subscribe();

            // 3. Fire-and-Forget
            Product newProduct = new Product(null, "Monitor Ultrawide", 350.00);
            requester.route("product.notify")
                    .data(newProduct)
                    .send()
                    .doOnSuccess(v -> log.info("Notification sent (FnF) for product: {}", newProduct.getName()))
                    .subscribe();

            // 4. Channel (Bidireccional)
            log.info("Iniciando canal bidireccional...");
            Flux<String> clientProductIds = Flux.just("1", "3", "5", "2")
                    .delayElements(Duration.ofSeconds(1));

            requester.route("product.channel")
                    .data(clientProductIds)
                    .retrieveFlux(Product.class)
                    .doOnNext(product -> log.info("Received Product (Channel): {}", product))
                    .doFinally(signalType -> log.info("Canal de productos finalizado (Channel)"))
                    .subscribe();

            // Mantener la aplicación cliente en ejecución para recibir streams/canales
            // En un entorno real, esto sería manejado por un servidor o un bucle de eventos
            Thread.sleep(20000);
        };
    }
}

Ejecutando el Cliente y el Servidor

  1. Asegúrate de que tu ReactiveServiceApplication esté corriendo y escuchando en el puerto 8080 (WebFlux) y 7000 (RSocket).
  2. Inicia tu ReactiveClientApplication.

Observa la salida en la consola de ambos servicios. Verás cómo el cliente solicita datos al servidor RSocket y cómo el servidor responde utilizando los diferentes modelos de interacción. El modelo Request-Stream demostrará cómo los datos fluyen a lo largo del tiempo, y el Channel mostrará la comunicación bidireccional.

⚠️ Advertencia: Para el `product.channel`, el servidor solo tiene productos con IDs "1", "2", "3". El ID "5" del cliente no encontrará un producto en el servidor y, por lo tanto, no emitirá una respuesta para ese ID. Esto es un comportamiento normal en RSocket si el Mono devuelto por `findById` es `empty`.

✨ Beneficios y Casos de Uso de WebFlux + RSocket

Combinar Spring WebFlux con RSocket ofrece una potente solución para construir microservicios:

  • Rendimiento y Escalabilidad: WebFlux garantiza un uso eficiente de los recursos del servidor para APIs REST, mientras que RSocket optimiza la comunicación entre servicios con backpressure y multiplexación.
  • Comunicaciones en Tiempo Real: RSocket Request-Stream y Channel son ideales para aplicaciones que requieren actualizaciones en vivo, como paneles de control, chats o notificaciones.
  • Resiliencia: La capacidad de reanudación de sesiones de RSocket mejora la robustez de los sistemas distribuidos.
  • Flexibilidad: Permite elegir el modelo de comunicación más adecuado para cada interacción (request-response, streaming, etc.).

Tabla Comparativa: WebFlux vs. WebFlux + RSocket

CaracterísticaSpring WebFlux (REST)Spring WebFlux + RSocket
---------
Modelo de ComunicaciónRequest-ResponseRequest-Response, Request-Stream, Fire-and-Forget, Channel
ProtocoloHTTP/1.1, HTTP/2RSocket (TCP, WebSocket)
---------
Uso PrincipalAPIs REST para navegadores/clientesComunicación inter-servicio, Tiempo real
BackpressureLimitado (a nivel TCP)Nativo y granular
---------
MultiplexaciónHTTP/2Nativo, más flexible
OverheadMayor para flujos continuosMenor para flujos y canales
¿Cuándo usar solo WebFlux y cuándo añadir RSocket? Si tu aplicación necesita exponer APIs RESTful tradicionales a clientes web o móviles, WebFlux es una excelente opción. Si tienes microservicios que necesitan comunicarse entre sí con alta eficiencia, en tiempo real, o con requisitos de streaming bidireccional, RSocket es el complemento perfecto para WebFlux. No son excluyentes, sino complementarios.

💡 Conclusión

La programación reactiva con Spring WebFlux y RSocket te equipa con herramientas poderosas para enfrentar los desafíos de construir microservicios modernos. Al adoptar un enfoque no bloqueante y orientarse a los flujos de datos, puedes desarrollar aplicaciones más eficientes, escalables y resilientes.

Hemos cubierto desde los fundamentos de WebFlux y la creación de una API reactiva, hasta la implementación y consumo de servicios RSocket con sus diversos modelos de interacción. ¡Ahora estás listo para explorar el mundo de los microservicios reactivos en Java!

Tutoriales relacionados

Comentarios (0)

Aún no hay comentarios. ¡Sé el primero!