tutoriales.com

Agregación Avanzada en MongoDB: Transformando Datos con el Pipeline de Agregación

Descubre el poder del pipeline de agregación de MongoDB, una herramienta esencial para el procesamiento y análisis de datos. Este tutorial te guiará a través de sus etapas clave, desde la selección y transformación hasta la agrupación y combinación, con ejemplos prácticos para que domines la manipulación de datos.

Intermedio18 min de lectura5 views
Reportar error

El pipeline de agregación es una de las características más potentes y versátiles de MongoDB. Permite procesar documentos en una colección a través de una serie de etapas, transformando los datos en resultados agregados. Desde cálculos simples hasta análisis complejos, el pipeline es fundamental para extraer información valiosa de tus bases de datos.

En este tutorial, exploraremos a fondo el pipeline de agregación, sus operadores y cómo construir consultas eficientes para transformar y analizar tus datos.

🎯 ¿Qué es el Pipeline de Agregación de MongoDB?

Imagina que tienes una fábrica de datos. El pipeline de agregación es como una cadena de montaje donde cada estación (o etapa) realiza una operación específica en el producto (tus documentos). Los documentos entran por un extremo, se procesan secuencialmente a través de las etapas, y al final, obtienes el producto terminado: los resultados agregados.

Este proceso es altamente flexible y permite realizar operaciones como:

  • Filtrado de documentos: Eliminar documentos que no cumplen ciertos criterios.
  • Transformación de documentos: Modificar la estructura de los documentos, añadir o eliminar campos.
  • Agrupación de documentos: Combinar documentos con valores comunes en campos específicos para realizar cálculos.
  • Unión de colecciones: Combinar datos de diferentes colecciones.
  • Ordenamiento y limitación: Organizar y restringir el número de resultados.

📖 Componentes Clave del Pipeline

El pipeline se compone de una secuencia de etapas, donde cada etapa procesa el resultado de la etapa anterior. Una etapa consiste en un operador de agregación y sus argumentos.

💡 Consejo: Piensa en cada etapa como una función que toma una serie de documentos y produce una nueva serie de documentos para la siguiente función.

🛠️ Estructura Básica de una Consulta de Agregación

Una consulta de agregación se ejecuta usando el método aggregate() en una colección. Se le pasa un array de objetos, donde cada objeto representa una etapa del pipeline.

db.collection.aggregate([
  { <etapa1> },
  { <etapa2> },
  { <etapa3> },
  ...
])

Veamos algunos ejemplos comunes de etapas y cómo se utilizan.

✨ Etapas Fundamentales del Pipeline

Exploraremos las etapas más comunes y útiles del pipeline de agregación. Para ello, usaremos una colección de ejemplo llamada pedidos con la siguiente estructura (simplificada):

[
  {
    "_id": "P001",
    "clienteId": "C001",
    "fecha": ISODate("2023-01-15T10:00:00Z"),
    "estado": "completado",
    "total": 120.50,
    "productos": [
      { "itemId": "A1", "nombre": "Laptop", "cantidad": 1, "precioUnitario": 120.50 }
    ]
  },
  {
    "_id": "P002",
    "clienteId": "C002",
    "fecha": ISODate("2023-01-16T11:30:00Z"),
    "estado": "pendiente",
    "total": 50.00,
    "productos": [
      { "itemId": "B2", "nombre": "Ratón", "cantidad": 2, "precioUnitario": 25.00 }
    ]
  },
  {
    "_id": "P003",
    "clienteId": "C001",
    "fecha": ISODate("2023-01-17T14:00:00Z"),
    "estado": "completado",
    "total": 30.00,
    "productos": [
      { "itemId": "C3", "nombre": "Teclado", "cantidad": 1, "precioUnitario": 30.00 }
    ]
  },
  {
    "_id": "P004",
    "clienteId": "C003",
    "fecha": ISODate("2023-02-01T09:00:00Z"),
    "estado": "completado",
    "total": 200.00,
    "productos": [
      { "itemId": "D4", "nombre": "Monitor", "cantidad": 1, "precioUnitario": 200.00 }
    ]
  },
  {
    "_id": "P005",
    "clienteId": "C002",
    "fecha": ISODate("2023-02-02T16:00:00Z"),
    "estado": "cancelado",
    "total": 75.00,
    "productos": [
      { "itemId": "E5", "nombre": "Auriculares", "cantidad": 3, "precioUnitario": 25.00 }
    ]
  }
]

