tutoriales.com

Desarrollo de Microservicios Basados en Eventos en Go: Implementando un Patrón Saga con NATS

Este tutorial te guiará en la creación de microservicios basados en eventos utilizando Go y el sistema de mensajería NATS. Exploraremos el patrón Saga para manejar transacciones distribuidas y asegurar la consistencia de datos entre servicios.

Intermedio35 min de lectura11 views
Reportar error

Introducción a los Microservicios Basados en Eventos y el Patrón Saga 🚀

En el mundo moderno del desarrollo de software, los microservicios se han consolidado como una arquitectura preferida para construir sistemas escalables y resilientes. Sin embargo, la distribución de la lógica de negocio en múltiples servicios introduce desafíos, especialmente en lo que respecta a la gestión de transacciones que abarcan varios de ellos. Aquí es donde entran en juego los microservicios basados en eventos y patrones como el Saga.

Un sistema basado en eventos es aquel donde los servicios se comunican emitiendo y reaccionando a eventos. En lugar de llamadas directas y síncronas, un servicio publica un evento cuando algo significativo ocurre (por ejemplo, OrdenCreada), y otros servicios interesados pueden subscribirse a esos eventos para realizar sus propias acciones. Esta desacopla a los servicios, haciéndolos más robustos y flexibles.

El patrón Saga aborda el problema de las transacciones distribuidas. A diferencia de las transacciones ACID monolíticas que bloquean recursos hasta que se confirma todo, un Saga es una secuencia de transacciones locales, donde cada transacción local actualiza la base de datos de un servicio individual y publica un evento. Si una transacción local falla, el Saga ejecuta una serie de transacciones de compensación para deshacer los cambios realizados por las transacciones locales anteriores. Esto garantiza la consistencia eventual del sistema.

💡 Consejo: El patrón Saga es esencial cuando no puedes usar transacciones distribuidas clásicas (2PC) debido a la naturaleza heterogénea de las bases de datos o el deseo de alta disponibilidad.

En este tutorial, construiremos un ejemplo práctico de un sistema de pedidos simple utilizando Go y NATS como broker de mensajes. Implementaremos un Saga para gestionar la creación de un pedido que involucrará a un servicio de Pedidos y a un servicio de Inventario.

¿Por qué Go para Microservicios Basados en Eventos? ✨

Go es una elección excelente para este tipo de arquitectura por varias razones:

  • Concurrencia: Las goroutines y los channels de Go hacen que el manejo de la concurrencia y la programación asíncrona sean muy sencillos y eficientes, algo crucial para los servicios que publican y consumen eventos constantemente.
  • Rendimiento: Go compila a binarios nativos, ofreciendo un rendimiento excepcional y un bajo consumo de recursos.
  • Despliegue Sencillo: Los binarios estáticamente enlazados son fáciles de desplegar, ya sea en contenedores Docker o directamente en la infraestructura.
  • Ecosistema Robusto: Un vasto ecosistema de librerías y herramientas para redes, bases de datos y sistemas de mensajería.

¿Por qué NATS para la Mensajería? 📨

NATS es un sistema de mensajería ligero, de alto rendimiento y fácil de usar, ideal para arquitecturas de microservicios. Algunas de sus características clave incluyen:

  • Rendimiento: Diseñado para la velocidad y la baja latencia.
  • Simplicidad: API sencilla y fácil de integrar.
  • Modelos de Mensajería: Soporta Publicar/Subscribir y Solicitud/Respuesta.
  • Streaming (NATS JetStream): Proporciona persistencia de mensajes, entrega "At-Least-Once" y otras características avanzadas, convirtiéndolo en una excelente alternativa a Kafka para muchos casos de uso.
🔥 Importante: Para este tutorial, usaremos NATS JetStream para asegurar la persistencia de los eventos y la entrega confiable, lo cual es fundamental para el patrón Saga.

Arquitectura del Sistema de Pedidos con Saga 🏗️

Nuestro sistema de ejemplo estará compuesto por dos microservicios principales y NATS como broker de mensajes:

  1. Servicio de Pedidos (order-service): Responsable de crear, actualizar y gestionar los pedidos de los clientes. Publicará eventos como OrderCreated y escuchará eventos de compensación o confirmación.
  2. Servicio de Inventario (inventory-service): Gestionará la disponibilidad y la reducción del stock de productos. Escuchará OrderCreated y publicará InventoryReserved o InventoryReservationFailed.

