MongoDB Aggregate Framework初探

MongoDB Aggregate Framework

MongoDB 2.1 多了新Feature - Aggregation Framework. 最近工作需要就稍微看了下,Mark之

Overview

Aggregation提供的功能map-reduce也能做(诸如统计平均值,求和等)。官方那个大胖子说这东西比map-reduce简单, map-reduce 我没用过, 不过从使用Aggregation的情况来看, 进行统计等操作还是蛮方便的。

总体而言,Aggregation就是类似Unix-like中的管道的概念,可以将很多数据流串起来,不同的数据处理阶段可以再上一个阶段的基础上再次加工。

Pipeline-Operator

比较常用的有:

$limit - 限制返回个数,你懂的
$group - 统计操作, 还提供了一系列子命令, $avg, $sum ….
$project - 可以重构数据
$skip - 你懂的
$unwind - 可以将一个包含数组的文档切分成多个, 比如你的文档有 中有个数组字段 A, A中有10个元素, 那么经过 $unwind处理后会产生10个文档,这些文档只有 字段 A不同
$sort - 排序
$match - 可以实现类似query的功能

Usage - Java

我在db中造了些数据(数据时随机生成的, 能用即可),没有建索引,文档结构如下:

Document结:
  {
   "_id":ObjectId("509944545"),
   "province":"海南",
   "age" : 21,
   "subjects":[
{
"name":"语文",
"score":53
},
{
"name":"数学",
"score":27
},
{
"name":"英语",
"score":35
}
 ],
   "name":"刘雨"
  }

接下来要实现两个功能:

  1. 统计上海学生平均年龄

  2. 统计每个省各科平均成绩

接下来一一道来

统计上海学生平均年龄

从这个需求来讲,要实现功能要有几个步骤: 1. 找出上海的学生. 2. 统计平均年龄 (当然也可以先算出所有省份的平均值再找出上海的)。如此思路也就清晰了

首先$match取出上海学生

{$match:{'province':'上海'}}

接下来$group统计平均年龄

{$group:{_id:’$province’,$avg:’$age’}}

$avg$group的子命令,用于求平均值,类似的还有 $sum, $max ….

上面两个命令等价于

select province, avg(age) 
from student 
where province = '上海'
group by province

下面是Java代码

Mongo m = new Mongo("localhost", 27017);
DB db = m.getDB("test");
DBCollection coll = db.getCollection("student");

/*创建 $match, 作用相当于query*/
DBObject match = new BasicDBObject("$match", new BasicDBObject("province", "上海"));

/* Group操作*/
DBObject groupFields = new BasicDBObject("_id", "$province");
groupFields.put("AvgAge", new BasicDBObject("$avg", "$age"));
DBObject group = new BasicDBObject("$group", groupFields);

/* 查看Group结果 */
AggregationOutput output = coll.aggregate(match, group); // 执行 aggregation命令
System.out.println(output.getCommandResult());

输出结果:

{ "serverUsed":"localhost/127.0.0.1:27017" ,        
  "result":[ 
{"_id":"上海", "AvgAge":32.09375}
],          
   "ok":1.0
 }

如此工程就结束了,再看另外一个需求

统计每个省各科平均成绩

首先更具数据库文档结构,subjects是数组形式,需要先开,然后再进行统计

主要处理步骤如下:

首先$unwind拆数组.

{$unwind:’$subjects’}

然后按照province, subject 分组并求各科目平均分