1. $match: Filtrado de Documentos 🔎

El operador $match filtra los documentos para pasar solo aquellos que coinciden con los criterios especificados a la siguiente etapa. Funciona de manera similar a la operación find().

Ejemplo: Obtener solo los pedidos completados.

db.pedidos.aggregate([
  { $match: { estado: "completado" } }
])

Resultado esperado: Pedidos P001, P003, P004.

2. $project: Selección y Transformación de Campos 📝

El operador $project permite seleccionar los campos a incluir, excluir o renombrar, así como crear nuevos campos calculados. Puedes especificar 1 para incluir un campo, 0 para excluirlo (excepto _id), o usar expresiones para transformar.

Ejemplo: Mostrar solo _id, clienteId y total de los pedidos completados, y renombrar _id a idPedido.

db.pedidos.aggregate([
  { $match: { estado: "completado" } },
  { $project: { _id: 0, idPedido: "$_id", clienteId: 1, total: 1 } }
])

Resultado esperado (ejemplo para P001):

{
  "idPedido": "P001",
  "clienteId": "C001",
  "total": 120.50
}
🔥 Importante: Si incluyes un campo y no especificas `_id: 0`, el campo `_id` se incluirá por defecto. Si excluyes `_id: 0`, puedes incluir o excluir cualquier otro campo.

3. $group: Agrupación de Documentos 🤝

El operador $group es el corazón de la agregación. Agrupa documentos por un campo o expresión clave y luego calcula valores agregados para cada grupo utilizando operadores acumuladores (como $sum, $avg, $min, $max, $push, $addToSet).

Ejemplo: Calcular el total de ventas por cliente.

db.pedidos.aggregate([
  { $group: { _id: "$clienteId", totalVentas: { $sum: "$total" }, cantidadPedidos: { $sum: 1 } } }
])

Resultado esperado:

[
  { "_id": "C001", "totalVentas": 150.50, "cantidadPedidos": 2 },
  { "_id": "C002", "totalVentas": 125.00, "cantidadPedidos": 2 },
  { "_id": "C003", "totalVentas": 200.00, "cantidadPedidos": 1 }
]
📌 Nota: El campo `_id` dentro de `$group` define la clave de agrupación. Si usas `_id: null` o `_id: {}`, agrupará todos los documentos en un solo grupo.

4. $sort: Ordenamiento de Resultados 📊

El operador $sort ordena los documentos por uno o más campos. Se especifica 1 para orden ascendente y -1 para descendente.

Ejemplo: Obtener el total de ventas por cliente y ordenarlos de mayor a menor.

db.pedidos.aggregate([
  { $group: { _id: "$clienteId", totalVentas: { $sum: "$total" }, cantidadPedidos: { $sum: 1 } } },
  { $sort: { totalVentas: -1 } }
])

Resultado esperado:

[
  { "_id": "C003", "totalVentas": 200.00, "cantidadPedidos": 1 },
  { "_id": "C001", "totalVentas": 150.50, "cantidadPedidos": 2 },
  { "_id": "C002", "totalVentas": 125.00, "cantidadPedidos": 2 }
]

5. $limit y $skip: Paginación de Resultados 🔢

Estos operadores se usan para controlar la cantidad de documentos que pasan a la siguiente etapa o que son devueltos. Son ideales para paginación.

  • $limit: Restringe el número de documentos pasados.
  • $skip: Salta un número especificado de documentos.

Ejemplo: Obtener los 2 clientes con mayores ventas (excluyendo el primero).

db.pedidos.aggregate([
  { $group: { _id: "$clienteId", totalVentas: { $sum: "$total" } } },
  { $sort: { totalVentas: -1 } },
  { $skip: 1 },
  { $limit: 2 }
])

Resultado esperado:

[
  { "_id": "C001", "totalVentas": 150.50 },
  { "_id": "C002", "totalVentas": 125.00 }
]

🚀 Etapas Avanzadas del Pipeline

Más allá de las etapas básicas, MongoDB ofrece herramientas poderosas para manipular estructuras de datos complejas y combinar información de múltiples colecciones.

6. $unwind: Desestructuración de Arrays 💥