El patrón Saga se implementará utilizando un coordinador central (que residirá en el order-service en este ejemplo, pero podría ser un servicio Saga dedicado en sistemas más complejos) que orquestará los pasos de la transacción distribuida.

Cliente order-service (Orquestador Saga) inventory-service (Participante) 1. REST POST 2. NATS: OrderCreated 3a. InventoryReserved 3b. InventoryReservationFailed Pedido Confirmado Pedido Fallido OrderCreationFailed (NATS)

Flujo del Saga para "Crear Pedido" 🔄

  1. El cliente envía una solicitud para crear un pedido al order-service.
  2. El order-service crea un nuevo pedido con estado PENDING en su base de datos local.
  3. El order-service publica un evento OrderCreated en un tema de NATS (ej. orders.created) con los detalles del pedido.
  4. El inventory-service se suscribe al tema orders.created.
  5. Al recibir OrderCreated, el inventory-service intenta reservar el inventario para los productos del pedido.
    • Si la reserva es exitosa: El inventory-service actualiza su inventario y publica un evento InventoryReserved (ej. inventory.reserved).
    • Si la reserva falla: El inventory-service publica un evento InventoryReservationFailed (ej. inventory.reservation_failed).
  6. El order-service se suscribe a los temas inventory.reserved y inventory.reservation_failed.
    • Al recibir InventoryReserved: El order-service actualiza el estado del pedido a CONFIRMED en su base de datos local.
    • Al recibir InventoryReservationFailed: El order-service actualiza el estado del pedido a FAILED en su base de datos local y, si es necesario, publica un evento de compensación (ej. OrderCreationFailed) para que otros servicios puedan deshacer cualquier acción previa.
Paso 1: Cliente envía Crear Pedido
Paso 2: Order Service: Guarda Pedido (PENDING), Publica OrderCreated
Paso 3: Inventory Service: Recibe OrderCreated, Intenta reservar Inventario
Paso 4a (Éxito): Inventory Service: Publica InventoryReserved
Paso 4b (Fallo): Inventory Service: Publica InventoryReservationFailed
Paso 5a (Éxito): Order Service: Recibe InventoryReserved, Actualiza Pedido (CONFIRMED)
Paso 5b (Fallo): Order Service: Recibe InventoryReservationFailed, Actualiza Pedido (FAILED), (Opcional) Publica Evento de Compensación

Configurando NATS JetStream ⚙️

Antes de empezar con el código Go, necesitamos tener una instancia de NATS JetStream corriendo. La forma más sencilla es usar Docker.

Crea un archivo docker-compose.yml:

version: '3.8'
services:
  nats:
    image: nats:2.10.15-alpine
    command: -js
    ports:
      - "4222:4222" # NATS Client Port
      - "8222:8222" # NATS Monitoring Port
    volumes:
      - nats_data:/data

volumes:
  nats_data:

Levanta el servidor NATS:

docker-compose up -d

Verifica que NATS esté corriendo visitando http://localhost:8222 en tu navegador. Deberías ver la interfaz de administración de NATS. También puedes usar la herramienta nats cli:

nats account info

Ahora, necesitamos crear los Streams en NATS JetStream. Los Streams son colecciones de mensajes que persisten y se entregan a los consumidores. Crearemos un stream para los eventos de ordenes y otro para inventario.

Podemos crear los streams programáticamente en nuestros servicios Go o usar la CLI. Para este tutorial, los crearemos directamente en el código para simplificar la configuración inicial.


Desarrollo del Servicio de Pedidos (order-service) 📦

El servicio de pedidos será responsable de:

  1. Manejar las solicitudes HTTP para crear nuevos pedidos.
  2. Guardar el pedido inicial en su base de datos (usaremos un mapa en memoria para simplificar).
  3. Publicar un evento OrderCreated en NATS JetStream.
  4. Suscribirse a eventos de InventoryReserved y InventoryReservationFailed para actualizar el estado del pedido.

Estructuras de Datos y Modelos 🧱