{$group:{
 _id:{
     subjname:"$subjects.name",   // 指定group字段之一 subjects.name, 并重命名为 subjname
     province:"$province"         // 指定group字段之一 province, 并重命名为 province(没变)
  },
 AvgScore:{
    $avg:"$subjects.score"        // 对 subjects.score 求平均
 }
 }

Java代码如下:
Mongo m = new Mongo(“localhost”, 27017);
DB db = m.getDB(“test”);
DBCollection coll = db.getCollection(“student”);

/* 创建 $unwind 操作, 用于切分数组*/
DBObject unwind = new BasicDBObject("$unwind", "$subjects");

/* Group操作*/
DBObject groupFields = new BasicDBObject("_id", new BasicDBObject("subjname", "$subjects.name").append("province", "$province"));
groupFields.put("AvgScore", new BasicDBObject("$avg", "$subjects.scores"));
DBObject group = new BasicDBObject("$group", groupFields);

/* 查看Group结果 */
AggregationOutput output = coll.aggregate(unwind, group);  // 执行 aggregation命令
System.out.println(output.getCommandResult());
  输出结果

{ "serverUsed":"localhost/127.0.0.1:27017" , 
"result":[ 
  { "_id":{"subjname":"英语", "province":"海南"}, "AvgScore":58.1} , 
  { "_id":{"subjname":"数学", "province":"海南"}, "AvgScore" : 60.485} ,
  { "_id":{"subjname":"语文", "province":"江西"}, "AvgScore" : 55.538} , 
  { "_id":{"subjname":"英语", "province":"上海"}, "AvgScore" : 57.65625} , 
  { "_id":{"subjname":"数学", "province":"广东"}, "AvgScore" : 56.690} , 
  { "_id":{"subjname":"数学", "province":"上海"}, "AvgScore" : 55.671875} ,
  { "_id":{"subjname":"语文", "province":"上海"}, "AvgScore" : 56.734375} , 
  { "_id":{"subjname":"英语", "province":"云南"}, "AvgScore" : 55.7301 } ,
  .
  .
  .
  .
 "ok" : 1.0

}

统计就此结束…. 稍等,似乎有点太粗糙了,虽然统计出来的,但是根本没法看,同一个省份的科目都不在一起. 囧, 接下来进行下加强.

支线任务: 将同一省份的科目成绩统计到一起


在这个步骤中我期望将结果展示位

'province':'xxxxx', avgscores:[ {'xxx':xxx}, ....] 这样的形式

要做的有一件事,在前面的统计结果的基础上,先用 $project 将平均分和成绩揉到一起,即形如下面的样子

{ "subjinfo" : { "subjname" : "英语" ,"AvgScores" : 58.1 } ,"province" : "海南" }

再按省份group, 将各科目的平均分push到一块, 命令如下:

首先 $project 重构group结果

{$project:{province:"$_id.province", subjinfo:{"subjname":"$_id.subjname", "avgscore":"$AvgScore"}}

使用 $group 再次分组

{$group:{_id:"$province", avginfo:{$push:"$subjinfo"}}}

Java 代码如下:
Mongo m = new Mongo(“localhost”, 27017);
DB db = m.getDB(“test”);
DBCollection coll = db.getCollection(“student”);

/* 创建 $unwind 操作, 用于切分数组*/
DBObject unwind = new BasicDBObject("$unwind", "$subjects");

/* Group操作*/
DBObject groupFields = new BasicDBObject("_id", new BasicDBObject("subjname", "$subjects.name").append("province", "$province"));
groupFields.put("AvgScore", new BasicDBObject("$avg", "$subjects.scores"));
DBObject group = new BasicDBObject("$group", groupFields);

/* Reshape Group Result*/
DBObject projectFields = new BasicDBObject();
projectFields.put("province", "$_id.province");
projectFields.put("subjinfo", new BasicDBObject("subjname","$_id.subjname").append("avgscore", "$AvgScore"));
DBObject project = new BasicDBObject("$project", projectFields);

/* 将结果push到一起*/
DBObject groupAgainFields = new BasicDBObject("_id", "$province");
groupAgainFields.put("avginfo", new BasicDBObject("$push", "$subjinfo"));
DBObject reshapeGroup = new BasicDBObject("$group", groupAgainFields);

/* 查看Group结果 */
AggregationOutput output = coll.aggregate(unwind, group, project, reshapeGroup);
System.out.println(output.getCommandResult());

结果如下:

{ "serverUsed" : "localhost/127.0.0.1:27017" , 
  "result" : [ 
   {"_id":"辽宁", "avginfo":[{"subjname":"数学", "avgscore":56.46666666666667}, {"subjname":"英语", "avgscore":52.093333333333334}, {"subjname":"语文", "avgscore":50.53333333333333}]} , 
   {"_id":"四川", "avginfo":[{"subjname":"数学", "avgscore":52.72727272727273}, {"subjname":"英语", "avgscore":55.90909090909091}, {"subjname":"语文", "avgscore":57.59090909090909}]} , 
   {"_id":"重庆", "avginfo":[{ "subjname":"语文", "avgscore":56.077922077922075}, { "subjname":"英语", "avgscore":54.84415584415584}, { "subjname":"数学", "avgscore":55.33766233766234}]} , 
   {"_id":"安徽", "avginfo":[{ "subjname":"英语", "avgscore":55.458333333333336}, { "subjname":"数学", "avgscore":54.47222222222222}, { "subjname":"语文", "avgscore":52.80555555555556}]} 
    .
    .
    .
   ] , "ok" : 1.0}

至此,功能也就完成了,呼

结语


Aggravation 这就介绍完了, 当然还有很多细节没说清楚,更多的资料可以参考MongoDB官方文档. 期待后期再深入挖掘其功能.