El operador $unwind deconstruye un campo de array de los documentos de entrada para generar un documento de salida para cada elemento del array. Esto es extremadamente útil cuando tienes arrays anidados y necesitas operar en elementos individuales del array.

Ejemplo: Obtener el detalle de cada producto vendido en todos los pedidos.

db.pedidos.aggregate([
  { $unwind: "$productos" }
])

Resultado esperado (ejemplo para P001):

{
  "_id": "P001",
  "clienteId": "C001",
  "fecha": ISODate("2023-01-15T10:00:00Z"),
  "estado": "completado",
  "total": 120.50,
  "productos": { "itemId": "A1", "nombre": "Laptop", "cantidad": 1, "precioUnitario": 120.50 }
}

Observa cómo el documento original se duplica para cada elemento del array productos. Ahora podemos, por ejemplo, calcular el ingreso total por cada item individual.

Ejemplo (continuación): Calcular el total vendido por cada producto.

db.pedidos.aggregate([
  { $unwind: "$productos" },
  { $group: { _id: "$productos.nombre", totalVendido: { $sum: { $multiply: ["$productos.cantidad", "$productos.precioUnitario"] } } } },
  { $sort: { totalVendido: -1 } }
])

Resultado esperado:

[
  { "_id": "Monitor", "totalVendido": 200.00 },
  { "_id": "Laptop", "totalVendido": 120.50 },
  { "_id": "Auriculares", "totalVendido": 75.00 },
  { "_id": "Ratón", "totalVendido": 50.00 },
  { "_id": "Teclado", "totalVendido": 30.00 }
]

7. $lookup: Uniones (Joins) de Colecciones 🔗

El operador $lookup realiza una operación de unión (left outer join) entre dos colecciones en la misma base de datos. Permite combinar documentos de una colección con documentos de otra colección, basándose en un campo común.

Necesitamos una colección clientes para este ejemplo:

[
  { "_id": "C001", "nombre": "Alice", "ciudad": "Madrid" },
  { "_id": "C002", "nombre": "Bob", "ciudad": "Barcelona" },
  { "_id": "C003", "nombre": "Charlie", "ciudad": "Valencia" }
]

Ejemplo: Obtener los pedidos junto con la información del cliente que los realizó.

db.pedidos.aggregate([
  {
    $lookup: {
      from: "clientes",         // La colección a unir
      localField: "clienteId",  // Campo de la colección de entrada (pedidos)
      foreignField: "_id",      // Campo de la colección a unir (clientes)
      as: "infoCliente"         // Nombre del nuevo campo de array con los documentos unidos
    }
  },
  { $unwind: "$infoCliente" } // Desestructurar para tener el cliente como objeto y no array
])

Resultado esperado (ejemplo para P001):

{
  "_id": "P001",
  "clienteId": "C001",
  "fecha": ISODate("2023-01-15T10:00:00Z"),
  "estado": "completado",
  "total": 120.50,
  "productos": [
    { "itemId": "A1", "nombre": "Laptop", "cantidad": 1, "precioUnitario": 120.50 }
  ],
  "infoCliente": { "_id": "C001", "nombre": "Alice", "ciudad": "Madrid" }
}
⚠️ Advertencia: `$lookup` puede ser costoso en colecciones muy grandes. Úsalo con moderación y asegúrate de que los campos `localField` y `foreignField` estén indexados si el rendimiento es crítico.

8. $addFields y $set: Añadir Nuevos Campos ✨

Estos operadores son muy similares y se usan para añadir nuevos campos a los documentos de entrada. $addFields es el original y $set fue introducido en MongoDB 4.2 como alias para $addFields (con algunas pequeñas diferencias de comportamiento en el edge case de reemplazar _id). Generalmente, $set es preferible por su sintaxis y consistencia con $set en otras operaciones.

Ejemplo: Añadir un campo importeNeto (calculado como total menos un 21% de IVA) y un campo añoPedido.

db.pedidos.aggregate([
  { $match: { estado: "completado" } },
  { $set: {
      importeNeto: { $divide: ["$total", 1.21] },
      añoPedido: { $year: "$fecha" }
    }
  },
  { $project: { _id: 0, clienteId: 1, total: 1, importeNeto: { $round: ["$importeNeto", 2] }, añoPedido: 1 } }
])

Resultado esperado (ejemplo para P001):

{
  "clienteId": "C001",
  "total": 120.50,
  "importeNeto": 99.59,
  "añoPedido": 2023
}