Crearemos un Order struct y un OrderCreatedEvent struct.

package main

type OrderStatus string

const (
	OrderStatusPending    OrderStatus = "PENDING"
	OrderStatusConfirmed  OrderStatus = "CONFIRMED"
	OrderStatusFailed     OrderStatus = "FAILED"
)

type Order struct {
	ID        string      `json:"id"`
	UserID    string      `json:"user_id"`
	ProductID string      `json:"product_id"`
	Quantity  int         `json:"quantity"`
	Status    OrderStatus `json:"status"`
}

type OrderCreatedEvent struct {
	OrderID   string `json:"order_id"`
	ProductID string `json:"product_id"`
	Quantity  int    `json:"quantity"`
}

type InventoryReservedEvent struct {
	OrderID   string `json:"order_id"`	
	ProductID string `json:"product_id"`
	Quantity  int    `json:"quantity"`
}

type InventoryReservationFailedEvent struct {
	OrderID   string `json:"order_id"`	
	ProductID string `json:"product_id"`	
	Quantity  int    `json:"quantity"`
	Reason    string `json:"reason"`
}

Conexión a NATS y Configuración de JetStream 🌐

Usaremos la librería oficial github.com/nats-io/nats.go.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
)

var ( 
	nc   *nats.Conn
	js   nats.JetStreamContext
	orders = make(map[string]Order)
	mu     sync.Mutex
)

func initNATS() {
	var err error
	// Conectar a NATS
	nc, err = nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error al conectar a NATS: %v", err)
	}

	// Inicializar JetStream
	js, err = nc.JetStream()
	if err != nil {
		log.Fatalf("Error al obtener JetStream context: %v", err)
	}

	// Crear Stream 'ORDERS'
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.>"},
		Retention: nats.WorkQueuePolicy, // Solo un consumidor por mensaje
		MaxMsgs:  100000, 
		MaxBytes: 100 * 1024 * 1024,
		MaxAge:   24 * time.Hour,
		Storage:  nats.FileStorage,
		Replicas: 1,
	})
	if err != nil && err != nats.ErrStreamNameAlreadyInUse {
		log.Fatalf("Error al crear stream 'ORDERS': %v", err)
	}

	// Crear Stream 'INVENTORY_EVENTS'
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "INVENTORY_EVENTS",
		Subjects: []string{"inventory.>"},
		Retention: nats.WorkQueuePolicy,
		MaxMsgs:  100000,
		MaxBytes: 100 * 1024 * 1024,
		MaxAge:   24 * time.Hour,
		Storage:  nats.FileStorage,
		Replicas: 1,
	})
	if err != nil && err != nats.ErrStreamNameAlreadyInUse {
		log.Fatalf("Error al crear stream 'INVENTORY_EVENTS': %v", err)
	}

	log.Println("NATS JetStream inicializado y streams creados.")
}

func main() {
	initNATS()
	defer nc.Close()

	// Subscribirse a eventos de inventario
	go subscribeToInventoryEvents()

	http.HandleFunc("/orders", createOrderHandler)
	http.HandleFunc("/orders/status", getOrderStatusHandler)

	log.Println("Servicio de Pedidos escuchando en :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}
📌 Nota: Los `StreamConfig` definen cómo NATS JetStream maneja los mensajes. `WorkQueuePolicy` asegura que cada mensaje sea entregado a un solo consumidor activo.

Manejador para Crear Pedidos 📝

Cuando se recibe una solicitud POST /orders, el servicio creará un pedido, lo guardará como PENDING y publicará el evento OrderCreated.

// ... (código anterior)

func createOrderHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Método no permitido", http.StatusMethodNotAllowed)
		return
	}

	var req Order
	err := json.NewDecoder(r.Body).Decode(&req)
	if err != nil {
		http.Error(w, "Solicitud inválida", http.StatusBadRequest)
		return
	}

	req.ID = uuid.New().String()
	req.Status = OrderStatusPending

	mu.Lock()
	orders[req.ID] = req
	mu.Unlock()

	// Publicar evento OrderCreated
	event := OrderCreatedEvent{
		OrderID:   req.ID,
		ProductID: req.ProductID,
		Quantity:  req.Quantity,
	}
	eventBytes, _ := json.Marshal(event)

	_, err = js.Publish("orders.created", eventBytes)
	if err != nil {
		log.Printf("Error al publicar OrderCreated: %v", err)
		http.Error(w, "Error interno al crear pedido", http.StatusInternalServerError)
		return
	}

	log.Printf("Pedido %s creado y evento OrderCreated publicado.", req.ID)

	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(req)
}

func getOrderStatusHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Método no permitido", http.StatusMethodNotAllowed)
		return
	}

	orderID := r.URL.Query().Get("id")
	if orderID == "" {
		http.Error(w, "ID de pedido requerido", http.StatusBadRequest)
		return
	}

	mu.Lock()
	order, ok := orders[orderID]
	mu.Unlock()

	if !ok {
		http.Error(w, "Pedido no encontrado", http.StatusNotFound)
		return
	}

	json.NewEncoder(w).Encode(order)
}

Suscripción y Manejo de Eventos de Inventario 👂

El order-service debe escuchar los eventos que el inventory-service publica para actualizar el estado del pedido.

// ... (código anterior)

func subscribeToInventoryEvents() {
	// Suscribirse a InventoryReserved
	_, err := js.QueueSubscribe("inventory.reserved", "order-consumer-group", func(msg *nats.Msg) {
		var event InventoryReservedEvent
		err := json.Unmarshal(msg.Data, &event)
		if err != nil {
			log.Printf("Error al decodificar InventoryReservedEvent: %v", err)
			return
		}

		mu.Lock()
		order, ok := orders[event.OrderID]
		if ok {
			order.Status = OrderStatusConfirmed
			orders[event.OrderID] = order
			log.Printf("Pedido %s CONFIRMADO (Inventario reservado).", event.OrderID)
		}
		mu.Unlock()
		msg.Ack()
	}, nats.Durable("order-res-durable"), nats.ManualAck())

	if err != nil {
		log.Fatalf("Error al suscribirse a inventory.reserved: %v", err)
	}

	// Suscribirse a InventoryReservationFailed
	_, err = js.QueueSubscribe("inventory.reservation_failed", "order-consumer-group", func(msg *nats.Msg) {
		var event InventoryReservationFailedEvent
		err := json.Unmarshal(msg.Data, &event)
		if err != nil {
			log.Printf("Error al decodificar InventoryReservationFailedEvent: %v", err)
			return
		}

		mu.Lock()
		order, ok := orders[event.OrderID]
		if ok {
			order.Status = OrderStatusFailed
			orders[event.OrderID] = order
			log.Printf("Pedido %s FALLIDO (Inventario no disponible): %s.", event.OrderID, event.Reason)
			// Aquí podrías publicar un evento de compensación, por ejemplo, "OrderCreationFailed" si hubiera más servicios involucrados.
		}
		mu.Unlock()
		msg.Ack()
	}, nats.Durable("order-fail-durable"), nats.ManualAck())

	if err != nil {
		log.Fatalf("Error al suscribirse a inventory.reservation_failed: %v", err)
	}

	log.Println("Suscrito a eventos de inventario.")
}
⚠️ Advertencia: Para la persistencia en JetStream, es crucial usar `nats.Durable()` y `nats.ManualAck()`. `Durable` crea un consumidor duradero que recuerda su posición, y `ManualAck` requiere que tu código llame `msg.Ack()` para confirmar que el mensaje fue procesado.

Desarrollo del Servicio de Inventario (inventory-service) 🛒

El servicio de inventario se encargará de:

  1. Mantener el stock de productos (usaremos un mapa en memoria).
  2. Suscribirse al evento OrderCreated.
  3. Intentar reservar el inventario.
  4. Publicar InventoryReserved o InventoryReservationFailed en NATS JetStream.

Estructuras de Datos y Modelos 📦

package main

type Product struct {
	ID       string `json:"id"`
	Name     string `json:"name"`
	Quantity int    `json:"quantity"`
}

// Los mismos eventos definidos en order-service, para asegurar coherencia
type OrderCreatedEvent struct {
	OrderID   string `json:"order_id"`
	ProductID string `json:"product_id"`
	Quantity  int    `json:"quantity"`
}

type InventoryReservedEvent struct {
	OrderID   string `json:"order_id"`	
	ProductID string `json:"product_id"`
	Quantity  int    `json:"quantity"`
}

