Mongo pipelines primer
Build a solid understanding of Mongo aggregation pipelines and pipeline stages.

Agenda
In this post, we attempt to understand the MongoDB aggregation pipeline.
An aggregation pipeline is MongoDB's main tool for aggregating data. Among other things, it lets us perform the Mongo equivalent of SQL's GROUP BY clause. We will see multiple examples of it.
We will write queries for the following kinds of scenarios:
- Group on a particular field
- Apply filter and group the result
- Group and apply ordering on the grouped result
- Group on embedded documents' fields
Setup
This post builds on our previous post. We will store GOT characters and their information in our Mongo database.
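Let's start the Mongo shell and connect to a running MongoDB server (newer MongoDB releases ship the mongosh shell instead of mongo).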
$ mongo
Let's create a collection called people.
> db.createCollection('people')
Let's insert some documents in this collection.
> db.people.insertOne({'name': 'Eddard Stark', 'house': 'Stark', 'location': 'Winterfell', 'age': 42})
> db.people.insertMany([{'name': 'Cat Stark', 'house': 'Tully', 'location': 'Winterfell', 'age': 40}, {'name': 'Arya Stark', 'house': 'Stark', 'location': 'Winterfell', 'age': 12}, {'name': 'Sansa Stark', 'house': 'Stark', 'location': 'Winterfell', 'age': 15}, {'name': 'Jon Snow', 'location': 'Winterfell', 'age': 18}])
> db.people.insertMany([{'name': 'Tywin Lannister', 'house': 'Lannister', 'location': 'Casterly Rock', 'age': 55}, {'name': 'Jamie Lannister', 'house': 'Lannister', 'location': 'Casterly Rock', 'age': 35}, {'name': 'Tyrion Lannister', 'house': 'Lannister', 'location': 'Casterly Rock', 'alias': 'Imp', 'age': 30}, {'name': 'Cersei Lannister', 'location': 'Casterly Rock', 'house': 'Lannister', 'age': 35}])
Aggregation basics
Aggregations are performed by calling the aggregate() method on a collection. aggregate() expects an array as its argument. Each entry of this array is an aggregation stage, so multiple stages can be used in a single aggregation.
A very naive example is passing an empty array to aggregate(). In that case, we aren't applying any aggregation stage to the collection.
> db.people.aggregate([])
This would return all the documents of the collection since we didn't pass any aggregation stage.
MongoDB supports many stages; a few of them are:
- $match
- $group
- $sort
- $limit
- $unwind
Let's see the usage of $match. This stage filters the documents.
> db.people.aggregate([{$match: {house: "Stark"}}])
The output would look like:
{ "_id" : ObjectId("61c33f5afa3c6ac0076acc0f"), "name" : "Eddard Stark", "house" : "Stark", "location" : "Winterfell", "age" : 42 }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc11"), "name" : "Arya Stark", "house" : "Stark", "location" : "Winterfell", "age" : 12 }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc12"), "name" : "Sansa Stark", "house" : "Stark", "location" : "Winterfell", "age" : 15 }
Here we passed a single aggregation stage to aggregate(), which is {$match: {house: "Stark"}}. $match is the stage name, and its operand is a query filter, here {house: "Stark"}. As is evident, the filter has the same structure as a document; $match uses the same query syntax as find().
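To make the filtering concrete, here is a plain-JavaScript sketch of what {$match: {house: "Stark"}} does to a list of documents. It runs in Node.js on in-memory objects, not against MongoDB; the match helper is hypothetical and only handles equality filters, whereas the real $match also understands query operators such as $gt and $in.

```javascript
// Hypothetical helper (not a MongoDB API): returns a stage function that keeps
// only the documents whose fields are equal to every value in the filter.
// Unlike the real $match, it supports plain equality only.
function match(filter) {
  return (docs) =>
    docs.filter((doc) =>
      Object.entries(filter).every(([field, value]) => doc[field] === value)
    );
}

// A small in-memory subset of the people collection.
const people = [
  { name: "Eddard Stark", house: "Stark", age: 42 },
  { name: "Cat Stark", house: "Tully", age: 40 },
  { name: "Arya Stark", house: "Stark", age: 12 },
  { name: "Jon Snow", age: 18 }, // no house field, so it never matches
];

const starks = match({ house: "Stark" })(people);
console.log(starks.map((doc) => doc.name)); // [ 'Eddard Stark', 'Arya Stark' ]
```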
There are two things worth mentioning about aggregation stages:
- Every stage produces a list of documents.
- The output of a stage becomes the input of the next stage.
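These two properties can be modelled in a few lines of plain JavaScript: if each stage is a function from an array of documents to an array of documents, running a pipeline is a left fold over the stages. This is a conceptual sketch with a hypothetical runPipeline helper and toy stages, not how the MongoDB server actually executes pipelines.

```javascript
// Hypothetical model: a stage is a function (docs) => docs.
// reduce() feeds the output of each stage into the next one.
function runPipeline(docs, stages) {
  return stages.reduce((input, stage) => stage(input), docs);
}

const people = [
  { name: "Eddard Stark", house: "Stark", age: 42 },
  { name: "Cat Stark", house: "Tully", age: 40 },
  { name: "Jon Snow", age: 18 },
];

// An empty pipeline hands back the input unchanged, like aggregate([]).
const everyone = runPipeline(people, []);

// Two toy stages: keep people older than 20, then keep only their names.
const names = runPipeline(people, [
  (docs) => docs.filter((doc) => doc.age > 20),
  (docs) => docs.map((doc) => ({ name: doc.name })),
]);
console.log(names);
```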
In our current example, applying the $match stage produced a list of documents.
Let's say we want to modify each output document and add a field called words to it. In such a case, we can pass a second stage to aggregate().
> db.people.aggregate([
{$match: {house: "Stark"}},
{$addFields: {words: "Winter is coming!"}}
])
This would output:
{ "_id" : ObjectId("61c33f5afa3c6ac0076acc0f"), "name" : "Eddard Stark", "house" : "Stark", "location" : "Winterfell", "age" : 42, "words" : "Winter is coming!" }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc11"), "name" : "Arya Stark", "house" : "Stark", "location" : "Winterfell", "age" : 12, "words" : "Winter is coming!" }
{ "_id" : ObjectId("61c34002fa3c6ac0076acc12"), "name" : "Sansa Stark", "house" : "Stark", "location" : "Winterfell", "age" : 15, "words" : "Winter is coming!" }
Hence, we passed two stages in this example. As can be noticed, the array passed to aggregate() has two entries, one for each stage. The first entry is {$match: {house: "Stark"}} and the second entry is {$addFields: {words: "Winter is coming!"}}.
The output of the first stage, $match, is passed as input to the second stage, $addFields.
The $addFields stage expects a document that describes the fields to be added and the value of each of those fields. In our example we added only a single field called words. We could have also added a second field, say sigil. In such a case, it would have looked like the following:
{$addFields: {words: "Winter is coming!", sigil: "Direwolf"}}
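Treating a stage as a function from documents to documents, $addFields with constant values behaves roughly like the plain-JavaScript sketch below. The addFields helper is hypothetical and handles literal values only; the real stage can also compute values from expressions over each document.

```javascript
// Hypothetical helper (not a MongoDB API): returns a stage function that
// copies each document and sets the given fields on the copy.
function addFields(fields) {
  // Object spread creates new objects, so the input documents are not mutated;
  // a field that already exists is overwritten, as $addFields also does.
  return (docs) => docs.map((doc) => ({ ...doc, ...fields }));
}

const starks = [
  { name: "Eddard Stark", house: "Stark" },
  { name: "Arya Stark", house: "Stark" },
];

const decorated = addFields({ words: "Winter is coming!", sigil: "Direwolf" })(starks);
console.log(decorated[1]);
```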
Find all distinct houses
Finding distinct houses requires the $group stage. $group expects its operand to be a document, and this document must have a field called _id. The value of _id specifies the field on which we want the grouping to be applied. The query would look like:
> db.people.aggregate([
{$group: {_id: "$house"}}
])
This would output:
{ "_id" : null }
{ "_id" : "Lannister" }
{ "_id" : "Stark" }
{ "_id" : "Tully" }
We want the grouping to be applied on the field house. The field name has to be prefixed with a $ to form a field path, which is why we have used $house instead of house.
Say each document had a field called region and we wanted the grouping to be applied on the region; then we would have used $region as the value of _id.
We are seeing an entry with _id null because the document for Jon Snow doesn't have a house field, and $group places documents with a missing grouping field in the null group.
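The grouping on _id can be mimicked in plain JavaScript by collecting the distinct values of a field and mapping a missing field to null, as $group does for Jon Snow. This is an illustrative sketch with a hypothetical distinctGroups helper; it ignores details such as how MongoDB compares values of different types.

```javascript
// Hypothetical sketch of {$group: {_id: "$house"}} on in-memory documents.
function distinctGroups(docs, field) {
  // `??` turns a missing (undefined) field into null, mirroring $group.
  const keys = new Set(docs.map((doc) => doc[field] ?? null));
  // One output document per distinct key, holding only _id.
  return [...keys].map((key) => ({ _id: key }));
}

const people = [
  { name: "Eddard Stark", house: "Stark" },
  { name: "Cat Stark", house: "Tully" },
  { name: "Arya Stark", house: "Stark" },
  { name: "Jon Snow" },
  { name: "Tywin Lannister", house: "Lannister" },
];

const distinctHouses = distinctGroups(people, "house");
console.log(distinctHouses);
```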
Find average age per house
This again requires the $group stage of aggregate(). $group also allows adding computed fields to the output documents. In the last example, the output documents only had _id because _id was the only field we specified in $group.
$group can also accept accumulator expressions. Let's understand them through a query that finds the average age per house. The query would look like:
> db.people.aggregate([
{$group: {_id: "$house", averageAge: {$avg: "$age"}}}
])
The output would look like:
{ "_id" : "Stark", "averageAge" : 23 }
{ "_id" : null, "averageAge" : 18 }
{ "_id" : "Tully", "averageAge" : 40 }
{ "_id" : "Lannister", "averageAge" : 38.75 }
As can be noticed from the query, we have added a computed field called averageAge in the $group stage and passed it the accumulator expression {$avg: "$age"}. An accumulator expression is a document with a single field, the accumulator operator, whose value is the operator's operand. We have used the accumulator operator $avg because we want an average. Similarly, Mongo provides other accumulator operators such as $max, $min, $sum and $count.
As we want the average to be computed on the field age, we have used $age as the operand of the accumulator operator $avg.
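The idea behind $avg, a running sum and count per group divided at the end, can be sketched in plain JavaScript with the people data from the Setup section. The averageAgeByHouse helper is hypothetical and assumes every document has a numeric age; the real $avg ignores non-numeric values, and this is not the server's implementation.

```javascript
// The people collection from the Setup section, reduced to the fields we need.
const people = [
  { name: "Eddard Stark", house: "Stark", age: 42 },
  { name: "Cat Stark", house: "Tully", age: 40 },
  { name: "Arya Stark", house: "Stark", age: 12 },
  { name: "Sansa Stark", house: "Stark", age: 15 },
  { name: "Jon Snow", age: 18 },
  { name: "Tywin Lannister", house: "Lannister", age: 55 },
  { name: "Jamie Lannister", house: "Lannister", age: 35 },
  { name: "Tyrion Lannister", house: "Lannister", age: 30 },
  { name: "Cersei Lannister", house: "Lannister", age: 35 },
];

// Hypothetical sketch of {$group: {_id: "$house", averageAge: {$avg: "$age"}}}:
// keep a running sum and count per group, then divide at the end.
function averageAgeByHouse(docs) {
  const groups = new Map();
  for (const doc of docs) {
    const key = doc.house ?? null; // missing house -> null group
    const acc = groups.get(key) ?? { sum: 0, count: 0 };
    acc.sum += doc.age;
    acc.count += 1;
    groups.set(key, acc);
  }
  return [...groups].map(([key, acc]) => ({ _id: key, averageAge: acc.sum / acc.count }));
}

const averages = averageAgeByHouse(people);
console.log(averages);
```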
Find number of members per house
We can use the $count accumulator to find the number of members per house ($count requires MongoDB 5.0 or later; on older versions {$sum: 1} gives the same count):
> db.people.aggregate([
{$group: {_id: "$house", numMembers: {$count: {}}}}
])
The output would look like:
{ "_id" : "Stark", "numMembers" : 3 }
{ "_id" : null, "numMembers" : 1 }
{ "_id" : "Tully", "numMembers" : 1 }
{ "_id" : "Lannister", "numMembers" : 4 }
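Counting per group is the same pattern as averaging, minus the sum. The plain-JavaScript sketch below uses a hypothetical countByHouse helper on in-memory documents that carry only the house field; it illustrates the semantics of {$count: {}} inside $group and is not MongoDB code.

```javascript
// Hypothetical sketch of {$group: {_id: "$house", numMembers: {$count: {}}}}:
// count how many documents fall into each group.
function countByHouse(docs) {
  const counts = new Map();
  for (const doc of docs) {
    const key = doc.house ?? null; // missing house -> null group
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }
  return [...counts].map(([key, n]) => ({ _id: key, numMembers: n }));
}

// Only the house field matters for counting.
const people = [
  { house: "Stark" }, { house: "Tully" }, { house: "Stark" }, { house: "Stark" },
  {}, // Jon Snow: no house
  { house: "Lannister" }, { house: "Lannister" }, { house: "Lannister" }, { house: "Lannister" },
];

const memberCounts = countByHouse(people);
console.log(memberCounts);
```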
Sorting the aggregated result
The last query gave us the number of members per house. We also want the result sorted in decreasing order, so that the house with the most members shows at the top.
> db.people.aggregate([
{$group: {_id: "$house", numMembers: {$count: {}}}},
{$sort: {numMembers: -1}}
])
We have modified the previous query and added a second stage, $sort, to the aggregation pipeline. The $group stage outputs documents that each have a numMembers field, so the next stage, $sort, can sort on this field. Hence we have passed {numMembers: -1}. We used -1 because we want the ordering to be descending (1 would sort in ascending order).
The output should look like the following (houses with the same numMembers, here null and Tully, may appear in either order):
{ "_id" : "Lannister", "numMembers" : 4 }
{ "_id" : "Stark", "numMembers" : 3 }
{ "_id" : null, "numMembers" : 1 }
{ "_id" : "Tully", "numMembers" : 1 }
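The meaning of the -1 direction can be seen in a plain-JavaScript sketch that sorts the $group output by numMembers. The sortBy helper is hypothetical and handles numeric fields only. Note one difference: JavaScript's Array.prototype.sort is stable, so ties keep their input order here, while MongoDB makes no such promise for documents with equal sort keys.

```javascript
// The $group output from the previous section.
const grouped = [
  { _id: "Stark", numMembers: 3 },
  { _id: null, numMembers: 1 },
  { _id: "Tully", numMembers: 1 },
  { _id: "Lannister", numMembers: 4 },
];

// Hypothetical sketch of {$sort: {field: direction}} for numeric fields:
// direction 1 sorts ascending, -1 descending.
function sortBy(field, direction) {
  // Copy first so the input array is left untouched; sort() sorts in place.
  return (docs) => [...docs].sort((a, b) => direction * (a[field] - b[field]));
}

const sorted = sortBy("numMembers", -1)(grouped);
console.log(sorted.map((doc) => doc._id));
```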
A few more stages
There are several more stages which can come in handy at times.
One such stage is $limit. Let's assume our dataset is huge and, after aggregation and ordering, the output has thousands of documents. In such cases, we may not want to work on all the documents and instead only need the first few. $limit comes in handy in such scenarios.
Let's modify our previous example to get the two houses with the highest number of members.
> db.people.aggregate([
{$group: {_id: "$house", numMembers: {$count: {}}}},
{$sort: {numMembers: -1}},
{$limit: 2}
])
Here, we added a third stage, $limit, to our aggregation pipeline.
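In the same in-memory terms, $limit simply keeps the first n documents of whatever the previous stage produced, which is why it is placed after $sort here. A minimal sketch with a hypothetical limit helper, not MongoDB code:

```javascript
// Hypothetical sketch of {$limit: n}: keep only the first n documents.
function limit(n) {
  return (docs) => docs.slice(0, n);
}

// Sorted $group output, highest numMembers first.
const sorted = [
  { _id: "Lannister", numMembers: 4 },
  { _id: "Stark", numMembers: 3 },
  { _id: null, numMembers: 1 },
  { _id: "Tully", numMembers: 1 },
];

const topTwo = limit(2)(sorted);
console.log(topTwo);
```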
Aggregation on embedded documents
Let's assume we have a collection called houses which looks like the following:
{ "_id" : ObjectId("61ca90df0ee3a7a24c8f101a"), "name" : "Stark", "sigil" : "Direwolf", "seat" : "Winterfell", "words" : "Winter is coming", "members" : [ { "name" : "Ned Stark", "age" : 40 }, { "name" : "Catelyn Stark", "age" : 38 } ] }
{ "_id" : ObjectId("61ca93840ee3a7a24c8f101b"), "name" : "Lannister", "sigil" : "Lion", "seat" : "Casterly Rock", "words" : "Hear me roar", "members" : [ { "name" : "Jamie Lannister", "age" : 36 }, { "name" : "Tyrion Lannister", "age" : 34 } ] }
{ "_id" : ObjectId("61ca94ba0ee3a7a24c8f101c"), "name" : "Baratheon", "sigil" : "Stag", "seat" : "Storm's End", "words" : "Ours is the fury", "members" : [ { "name" : "Robert Baratheon", "age" : 40 }, { "name" : "Renly Baratheon", "age" : 34 } ] }
As can be noticed, each house has a field called members, which is an array whose entries are embedded documents.
We want to find the average age of the members of each house in this collection. For this we need the $unwind stage. $unwind is passed the path to an array field, and for each input document it outputs one document per element of that array, with the array field replaced by that single element.
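Before running it against the houses collection, the behaviour of $unwind can be sketched in plain JavaScript. The unwind helper is hypothetical: it takes a top-level field name ("members") rather than a "$"-prefixed path, and it follows $unwind's default rules as described in the MongoDB documentation (missing, null or empty arrays produce no output; a non-array value is treated as a one-element array).

```javascript
// Hypothetical sketch of {$unwind: "$<field>"} on in-memory documents.
function unwind(field) {
  return (docs) =>
    docs.flatMap((doc) => {
      const value = doc[field];
      // Missing or null fields produce no output (the default behaviour).
      if (value === undefined || value === null) return [];
      // A non-array value is treated like a one-element array.
      if (!Array.isArray(value)) return [doc];
      // One copy of the document per element; an empty array yields nothing.
      return value.map((item) => ({ ...doc, [field]: item }));
    });
}

const houses = [
  { name: "Stark", members: [{ name: "Ned Stark", age: 40 }, { name: "Catelyn Stark", age: 38 }] },
  { name: "Lannister", members: [{ name: "Jamie Lannister", age: 36 }, { name: "Tyrion Lannister", age: 34 }] },
];

const unwound = unwind("members")(houses);
console.log(unwound.length); // 4
```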
Let's try it out:
> db.houses.aggregate( [ { $unwind : "$members" } ] )
This would output:
{ "_id" : ObjectId("61ca90df0ee3a7a24c8f101a"), "name" : "Stark", "sigil" : "Direwolf", "seat" : "Winterfell", "words" : "Winter is coming", "members" : { "name" : "Ned Stark", "age" : 40 } }
{ "_id" : ObjectId("61ca90df0ee3a7a24c8f101a"), "name" : "Stark", "sigil" : "Direwolf", "seat" : "Winterfell", "words" : "Winter is coming", "members" : { "name" : "Catelyn Stark", "age" : 38 } }
{ "_id" : ObjectId("61ca93840ee3a7a24c8f101b"), "name" : "Lannister", "sigil" : "Lion", "seat" : "Casterly Rock", "words" : "Hear me roar", "members" : { "name" : "Jamie Lannister", "age" : 36 } }
{ "_id" : ObjectId("61ca93840ee3a7a24c8f101b"), "name" : "Lannister", "sigil" : "Lion", "seat" : "Casterly Rock", "words" : "Hear me roar", "members" : { "name" : "Tyrion Lannister", "age" : 34 } }
{ "_id" : ObjectId("61ca94ba0ee3a7a24c8f101c"), "name" : "Baratheon", "sigil" : "Stag", "seat" : "Storm's End", "words" : "Ours is the fury", "members" : { "name" : "Robert Baratheon", "age" : 40 } }
{ "_id" : ObjectId("61ca94ba0ee3a7a24c8f101c"), "name" : "Baratheon", "sigil" : "Stag", "seat" : "Storm's End", "words" : "Ours is the fury", "members" : { "name" : "Renly Baratheon", "age" : 34 } }
Each output document now holds a single member in members, so we can add one more stage to the aggregation pipeline that groups by the house name and averages members.age for each house.
> db.houses.aggregate( [ { $unwind : "$members" }, {$group: {_id: "$name", average: {$avg: "$members.age"}}} ] )
The output should look like:
{ "_id" : "Stark", "average" : 39 }
{ "_id" : "Lannister", "average" : 35 }
{ "_id" : "Baratheon", "average" : 37 }
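For comparison, the two-stage pipeline above can be traced in plain JavaScript: flatMap plays the role of $unwind, and a Map of running sums plays the role of $group with $avg on "$members.age". This is an in-memory illustration under those assumptions, not MongoDB code.

```javascript
const houses = [
  { name: "Stark", members: [{ name: "Ned Stark", age: 40 }, { name: "Catelyn Stark", age: 38 }] },
  { name: "Lannister", members: [{ name: "Jamie Lannister", age: 36 }, { name: "Tyrion Lannister", age: 34 }] },
  { name: "Baratheon", members: [{ name: "Robert Baratheon", age: 40 }, { name: "Renly Baratheon", age: 34 }] },
];

// Stage 1, like {$unwind: "$members"}: one document per member.
const unwound = houses.flatMap((house) =>
  house.members.map((member) => ({ ...house, members: member }))
);

// Stage 2, like {$group: {_id: "$name", average: {$avg: "$members.age"}}}.
const totals = new Map();
for (const doc of unwound) {
  const acc = totals.get(doc.name) ?? { sum: 0, count: 0 };
  acc.sum += doc.members.age;
  acc.count += 1;
  totals.set(doc.name, acc);
}
const averages = [...totals].map(([name, acc]) => ({ _id: name, average: acc.sum / acc.count }));
console.log(averages);
```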
Hope this post clarifies a few concepts around aggregation pipelines. Stay tuned for more posts on Mongo.