9. $out y $merge: Salida de Agregación a una Colección 💾

Una vez que has procesado tus datos con el pipeline, a menudo querrás guardar los resultados en una nueva colección o en una colección existente.

  • $out: Escribe los documentos resultantes del pipeline en una nueva colección. Si la colección ya existe, la reemplazará.
  • $merge: Escribe los documentos resultantes del pipeline en una colección especificada. Si la colección ya existe, $merge puede insertar nuevos documentos, actualizar documentos existentes o incluso reemplazar documentos, basándose en el comportamiento que se le configure.
⚠️ Advertencia: `$out` es destructivo, ¡reemplazará la colección de destino si existe! `$merge` ofrece un control más granular y es generalmente preferible para actualizaciones incrementales.

Ejemplo con $out: Guardar un resumen de ventas por cliente en una nueva colección resumenVentasClientes.

db.pedidos.aggregate([
  { $group: { _id: "$clienteId", totalVentas: { $sum: "$total" } } },
  { $out: "resumenVentasClientes" }
])

Ejemplo con $merge: Actualizar o insertar el resumen de ventas por cliente en resumenVentasClientes. Si un cliente ya existe, se actualiza el totalVentas; si no, se inserta.

db.pedidos.aggregate([
  { $group: { _id: "$clienteId", totalVentas: { $sum: "$total" } } },
  {
    $merge: {
      into: "resumenVentasClientes",
      on: "_id", // Campo clave para identificar documentos que ya existen
      whenMatched: "replace", // Qué hacer si el documento coincide: reemplazarlo
      whenNotMatched: "insert" // Qué hacer si no coincide: insertarlo
    }
  }
])

📈 Optimización y Buenas Prácticas

El pipeline de agregación es potente, pero también puede ser intensivo en recursos. Aquí hay algunas prácticas recomendadas para mejorar el rendimiento:

  • Prioriza $match y $project temprano: Filtra documentos con $match al principio para reducir la cantidad de datos que pasan por el pipeline. Proyecta solo los campos necesarios con $project temprano para reducir la huella de memoria.

  • Usa índices: Asegúrate de que los campos utilizados en $match, $sort y $lookup estén indexados. Esto acelera significativamente las operaciones.

  • Evita $unwind innecesarios: $unwind puede duplicar documentos y aumentar el tamaño del conjunto de datos. Úsalo solo cuando sea estrictamente necesario.

  • Limita los resultados: Usa $limit para restringir el número de documentos devueltos si no necesitas todos ellos.

  • Comprende el $explain: Utiliza db.collection.explain().aggregate([...]) para entender cómo MongoDB ejecuta tu pipeline y encontrar posibles cuellos de botella.

Diagrama del Flujo de Agregación

1. Colección de Entrada Documentos originales de la base de datos 2. Etapa $match Filtrado de documentos según criterios 3. Etapa $project Transformación y selección de campos 4. Etapa $unwind Desestructuración de campos tipo array 5. Etapa $group Agrupación y cálculo de acumuladores 6. Etapa $sort Ordenamiento de los resultados obtenidos 7. Etapa $limit/$skip Paginación y restricción de cantidad 8. Colección de Salida Resultados finales agregados y procesados
¿Por qué el orden de las etapas es importante? El orden de las etapas es crucial porque cada etapa opera sobre el resultado de la etapa anterior. Por ejemplo, si aplicas `$match` después de `$group`, el `$match` filtrará los resultados agrupados, no los documentos originales. Colocar `$match` y `$project` al principio puede mejorar drásticamente el rendimiento al reducir la cantidad de datos que deben ser procesados en etapas posteriores.
Pipeline Dominado

Conclusión

El pipeline de agregación de MongoDB es una herramienta increíblemente poderosa y flexible para el análisis y la transformación de datos. Desde filtrar y proyectar hasta agrupar y unir colecciones, sus etapas te permiten construir consultas complejas para extraer inteligencia de tu base de datos.

Al dominar sus operadores y entender el flujo de datos a través de cada etapa, podrás diseñar soluciones robustas para tus necesidades de reporte y análisis. ¡Experimenta con diferentes combinaciones de etapas y operadores para descubrir todo su potencial!

Tutoriales relacionados

Comentarios (0)

Aún no hay comentarios. ¡Sé el primero!