type InventoryReservationFailedEvent struct {
	OrderID   string `json:"order_id"`	
	ProductID string `json:"product_id"`	
	Quantity  int    `json:"quantity"`
	Reason    string `json:"reason"`
}

Conexión a NATS y Stock Inicial 🏭

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
)

var (
	nc            *nats.Conn
	js            nats.JetStreamContext
	productStock  = make(map[string]Product)
	stockMu       sync.Mutex
)

func initNATS() {
	var err error
	nc, err = nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error al conectar a NATS: %v", err)
	}

	js, err = nc.JetStream()
	if err != nil {
		log.Fatalf("Error al obtener JetStream context: %v", err)
	}

	// Crear Stream 'INVENTORY_EVENTS' (si no existe, desde order-service ya se creó)
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "INVENTORY_EVENTS",
		Subjects: []string{"inventory.>"},
		Retention: nats.WorkQueuePolicy,
		MaxMsgs:  100000,
		MaxBytes: 100 * 1024 * 1024,
		MaxAge:   24 * time.Hour,
		Storage:  nats.FileStorage,
		Replicas: 1,
	})
	if err != nil && err != nats.ErrStreamNameAlreadyInUse {
		log.Fatalf("Error al crear stream 'INVENTORY_EVENTS': %v", err)
	}

	// Crear Stream 'ORDERS' (si no existe, desde order-service ya se creó)
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.>"},
		Retention: nats.WorkQueuePolicy, 
		MaxMsgs:  100000, 
		MaxBytes: 100 * 1024 * 1024,
		MaxAge:   24 * time.Hour,
		Storage:  nats.FileStorage,
		Replicas: 1,
	})
	if err != nil && err != nats.ErrStreamNameAlreadyInUse {
		log.Fatalf("Error al crear stream 'ORDERS': %v", err)
	}

	log.Println("NATS JetStream inicializado y streams verificados.")
}

func initStock() {
	stockMu.Lock()
	productStock["product-123"] = Product{
		ID:       "product-123",
		Name:     "Laptop X",
		Quantity: 5,
	}
	productStock["product-456"] = Product{
		ID:       "product-456",
		Name:     "Mouse Y",
		Quantity: 10,
	}
	stockMu.Unlock()
	log.Println("Stock inicializado.")
}

func main() {
	initNATS()
	defer nc.Close()

	initStock()

	// Suscribirse a eventos de creación de pedidos
	subscribeToOrderCreated()

	log.Println("Servicio de Inventario escuchando eventos...")
	select {} // Mantener el servicio corriendo indefinidamente
}

Suscripción a Eventos OrderCreated y Lógica de Reserva 🎯

El corazón del inventory-service es su suscriptor al evento OrderCreated. Aquí es donde se realiza la lógica de negocio para la reserva de inventario y se publican los eventos de respuesta.

// ... (código anterior)

func subscribeToOrderCreated() {
	_, err := js.QueueSubscribe("orders.created", "inventory-consumer-group", func(msg *nats.Msg) {
		var event OrderCreatedEvent
		err := json.Unmarshal(msg.Data, &event)
		if err != nil {
			log.Printf("Error al decodificar OrderCreatedEvent: %v", err)
			msg.Ack()
			return
		}

		log.Printf("Recibido OrderCreated para pedido %s, producto %s, cantidad %d", event.OrderID, event.ProductID, event.Quantity)

		stockMu.Lock()
		product, ok := productStock[event.ProductID]
		if !ok || product.Quantity < event.Quantity {
			// Inventario insuficiente o producto no encontrado
			stockMu.Unlock()
			publishInventoryReservationFailed(event.OrderID, event.ProductID, event.Quantity, "Inventario insuficiente o producto no existe")
			msg.Ack() // Confirmar que el mensaje fue procesado (incluso si falló la reserva)
			return
		}

		// Reservar inventario
		product.Quantity -= event.Quantity
		productStock[event.ProductID] = product
		log.Printf("Inventario reservado para pedido %s. Nuevo stock de %s: %d", event.OrderID, event.ProductID, product.Quantity)
		stockMu.Unlock()

		publishInventoryReserved(event.OrderID, event.ProductID, event.Quantity)
		msg.Ack() // Confirmar que el mensaje fue procesado exitosamente

	}, nats.Durable("inventory-created-durable"), nats.ManualAck())

	if err != nil {
		log.Fatalf("Error al suscribirse a orders.created: %v", err)
	}

	log.Println("Suscrito a orders.created.")
}

