缘起: #
工作当中需要用了MongoDB作为数据库,python写的爬虫, 我们需要对爬取的数据做统计,想了想刚好 可以用MongoDB的MapReduce处理就可以,于是就研究了一下这个该如何使用,也翻阅了不少资料,但是在实际操作中还 是有不少出入的,所以写点总结,已方便日后翻阅
原理: #
所谓按图索骥,我们要知道MapReduce到底是解决什么样的问题,那么解决这个问题的方法有哪些,
有哪些缺点然后根据需求选择能用到的技术.我们的数据不多但也不少,如果全部放到程序层面做并行运算确实也没
那个必要,所以就使用了MongoDB的Mapreduce啦,其实MongoDB的Mapreduce有点类似于Mysql的group_by,就是
先map->映射,像分类,然后reduce统计计算,
实现: #
就像图中所示的这样,首先collections进入map,形成key-values的键值对,其中key就是分包的关键,你需要原始数据 种哪些数据为分包的关键值,比如id,部门号等等,
-
直接上代码:
var m=function(){ emit(this.key,this.values); }
其中key可以是map,values也一样,如:
var m=function(){
emit({reportDate:this.reportDate, symbol:this.symbol},
{totalChanges:this.change, totalOI:this.openInterest})
};
}
-
当数据分好后,就进入reduce做处理,这里就是做统计,计算,等等:
var r = function(key,values){ var res={totalChanges:0, totalOI:0} values.forEach(function(val){ res.totalChanges += val.totalChanges; res.totalOI += val.totalOI }); return res; } -
当我们数据处理好以后,就可以做运行
Command函数了db.runCommand({ mapreduce:"t",//这里是准备处理的程序 map:m,//map函数 reduce:r,//reduce函数 finalize:f,//最后处理的函数 out:"out_collections"//输出的集合名 } )
runCommand中的参数说明: #
- query : 文档,发往map函数前先给过渡文档]
- sort : 文档,发往map函数前先给文档排序]
- limit : 整数,发往map函数的文档数量上限]
- out : 字符串,统计结果保存的集合]
- keeptemp: 布尔值,链接关闭时临时结果集合是否保存]
- finalize : 函数,将reduce的结果送给这个函数,做最后的处理]
- scope : 文档,js代码中要用到的变量]
- jsMode : 布尔值,是否减少执行过程中BSON和JS的转换,默认true] //注:false时 BSON–>JS–>map–>BSON–>JS–>reduce–>BSON,可处理非常大的mapreduce,
//true时BSON–>js–>map–>reduce–>BSON - verbose : 布尔值,是否产生更加详细的服务器日志,默认true]
其中finalize函数会在执行完map、reduce后再对key和value进行一次计算并返回一个最终结果, 这是处理过程的最后一步,所以finalize就是一个计算平均数,剪裁数组,清除多余信息的恰当时机
整合进程序中: #
了解玩查询语句如何处理了,那么在程序中就很好解决了,在这里就不戏写了,我用官方文档的实例来说明:
-
定义map函数,Code接收一个字符串,字符串中为js代码,也就是MongoDB的’SQL’语句
mapper = Code(""" function () { this.tags.forEach(function(z) { emit(z, 1); }); } """) -
定义reduce函数:
reducer = Code(""" function (key, values) { var total = 0; for (var i = 0; i < values.length; i++) { total += values[i]; } return total; } """) -
传入map_reduce()方法中去:
result = db.things.map_reduce(mapper, reducer, "myresults")
拓展技巧: #
-
full_response=True传入map_redyce()中,返回完整的信息 -
limit query等参数效果等同于文中介绍的方法
-
例如query:
result = db.things.map_reduce(mapper, reducer, "myresults", query={"x": {"$lt": 2}}) -
inline_map_reduce(map, reduce, full_response=False, **kwargs)可以不保存到另一张表,而是返回成dict.
其他语言: #
我看了一眼其他语言的实现方法,基本大同小异,请参考该语言MongoDB驱动官方文档
结语: #
这个API的效率我不敢说,但感觉还是蛮快的,10万以上数据基本上毫秒单位就处理完了,MapReduce是大数据的基础数据结构, 我想其实就是通过这种数据结构服务器集群并行计算,最后统一归档.查询.