Mongo pipelines primer

Build a solid understanding of Mongo aggregation pipelines and pipeline stages.

In this post, we attempt to understand the Mongo aggregation pipeline.

Aggregation pipelines are MongoDB's main tool for aggregating data. Among other things, they provide the Mongo equivalent of SQL's GROUP BY clause. We will see several examples of it.

We will write queries for the following kinds of scenarios:

  • Group on a particular field
  • Apply filter and group the result
  • Group and apply ordering on the grouped result
  • Group on embedded documents' fields


We will build on our previous post and store Game of Thrones (GOT) characters and their information in our Mongo database.

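Let's start the Mongo shell and connect to a running MongoDB server. (Newer MongoDB releases ship the mongosh shell instead of mongo.)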

$ mongo

Let's create a collection called people.

> db.createCollection('people')

Let's insert some documents in this collection.

> db.people.insertOne({'name': 'Eddard Stark', 'house': 'Stark', 'location': 'Winterfell', 'age': 42})

> db.people.insertMany([
      {'name': 'Cat Stark', 'house': 'Tully', 'location': 'Winterfell', 'age': 40},
      {'name': 'Arya Stark', 'house': 'Stark', 'location': 'Winterfell', 'age': 12},
      {'name': 'Sansa Stark', 'house': 'Stark', 'location': 'Winterfell', 'age': 15},
      {'name': 'Jon Snow', 'location': 'Winterfell', 'age': 18}
  ])

> db.people.insertMany([
      {'name': 'Tywin Lannister', 'house': 'Lannister', 'location': 'Casterly Rock', 'age': 55},
      {'name': 'Jamie Lannister', 'house': 'Lannister', 'location': 'Casterly Rock', 'age': 35},
      {'name': 'Tyrion Lannister', 'house': 'Lannister', 'location': 'Casterly Rock', 'alias': 'Imp', 'age': 30},
      {'name': 'Cersei Lannister', 'location': 'Casterly Rock', 'house': 'Lannister', 'age': 35}
  ])

Aggregation basics

Aggregations are performed by calling the aggregate() method on a collection. aggregate() expects an array as its argument, and each entry of this array is an aggregation stage. Hence multiple stages can be chained in a single aggregation.

The simplest example is passing an empty array to aggregate(). In this case, no aggregation stage is applied to the collection.

> db.people.aggregate([])

This would return all the documents of the collection since we didn't pass any aggregation stage.

MongoDB supports many stages; a few of them are:

  • $match
  • $group
  • $sort
  • $limit
  • $unwind

Let's look at $match first. This stage filters documents, keeping only those that match the given condition.

> db.people.aggregate([{$match: {house: "Stark"}}])

The output would look like:

{ "_id" : ObjectId("61c33f5afa3c6ac0076acc0f"), "name" : "Eddard Stark", "house" : "Stark", "location" : "Winterfell", "age" : 42 }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc11"), "name" : "Arya Stark", "house" : "Stark", "location" : "Winterfell", "age" : 12 }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc12"), "name" : "Sansa Stark", "house" : "Stark", "location" : "Winterfell", "age" : 15 }

Here we passed a single aggregation stage, {$match: {house: "Stark"}}, to aggregate(). $match is the stage name, and its operand {house: "Stark"} is a query filter that uses the same syntax as the filter passed to find(). As is evident, the filter is itself written as a document.
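To see what $match does to each document, here is a minimal plain-JavaScript sketch (runnable with Node.js, no MongoDB server required). It is a simplified, hypothetical model, not MongoDB's implementation: the helper name match is our own, and it only handles equality on top-level fields, whereas the real $match also supports query operators such as $gt and $in, dot paths and array fields.

```javascript
// Hypothetical, simplified model of {$match: filter}.
// Only top-level equality conditions are supported.
function match(filter) {
  return (docs) =>
    docs.filter((doc) =>
      Object.entries(filter).every(([field, value]) => {
        // As in MongoDB, a missing field compares equal to null.
        const actual = doc[field] === undefined ? null : doc[field];
        return actual === value;
      })
    );
}

// The people collection, keeping only the fields used here.
const people = [
  { name: "Eddard Stark", house: "Stark", age: 42 },
  { name: "Cat Stark", house: "Tully", age: 40 },
  { name: "Arya Stark", house: "Stark", age: 12 },
  { name: "Sansa Stark", house: "Stark", age: 15 },
  { name: "Jon Snow", age: 18 },
  { name: "Tywin Lannister", house: "Lannister", age: 55 },
  { name: "Jamie Lannister", house: "Lannister", age: 35 },
  { name: "Tyrion Lannister", house: "Lannister", age: 30 },
  { name: "Cersei Lannister", house: "Lannister", age: 35 },
];

console.log(match({ house: "Stark" })(people).map((d) => d.name));
// [ 'Eddard Stark', 'Arya Stark', 'Sansa Stark' ]
console.log(match({ house: null })(people).map((d) => d.name));
// [ 'Jon Snow' ]
```

The second call shows why a filter on null also picks up documents that lack the field entirely.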

There are two things worth mentioning about aggregation stages:

  • Every stage produces a list of documents.
  • The output of a stage becomes the input of the next stage.
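These two rules can be modelled as a left fold over the stages. The following plain-JavaScript sketch assumes each stage is a function from an array of documents to an array of documents; runPipeline, onlyStarks and addWords are hypothetical names, and this is not how MongoDB executes pipelines internally.

```javascript
// Hypothetical in-memory model of a pipeline (not how MongoDB executes it).
// A stage is a function: array of documents in, array of documents out.
function runPipeline(docs, stages) {
  // The output of each stage becomes the input of the next one.
  return stages.reduce((current, stage) => stage(current), docs);
}

const people = [
  { name: "Eddard Stark", house: "Stark" },
  { name: "Jon Snow" },
  { name: "Tywin Lannister", house: "Lannister" },
];

// Stand-ins for {$match: {house: "Stark"}} and {$addFields: {words: "Winter is coming!"}}.
const onlyStarks = (docs) => docs.filter((d) => d.house === "Stark");
const addWords = (docs) => docs.map((d) => ({ ...d, words: "Winter is coming!" }));

console.log(runPipeline(people, []).length); // 3: an empty pipeline returns every document
console.log(runPipeline(people, [onlyStarks, addWords]).length); // 1
```

An empty stage list returns the input untouched, which mirrors db.people.aggregate([]) returning every document.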

In our current example, the $match stage produced a list of documents.

Let's say we want to modify each output document and add a field called words to it. In such a case, we can pass a second stage to aggregate().

> db.people.aggregate([
      {$match: {house: "Stark"}},
      {$addFields: {words: "Winter is coming!"}}
  ])

This would output:

{ "_id" : ObjectId("61c33f5afa3c6ac0076acc0f"), "name" : "Eddard Stark", "house" : "Stark", "location" : "Winterfell", "age" : 42, "words" : "Winter is coming!" }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc11"), "name" : "Arya Stark", "house" : "Stark", "location" : "Winterfell", "age" : 12, "words" : "Winter is coming!" }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc12"), "name" : "Sansa Stark", "house" : "Stark", "location" : "Winterfell", "age" : 15, "words" : "Winter is coming!" }

We passed two stages in this example: the array passed to aggregate() has one entry per stage. The first entry is {$match: {house: "Stark"}} and the second entry is {$addFields: {words: "Winter is coming!"}}.

The output of the first stage ($match) is passed as input to the second stage ($addFields).

The $addFields stage expects a document that names each field to add along with its value. In our example we added a single field called words. We could also have added a second field, say sigil, in which case the stage would look like the following:

{$addFields: {words: "Winter is coming!", sigil: "Direwolf"}}
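A minimal plain-JavaScript sketch of the $addFields behaviour described above, assuming literal values only. The helper name addFields is hypothetical, and the real stage evaluates expressions, so a string such as "$age" would be read as a field path.

```javascript
// Hypothetical model of {$addFields: spec} where every value in spec is a literal.
function addFields(spec) {
  // Copy each document and add the listed fields; an existing field with the
  // same name is replaced, as $addFields does.
  return (docs) => docs.map((doc) => ({ ...doc, ...spec }));
}

const starks = [
  { name: "Eddard Stark", house: "Stark", age: 42 },
  { name: "Arya Stark", house: "Stark", age: 12 },
  { name: "Sansa Stark", house: "Stark", age: 15 },
];

const result = addFields({ words: "Winter is coming!", sigil: "Direwolf" })(starks);
for (const doc of result) {
  console.log(doc.name, "|", doc.words, "|", doc.sigil);
}
// Eddard Stark | Winter is coming! | Direwolf  (and likewise for Arya and Sansa)
```

The input array is left unchanged, just as $addFields never modifies the stored documents.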

Find all distinct houses

Finding distinct houses requires the $group stage. $group expects its operand to be a document, and this document must have a field called _id. The value of _id is the group key: here, the field whose distinct values we want to group on. The query would look like:

> db.people.aggregate([
      {$group: {_id: "$house"}}
  ])

This would output:

{ "_id" : null }
{ "_id" : "Lannister" }
{ "_id" : "Stark" }
{ "_id" : "Tully" }

We want the grouping to be applied on the field house. To refer to a field's value, the field name has to be prefixed with a $, which is why we used $house instead of house.
If each document had a field called region and we wanted to group on it instead, we would use $region as the value of _id.

The output contains an entry whose _id is null because the document for Jon Snow has no house field, and $group places documents with a missing group key in the null group.
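The grouping can be sketched in plain JavaScript as collecting the distinct values of one field. This is a hypothetical model, not MongoDB's implementation: groupDistinct is our own name, and it takes a bare field name rather than an expression such as "$house". It also mimics the null group for documents without a house.

```javascript
// Hypothetical model of {$group: {_id: "$<field>"}} without accumulators.
function groupDistinct(field) {
  return (docs) => {
    const groups = new Map(); // strings and null are compared by value as Map keys
    for (const doc of docs) {
      // A missing field is grouped under null, like Jon Snow's document.
      const key = doc[field] === undefined ? null : doc[field];
      groups.set(key, { _id: key });
    }
    // Map keeps insertion order; MongoDB itself guarantees no output order.
    return [...groups.values()];
  };
}

const people = [
  { name: "Eddard Stark", house: "Stark" },
  { name: "Cat Stark", house: "Tully" },
  { name: "Arya Stark", house: "Stark" },
  { name: "Sansa Stark", house: "Stark" },
  { name: "Jon Snow" },
  { name: "Tywin Lannister", house: "Lannister" },
  { name: "Jamie Lannister", house: "Lannister" },
  { name: "Tyrion Lannister", house: "Lannister" },
  { name: "Cersei Lannister", house: "Lannister" },
];

console.log(groupDistinct("house")(people).map((g) => g._id));
// [ 'Stark', 'Tully', null, 'Lannister' ]
```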

Find average age per house

This again requires the $group stage. Besides _id, $group can add computed fields to its output documents. In the last example the output documents only had _id because that was the only field we specified in $group.

These computed fields are defined with accumulator expressions. Let's understand them with a query that gets the average age per house. The query would look like:

> db.people.aggregate([
      {$group: {_id: "$house", averageAge: {$avg: "$age"}}}
  ])

The output would look like:

{ "_id" : "Stark", "averageAge" : 23 }
{ "_id" : null, "averageAge" : 18 }
{ "_id" : "Tully", "averageAge" : 40 }
{ "_id" : "Lannister", "averageAge" : 38.75 }

As can be noticed from the query, we added a computed field called averageAge to the $group stage and passed it the accumulator expression {$avg: "$age"}. Like a document, an accumulator expression has a field and a value: the field is the accumulator operator, here $avg because we want an average. MongoDB also provides other accumulator operators such as $sum, $max, $min and $count.

Since we want the average of the age field, we pass $age as the operand of $avg.
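Conceptually, $group with $avg keeps a running sum and count per group key. The sketch below is a hypothetical plain-JavaScript model of that idea (groupAverage is our own name), not MongoDB's implementation. Like $avg, it skips missing and non-numeric values.

```javascript
// Hypothetical model of
// {$group: {_id: "$<keyField>", <outputField>: {$avg: "$<valueField>"}}}.
function groupAverage(keyField, valueField, outputField) {
  return (docs) => {
    const groups = new Map(); // group key -> running { sum, count }
    for (const doc of docs) {
      const key = doc[keyField] === undefined ? null : doc[keyField];
      const acc = groups.get(key) || { sum: 0, count: 0 };
      // Only numeric values contribute to the average.
      if (typeof doc[valueField] === "number") {
        acc.sum += doc[valueField];
        acc.count += 1;
      }
      groups.set(key, acc);
    }
    // $avg yields null for a group that had no numeric values.
    return [...groups].map(([key, { sum, count }]) => ({
      _id: key,
      [outputField]: count > 0 ? sum / count : null,
    }));
  };
}

const people = [
  { name: "Eddard Stark", house: "Stark", age: 42 },
  { name: "Cat Stark", house: "Tully", age: 40 },
  { name: "Arya Stark", house: "Stark", age: 12 },
  { name: "Sansa Stark", house: "Stark", age: 15 },
  { name: "Jon Snow", age: 18 },
  { name: "Tywin Lannister", house: "Lannister", age: 55 },
  { name: "Jamie Lannister", house: "Lannister", age: 35 },
  { name: "Tyrion Lannister", house: "Lannister", age: 30 },
  { name: "Cersei Lannister", house: "Lannister", age: 35 },
];

for (const g of groupAverage("house", "age", "averageAge")(people)) {
  console.log(g._id, g.averageAge);
}
// Stark 23
// Tully 40
// null 18
// Lannister 38.75
```

The averages match the Mongo output shown earlier: (42 + 12 + 15) / 3 = 23 for the Starks and (55 + 35 + 30 + 35) / 4 = 38.75 for the Lannisters.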

Find number of members per house

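We can use the following query to find the number of members per house. Note that $count is available as a $group accumulator only in MongoDB 5.0 and later; on older versions, {$sum: 1} produces the same count.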

> db.people.aggregate([
      {$group: {_id: "$house", numMembers: {$count: {}}}}
  ])

The output would look like:

{ "_id" : "Stark", "numMembers" : 3 }
{ "_id" : null, "numMembers" : 1 }
{ "_id" : "Tully", "numMembers" : 1 }
{ "_id" : "Lannister", "numMembers" : 4 }
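A similar hypothetical sketch for counting keeps one counter per group key and increments it once per document, which is what {$count: {}} (or {$sum: 1}) computes. The helper name groupCount is our own, and this is plain JavaScript rather than MongoDB's implementation.

```javascript
// Hypothetical model of {$group: {_id: "$<field>", <outputField>: {$count: {}}}}.
function groupCount(field, outputField) {
  return (docs) => {
    const counts = new Map(); // group key -> number of documents seen
    for (const doc of docs) {
      const key = doc[field] === undefined ? null : doc[field];
      counts.set(key, (counts.get(key) || 0) + 1);
    }
    return [...counts].map(([key, n]) => ({ _id: key, [outputField]: n }));
  };
}

const people = [
  { name: "Eddard Stark", house: "Stark" },
  { name: "Cat Stark", house: "Tully" },
  { name: "Arya Stark", house: "Stark" },
  { name: "Sansa Stark", house: "Stark" },
  { name: "Jon Snow" },
  { name: "Tywin Lannister", house: "Lannister" },
  { name: "Jamie Lannister", house: "Lannister" },
  { name: "Tyrion Lannister", house: "Lannister" },
  { name: "Cersei Lannister", house: "Lannister" },
];

for (const g of groupCount("house", "numMembers")(people)) {
  console.log(g._id, g.numMembers);
}
// Stark 3
// Tully 1
// null 1
// Lannister 4
```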

Sorting the aggregated result

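The last query gave us the number of members per house. We also want the result sorted in decreasing order, so that the house with the most members shows at the top. $group does not guarantee any particular order for its output documents, so we add an explicit $sort stage: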

> db.people.aggregate([
      {$group: {_id: "$house", numMembers: {$count: {}}}},
      {$sort: {numMembers: -1}}
  ])

We have modified the previous query and added a second stage, $sort, to the aggregation pipeline. The $group stage outputs documents that each have a numMembers field, so the next stage, $sort, can sort on this field. Hence we passed {numMembers: -1}: -1 requests descending order, while 1 would request ascending order. Documents with the same numMembers value (here null and Tully) may be returned in either order.

The output should look like:

{ "_id" : "Lannister", "numMembers" : 4 }
{ "_id" : "Stark", "numMembers" : 3 }
{ "_id" : null, "numMembers" : 1 }
{ "_id" : "Tully", "numMembers" : 1 }
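Here is a hypothetical plain-JavaScript model of a single-field $sort (sortBy is our own name). Note one difference: JavaScript's Array.prototype.sort is stable, so ties keep their input order, while MongoDB makes no such promise for documents with equal sort values.

```javascript
// Hypothetical model of {$sort: {<field>: 1 | -1}} for one numeric field.
function sortBy(field, direction) {
  // Copy first so the input is untouched; direction is 1 (ascending) or -1 (descending).
  return (docs) => [...docs].sort((a, b) => direction * (a[field] - b[field]));
}

// The output of the $group stage from the previous query.
const grouped = [
  { _id: "Stark", numMembers: 3 },
  { _id: null, numMembers: 1 },
  { _id: "Tully", numMembers: 1 },
  { _id: "Lannister", numMembers: 4 },
];

console.log(sortBy("numMembers", -1)(grouped).map((d) => d._id));
// [ 'Lannister', 'Stark', null, 'Tully' ]
```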

A few more stages

There are several more stages which can come in handy at times.

One such stage is $limit. Let's assume our dataset is huge and, after grouping and sorting, the output has thousands of documents. In such cases, we usually don't want to work on all the documents, only on the first few. $limit comes in handy in such scenarios.

Let's modify our previous example to get the two houses with the highest number of members.

> db.people.aggregate([
      {$group: {_id: "$house", numMembers: {$count: {}}}},
      {$sort: {numMembers: -1}},
      {$limit: 2}
  ])

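Here, we added a third stage, $limit, to our aggregation pipeline. It passes only the first two documents coming out of $sort, so we get Lannister (4 members) and Stark (3 members).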
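$limit simply truncates the stream of documents. A hypothetical one-line model in plain JavaScript (limit is our own name, not a MongoDB API):

```javascript
// Hypothetical model of {$limit: n}: pass through only the first n documents.
const limit = (n) => (docs) => docs.slice(0, n);

// The output of the $sort stage from the previous query.
const sorted = [
  { _id: "Lannister", numMembers: 4 },
  { _id: "Stark", numMembers: 3 },
  { _id: null, numMembers: 1 },
  { _id: "Tully", numMembers: 1 },
];

console.log(limit(2)(sorted).map((d) => d._id)); // [ 'Lannister', 'Stark' ]
```

In MongoDB itself, when a $sort is immediately followed by a $limit, the optimizer can coalesce the two so that the sort only keeps the top n documents in memory instead of sorting everything first.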

Aggregation on embedded documents

Let's assume we have a collection called houses which looks like the following:

{ "_id" : ObjectId("61ca90df0ee3a7a24c8f101a"), "name" : "Stark", "sigil" : "Direwolf", "seat" : "Winterfell", "words" : "Winter is coming", "members" : [ { "name" : "Ned Stark", "age" : 40 }, { "name" : "Catelyn Stark", "age" : 38 } ] }
{ "_id" : ObjectId("61ca93840ee3a7a24c8f101b"), "name" : "Lannister", "sigil" : "Lion", "seat" : "Casterly Rock", "words" : "Hear me roar", "members" : [ { "name" : "Jamie Lannister", "age" : 36 }, { "name" : "Tyrion Lannister", "age" : 34 } ] }
{ "_id" : ObjectId("61ca94ba0ee3a7a24c8f101c"), "name" : "Baratheon", "sigil" : "Stag", "seat" : "Storm's End", "words" : "Ours is the fury", "members" : [ { "name" : "Robert Baratheon", "age" : 40 }, { "name" : "Renly Baratheon", "age" : 34 } ] }

As can be noticed, each house has an array field called members, and each entry of members is an embedded document.

We want to find the average age of the members of each house in this collection. For this we need the $unwind stage. $unwind takes the path to an array field and outputs one document per element of that array, with the array replaced by that single element.

Let's try it out:

> db.houses.aggregate( [ { $unwind : "$members" } ] )

This would output:

{ "_id" : ObjectId("61ca90df0ee3a7a24c8f101a"), "name" : "Stark", "sigil" : "Direwolf", "seat" : "Winterfell", "words" : "Winter is coming", "members" : { "name" : "Ned Stark", "age" : 40 } }
{ "_id" : ObjectId("61ca90df0ee3a7a24c8f101a"), "name" : "Stark", "sigil" : "Direwolf", "seat" : "Winterfell", "words" : "Winter is coming", "members" : { "name" : "Catelyn Stark", "age" : 38 } }
{ "_id" : ObjectId("61ca93840ee3a7a24c8f101b"), "name" : "Lannister", "sigil" : "Lion", "seat" : "Casterly Rock", "words" : "Hear me roar", "members" : { "name" : "Jamie Lannister", "age" : 36 } }
{ "_id" : ObjectId("61ca93840ee3a7a24c8f101b"), "name" : "Lannister", "sigil" : "Lion", "seat" : "Casterly Rock", "words" : "Hear me roar", "members" : { "name" : "Tyrion Lannister", "age" : 34 } }
{ "_id" : ObjectId("61ca94ba0ee3a7a24c8f101c"), "name" : "Baratheon", "sigil" : "Stag", "seat" : "Storm's End", "words" : "Ours is the fury", "members" : { "name" : "Robert Baratheon", "age" : 40 } }
{ "_id" : ObjectId("61ca94ba0ee3a7a24c8f101c"), "name" : "Baratheon", "sigil" : "Stag", "seat" : "Storm's End", "words" : "Ours is the fury", "members" : { "name" : "Renly Baratheon", "age" : 34 } }
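The behaviour of $unwind can be sketched in plain JavaScript as a flatMap over the documents. This hypothetical model (unwind is our own name, and it only handles a top-level field) follows the default $unwind rules: one document per array element, no output for a missing, null or empty array, and a non-array value treated as a one-element array.

```javascript
// Hypothetical model of {$unwind: "$<field>"} with default options.
function unwind(field) {
  return (docs) =>
    docs.flatMap((doc) => {
      const value = doc[field];
      // One output document per element (an empty array therefore yields nothing).
      if (Array.isArray(value)) return value.map((element) => ({ ...doc, [field]: element }));
      // Documents whose field is missing or null produce no output.
      if (value === undefined || value === null) return [];
      // A non-array value is treated like a single-element array.
      return [doc];
    });
}

const houses = [
  { name: "Stark", members: [{ name: "Ned Stark", age: 40 }, { name: "Catelyn Stark", age: 38 }] },
  { name: "Lannister", members: [{ name: "Jamie Lannister", age: 36 }, { name: "Tyrion Lannister", age: 34 }] },
  { name: "Baratheon", members: [{ name: "Robert Baratheon", age: 40 }, { name: "Renly Baratheon", age: 34 }] },
];

const unwound = unwind("members")(houses);
console.log(unwound.length); // 6
for (const doc of unwound) {
  console.log(doc.name, "->", doc.members.name);
}
```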

Each output document now contains exactly one member, so we can add a $group stage that groups by the house name and averages the members' ages. The embedded field is referenced with dot notation, $members.age.

> db.houses.aggregate( [ { $unwind : "$members" }, {$group: {_id: "$name", average: {$avg: "$members.age"}}} ] )

The output should look like:

{ "_id" : "Stark", "average" : 39 }
{ "_id" : "Lannister", "average" : 35 }
{ "_id" : "Baratheon", "average" : 37 }
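Putting both stages together, here is a hypothetical plain-JavaScript sketch of this pipeline. getPath is our own helper that resolves a dot-notation path such as "members.age", which is how the "$members.age" field path reaches into the embedded document. This is not MongoDB's implementation.

```javascript
// Hypothetical helper: resolve a dot-notation path such as "members.age".
function getPath(doc, path) {
  return path.split(".").reduce((value, key) => (value == null ? undefined : value[key]), doc);
}

const houses = [
  { name: "Stark", members: [{ name: "Ned Stark", age: 40 }, { name: "Catelyn Stark", age: 38 }] },
  { name: "Lannister", members: [{ name: "Jamie Lannister", age: 36 }, { name: "Tyrion Lannister", age: 34 }] },
  { name: "Baratheon", members: [{ name: "Robert Baratheon", age: 40 }, { name: "Renly Baratheon", age: 34 }] },
];

// Stage 1, like {$unwind: "$members"}: one document per member.
const unwound = houses.flatMap((house) =>
  house.members.map((member) => ({ ...house, members: member }))
);

// Stage 2, like {$group: {_id: "$name", average: {$avg: "$members.age"}}}.
const totals = new Map(); // house name -> running { sum, count }
for (const doc of unwound) {
  const acc = totals.get(doc.name) || { sum: 0, count: 0 };
  acc.sum += getPath(doc, "members.age");
  acc.count += 1;
  totals.set(doc.name, acc);
}
const averages = [...totals].map(([name, { sum, count }]) => ({ _id: name, average: sum / count }));

for (const g of averages) console.log(g._id, g.average);
// Stark 39
// Lannister 35
// Baratheon 37
```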

We hope this post clarified a few concepts around aggregation pipelines. Stay tuned for more posts on Mongo.