func publishInventoryReserved(orderID, productID string, quantity int) {
	event := InventoryReservedEvent{
		OrderID:   orderID,
		ProductID: productID,
		Quantity:  quantity,
	}
	eventBytes, _ := json.Marshal(event)
	
	_, err := js.Publish("inventory.reserved", eventBytes)
	if err != nil {
		log.Printf("Error al publicar InventoryReserved: %v", err)
	}
}

func publishInventoryReservationFailed(orderID, productID string, quantity int, reason string) {
	event := InventoryReservationFailedEvent{
		OrderID:   orderID,
		ProductID: productID,
		Quantity:  quantity,
		Reason:    reason,
	}
	eventBytes, _ := json.Marshal(event)

	_, err := js.Publish("inventory.reservation_failed", eventBytes)
	if err != nil {
		log.Printf("Error al publicar InventoryReservationFailed: %v", err)
	}
}

Poniendo Todo en Marcha: Pruebas y Verificación ✅

Ahora que ambos servicios están codificados, es hora de probar el sistema.

Pasos para Ejecutar 🏃

  1. Asegúrate de que NATS esté corriendo con Docker Compose (docker-compose up -d).
  2. Abre dos terminales separadas.
  3. En la primera terminal, navega a la carpeta de tu order-service y ejecuta:
go run .
  1. En la segunda terminal, navega a la carpeta de tu inventory-service y ejecuta:
go run .

Verás logs en ambas terminales indicando que los servicios se han iniciado y suscrito a los eventos correspondientes.

Escenarios de Prueba 🧪

Usaremos curl para interactuar con el order-service.

Escenario 1: Pedido Exitoso (Saga Completado) 🎉

  1. Crea un pedido para un producto existente y con suficiente stock (product-123, 2 unidades).
curl -X POST -H "Content-Type: application/json" -d '{"user_id": "user-A", "product_id": "product-123", "quantity": 2}' http://localhost:8080/orders
Output de `order-service`:
... Pedido <ID_DEL_PEDIDO> creado y evento OrderCreated publicado.
... Pedido <ID_DEL_PEDIDO> CONFIRMADO (Inventario reservado).
Output de `inventory-service`:
... Recibido OrderCreated para pedido <ID_DEL_PEDIDO>, producto product-123, cantidad 2
... Inventario reservado para pedido <ID_DEL_PEDIDO>. Nuevo stock de product-123: 3
  1. Verifica el estado del pedido:
curl http://localhost:8080/orders/status?id=<ID_DEL_PEDIDO>
La respuesta debería mostrar `"status": "CONFIRMED"`.

Escenario 2: Pedido Fallido (Saga con Compensación/Fallo) ❌

  1. Intenta crear un pedido con una cantidad mayor a la disponible para product-123 (recuerda que el stock ahora es 3). Por ejemplo, 5 unidades.
curl -X POST -H "Content-Type: application/json" -d '{"user_id": "user-B", "product_id": "product-123", "quantity": 5}' http://localhost:8080/orders
Output de `order-service`:
... Pedido <ID_DEL_PEDIDO> creado y evento OrderCreated publicado.
... Pedido <ID_DEL_PEDIDO> FALLIDO (Inventario no disponible): Inventario insuficiente o producto no existe.
Output de `inventory-service`:
... Recibido OrderCreated para pedido <ID_DEL_PEDIDO>, producto product-123, cantidad 5
En este caso, el `inventory-service` no registrará una reserva y el `order-service` marcará el pedido como `FAILED`.

2. Verifica el estado del pedido:

curl http://localhost:8080/orders/status?id=<ID_DEL_PEDIDO>
La respuesta debería mostrar `"status": "FAILED"`.

Observaciones 🔎

  • Consistencia Eventual: Notarás que el estado del pedido en el order-service se actualiza después de que el inventory-service procesa el evento. Esto es la consistencia eventual en acción. El sistema no está bloqueado esperando una respuesta instantánea.
  • Idempotencia: En un sistema real, los consumidores de eventos deben ser idempotentes, es decir, procesar el mismo evento varias veces no debería causar efectos secundarios no deseados. Para este ejemplo simple, no lo hemos implementado explícitamente, pero es una consideración clave.
  • Transacciones de Compensación: Nuestro ejemplo muestra el order-service marcando el pedido como FAILED si la reserva de inventario falla. En un Saga más complejo, si el order-service ya hubiera realizado otras acciones (ej. cobrar al cliente), necesitaría publicar un evento de compensación (ej. RefundRequested) para deshacer esas acciones.

Consideraciones Avanzadas y Mejores Prácticas 🌟

Este tutorial proporciona una base sólida, pero los sistemas basados en eventos y los Sagas pueden volverse complejos. Aquí hay algunas consideraciones adicionales:

Patrones de Orquestación y Coreografía 🕺💃

Hemos implementado un Saga de orquestación, donde el order-service actúa como un coordinador central que le dice a los otros servicios qué hacer. La alternativa es la coreografía, donde cada servicio escucha eventos y decide de forma autónoma qué acción realizar, sin un coordinador central explícito.

CaracterísticaOrquestación (Centralizada)Coreografía (Descentralizada)
---------
ComplejidadIdeal para Sagas con muchos pasos o lógica complejaMás simple para Sagas pequeños y menos pasos
AcoplamientoEl orquestador conoce la lógica del Saga (alto acoplamiento)Los servicios solo conocen sus propios pasos (bajo acoplamiento)
---------
VisibilidadFácil de monitorear el progreso del SagaDifícil de rastrear el flujo completo del Saga
MantenimientoCambios en la lógica del Saga requieren modificar el orquestadorCambios en la lógica del Saga pueden requerir coordinar múltiples servicios
💡 Consejo: Elige la orquestación para Sagas complejos y la coreografía para Sagas simples, o considera un enfoque híbrido.

Gestión de Errores y Retries 🚧

¿Qué pasa si un servicio falla al procesar un evento? JetStream, con sus consumidores duraderos y ManualAck, ayuda mucho. Si un servicio se cae antes de llamar msg.Ack(), el mensaje se volverá a entregar a otra instancia o a la misma cuando se recupere. Sin embargo, también debes considerar:

  • DLQ (Dead Letter Queue): Un lugar donde los mensajes que no pueden ser procesados después de varios reintentos son enviados para análisis manual.
  • Circuit Breaker: Patrón para evitar sobrecargar un servicio que ya está fallando.
  • Manejo de idempotencia: Asegurarse de que el reprocesamiento de eventos no cause efectos secundarios. Esto a menudo implica guardar un ID de transacción en la base de datos y comprobar si ya se procesó.

Observabilidad 📊

En un sistema distribuido, la observabilidad es clave. Implementa:

  • Logging: Registra eventos importantes y errores.
  • Métricas: Mide el rendimiento de los servicios y la latencia del procesamiento de eventos.
  • Tracing distribuido: Utiliza herramientas como OpenTelemetry para seguir la trayectoria de una solicitud o Saga a través de múltiples servicios.
Conocimiento del Saga

Conclusión 🎉

En este tutorial, hemos explorado los fundamentos del desarrollo de microservicios basados en eventos en Go, enfocándonos en la implementación del patrón Saga para gestionar transacciones distribuidas. Hemos visto cómo utilizar NATS JetStream para la comunicación confiable y persistente entre servicios, y hemos construido un ejemplo funcional de un sistema de pedidos y inventario.

Comprender y aplicar estos patrones es fundamental para construir sistemas de microservicios robustos, escalables y resilientes que puedan manejar la complejidad inherente a la distribución. Go, con sus capacidades de concurrencia y su rendimiento, es una herramienta formidable para esta tarea, y NATS proporciona la columna vertebral de mensajería que lo hace posible.

Continúa experimentando con este ejemplo, añadiendo más servicios, implementando más pasos en el Saga, o explorando otras características de NATS JetStream para profundizar tu comprensión.

Tutoriales relacionados

Comentarios (0)

Aún no hay comentarios. ¡Sé el primero!