MongoDB: 10. MapReduce

2011-04-03 00:09
Using Map/Reduce on MongoDB to compute "statistics" in parallel is easy.
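db.runCommand(
{
mapreduce : <collection>,
map : <map function>,
reduce : <reduce function>
[, query : <query filter object>]
[, sort : <sort specification>]
[, limit : <number of documents to process>]
[, out : <output collection name>]
[, keeptemp : <true | false>]
[, finalize : <finalize function>]
[, scope : <object of variables for the JavaScript scope>]
[, verbose : true]
});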
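Parameters:
- mapreduce: the target collection to operate on.
- map: the mapping function (produces the sequence of key/value pairs that is fed to the reduce function).
- reduce: the aggregation function.
- query: filter for the target documents.
- sort: sort order for the target documents.
- limit: maximum number of target documents.
- out: the collection that stores the result (if omitted, a temporary collection is used and is deleted automatically when the client disconnects).
- keeptemp: whether to keep the temporary collection.
- finalize: final processing function (tidies up the value returned by reduce before it is stored in the result collection).
- scope: variables from outside to make available to map, reduce and finalize.
- verbose: show detailed timing statistics.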
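As a rough illustration of how these parameters fit together, the sketch below assembles a command document in plain JavaScript. The helper name buildMapReduceCommand is hypothetical and not part of MongoDB; it only shows which keys are required and which are optional, with "mapreduce" kept as the first key because runCommand identifies a command by its first key.

```javascript
// Hypothetical helper (not a MongoDB API): builds the document that would be
// passed to db.runCommand(), omitting any option that was not supplied.
function buildMapReduceCommand(collection, map, reduce, options) {
  // "mapreduce" goes first: the first key of a command document names the command.
  const cmd = { mapreduce: collection, map: map, reduce: reduce };
  const optional = ["query", "sort", "limit", "out", "keeptemp",
                    "finalize", "scope", "verbose"];
  for (const name of optional) {
    if (options && options[name] !== undefined) {
      cmd[name] = options[name];
    }
  }
  return cmd;
}

// emit() is only called on the server, so defining these functions here is safe.
const mapFn = function () { emit(this.age, 1); };
const reduceFn = function (key, values) {
  let x = 0;
  values.forEach(function (v) { x += v; });
  return x;
};

const cmd = buildMapReduceCommand("users", mapFn, reduceFn, {
  query: { age: { $lt: 10 } },
  limit: 5,
  verbose: true
});
console.log(Object.keys(cmd));
```

In the mongo shell, the resulting object could then be handed to db.runCommand(cmd).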
A few passages in the official documentation are worth noting:
map/reduce is invoked via a database.  The database creates a temporary collection to hold output of the operation. The collection is cleaned up when the client connection closes, or when explicitly dropped. Alternatively, one can specify a permanent output collection name. map and reduce functions are written in JavaScript and execute on the server.
In sharded environments, data processing of map/reduce operations runs in parallel on all shards.
MapReduce jobs on a single mongod process are single threaded. This is due to a design limitation in current JavaScript engines. We are looking into alternatives to solve this issue, but for now if you want to parallelize your MapReduce jobs, you will need to either use sharding or do the aggregation client-side in your code.
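The client-side alternative mentioned in the last passage can be sketched as follows. This is only an in-memory illustration in plain JavaScript, not the way MongoDB itself distributes work: the function names countByAge and mergeCounts are made up for this example, and the slices are processed one after another where real code would hand each slice to a separate worker. The documents have the same shape as the sample data used in this article.

```javascript
// Count users per age within one slice of the data (stand-in for one worker).
function countByAge(docs) {
  const partial = {};
  for (const doc of docs) {
    partial[doc.age] = (partial[doc.age] || 0) + 1;
  }
  return partial;
}

// Merge the partial results. This is valid because addition is associative
// and commutative, so the order in which slices are combined does not matter.
function mergeCounts(partials) {
  const total = {};
  for (const partial of partials) {
    for (const age of Object.keys(partial)) {
      total[age] = (total[age] || 0) + partial[age];
    }
  }
  return total;
}

// 1000 documents: name "user<i>", age cycling through 1..40, sex alternating.
const users = [];
for (let i = 0; i < 1000; i++) {
  users.push({ name: "user" + i, age: i % 40 + 1, sex: i % 2 });
}

// Split into four slices of 250 documents and aggregate each independently.
const sliceSize = 250;
const partials = [];
for (let start = 0; start < users.length; start += sliceSize) {
  partials.push(countByAge(users.slice(start, start + sliceSize)));
}

const totals = mergeCounts(partials);
console.log(totals[1], Object.keys(totals).length); // 25 users aged 1, 40 distinct ages
```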
First, prepare some simple data to practice on.
> for (var i = 0; i < 1000; i++) {
... var u = { name : "user" + i, age : i % 40 + 1, sex : i % 2 };
... db.users.insert(u);
... }
> db.users.ensureIndex({name:1})
> db.users.ensureIndex({age:1})
> db.users.count()
1000
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "name" : "user0", "age" : 1, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "name" : "user1", "age" : 2, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "name" : "user2", "age" : 3, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "name" : "user3", "age" : 4, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "name" : "user4", "age" : 5, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "name" : "user5", "age" : 6, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "name" : "user6", "age" : 7, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "name" : "user7", "age" : 8, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "name" : "user8", "age" : 9, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "name" : "user9", "age" : 10, "sex" : 1 }
1. Map
The map function must call emit(key, value) to return key/value pairs, and uses this to access the document currently being processed.
> m = function() { emit(this.age, 1) }
function () {
emit(this.age, 1);
}
The value can also be a JSON object, which allows several attributes to be passed at once.
For example:
emit(this.age, {count:1})
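To make the map phase concrete, here is a minimal in-memory sketch in plain JavaScript. The runMap function is a hypothetical stand-in for what the server does: it calls the map function once per document with this bound to that document, and records every emit() call. Installing emit as a temporary global is an assumption made only so that the map function can be written exactly as it would be for MongoDB.

```javascript
// Hypothetical stand-in for the server's map phase (not a MongoDB API).
function runMap(docs, mapFn) {
  const emitted = [];
  const previous = globalThis.emit;
  // Temporarily provide a global emit() that records each key/value pair.
  globalThis.emit = function (key, value) {
    emitted.push({ key: key, value: value });
  };
  try {
    for (const doc of docs) {
      mapFn.call(doc); // `this` inside mapFn is the current document
    }
  } finally {
    globalThis.emit = previous;
  }
  return emitted;
}

const docs = [
  { name: "user0", age: 1, sex: 0 },
  { name: "user1", age: 2, sex: 1 },
  { name: "user40", age: 1, sex: 0 }
];

// Same form as the map function above, emitting an object value.
const m = function () { emit(this.age, { count: 1 }); };

const pairs = runMap(docs, m);
console.log(JSON.stringify(pairs));
// [{"key":1,"value":{"count":1}},{"key":2,"value":{"count":1}},{"key":1,"value":{"count":1}}]
```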
2. Reduce
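The arguments received by the reduce function resemble the result of a group operation: the key/value sequence returned by map is combined into { key, [value1, value2, value3, value...] } and passed to reduce.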
> r = function(key, values) {
... var x = 0;
... values.forEach(function(v) { x += v });
... return x;
... }
function (key, values) {
var x = 0;
values.forEach(function (v) {x += v;});
return x;
}
The reduce function performs the "statistics" over these values; its return value may be a JSON object.
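The grouping and reduce steps can be sketched in memory as follows. The groupAndReduce function is a hypothetical stand-in for the server, written in plain JavaScript. The example uses object values of the form { count: n } and has reduce return the same shape it receives; this matters because MongoDB may call reduce more than once for the same key, feeding an earlier result back in as one of the values.

```javascript
// Hypothetical stand-in for the grouping step (not a MongoDB API): collects
// the emitted values per key, then calls reduceFn(key, values) once per key.
function groupAndReduce(pairs, reduceFn) {
  const groups = new Map();
  for (const pair of pairs) {
    if (!groups.has(pair.key)) {
      groups.set(pair.key, []);
    }
    groups.get(pair.key).push(pair.value);
  }
  const results = [];
  for (const [key, values] of groups) {
    results.push({ _id: key, value: reduceFn(key, values) });
  }
  return results;
}

// Reduce over object values; the return value has the same shape as the inputs.
const r = function (key, values) {
  let total = 0;
  values.forEach(function (v) { total += v.count; });
  return { count: total };
};

const pairs = [
  { key: 1, value: { count: 1 } },
  { key: 2, value: { count: 1 } },
  { key: 1, value: { count: 1 } }
];

const reduced = groupAndReduce(pairs, r);
console.log(JSON.stringify(reduced));
// [{"_id":1,"value":{"count":2}},{"_id":2,"value":{"count":1}}]

// Re-reduce: an earlier result combined with a newly emitted value.
const again = r(1, [reduced[0].value, { count: 1 }]);
console.log(again.count); // 3
```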
3. Result
We do not have to use runCommand; db.<collection>.mapReduce() is more convenient.
> res = db.users.mapReduce(m, r)
{
"result" : "tmp.mr.mapreduce_1284097299_10",
"timeMillis" : 156,
"counts" : {
"input" : 1000,
"emit" : 1000,
"output" : 40
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : 25 }
{ "_id" : 2, "value" : 25 }
{ "_id" : 3, "value" : 25 }
{ "_id" : 4, "value" : 25 }
{ "_id" : 5, "value" : 25 }
{ "_id" : 6, "value" : 25 }
{ "_id" : 7, "value" : 25 }
{ "_id" : 8, "value" : 25 }
{ "_id" : 9, "value" : 25 }
{ "_id" : 10, "value" : 25 }
{ "_id" : 11, "value" : 25 }
{ "_id" : 12, "value" : 25 }
{ "_id" : 13, "value" : 25 }
{ "_id" : 14, "value" : 25 }
{ "_id" : 15, "value" : 25 }
{ "_id" : 16, "value" : 25 }
{ "_id" : 17, "value" : 25 }
{ "_id" : 18, "value" : 25 }
{ "_id" : 19, "value" : 25 }
{ "_id" : 20, "value" : 25 }
has more
mapReduce() stores the result in the temporary collection "tmp.mr.mapreduce_1284097299_10".
4. Finalize
With finalize() we can post-process the result of reduce().
> f = function(key, value) { return {age:key, count:value}; }
function (key, value) {
return {age:key, count:value};
}
> res = db.users.mapReduce(m, r, {finalize:f})
{
"result" : "tmp.mr.mapreduce_1284098036_26",
"timeMillis" : 158,
"counts" : {
"input" : 1000,
"emit" : 1000,
"output" : 40
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : { "age" : 1, "count" : 25 } }
{ "_id" : 2, "value" : { "age" : 2, "count" : 25 } }
{ "_id" : 3, "value" : { "age" : 3, "count" : 25 } }
{ "_id" : 4, "value" : { "age" : 4, "count" : 25 } }
{ "_id" : 5, "value" : { "age" : 5, "count" : 25 } }
{ "_id" : 6, "value" : { "age" : 6, "count" : 25 } }
{ "_id" : 7, "value" : { "age" : 7, "count" : 25 } }
{ "_id" : 8, "value" : { "age" : 8, "count" : 25 } }
{ "_id" : 9, "value" : { "age" : 9, "count" : 25 } }
{ "_id" : 10, "value" : { "age" : 10, "count" : 25 } }
{ "_id" : 11, "value" : { "age" : 11, "count" : 25 } }
{ "_id" : 12, "value" : { "age" : 12, "count" : 25 } }
{ "_id" : 13, "value" : { "age" : 13, "count" : 25 } }
{ "_id" : 14, "value" : { "age" : 14, "count" : 25 } }
{ "_id" : 15, "value" : { "age" : 15, "count" : 25 } }
{ "_id" : 16, "value" : { "age" : 16, "count" : 25 } }
{ "_id" : 17, "value" : { "age" : 17, "count" : 25 } }
{ "_id" : 18, "value" : { "age" : 18, "count" : 25 } }
{ "_id" : 19, "value" : { "age" : 19, "count" : 25 } }
{ "_id" : 20, "value" : { "age" : 20, "count" : 25 } }
has more
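The effect of finalize can be reproduced in memory with a few lines of plain JavaScript. The applyFinalize function is hypothetical and only mimics what the output above suggests: finalize is called once per key with the reduced value, and its return value replaces the stored value while _id stays unchanged.

```javascript
// Two rows as produced by reduce in the earlier example.
const reduced = [
  { _id: 1, value: 25 },
  { _id: 2, value: 25 }
];

// Same finalize function as in the shell session above.
const f = function (key, value) { return { age: key, count: value }; };

// Hypothetical stand-in for the server's finalize step (not a MongoDB API).
function applyFinalize(results, finalizeFn) {
  return results.map(function (row) {
    return { _id: row._id, value: finalizeFn(row._id, row.value) };
  });
}

const finalized = applyFinalize(reduced, f);
console.log(JSON.stringify(finalized));
// [{"_id":1,"value":{"age":1,"count":25}},{"_id":2,"value":{"age":2,"count":25}}]
```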
5. Options
We can also add more options to control the details.
> res = db.users.mapReduce(m, r, {query:{age:{$lt:10}}, sort:{name:1}, limit:5})
{
"result" : "tmp.mr.mapreduce_1284097888_25",
"timeMillis" : 20,
"counts" : {
"input" : 5,
"emit" : 5,
"output" : 3
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : 2 }
{ "_id" : 2, "value" : 2 }
{ "_id" : 3, "value" : 1 }
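The result above may look surprising until the order of operations is spelled out: the query filter is applied first, then the sort, then the limit, and only the remaining five documents reach map. The sketch below replays this in plain JavaScript on data generated the same way as the sample collection. It assumes that names are compared as plain strings, character by character, which is why "user120" sorts before "user2".

```javascript
// Rebuild the sample data: name "user<i>", age cycling through 1..40.
const users = [];
for (let i = 0; i < 1000; i++) {
  users.push({ name: "user" + i, age: i % 40 + 1, sex: i % 2 });
}

const selected = users
  // query: { age: { $lt: 10 } }
  .filter(function (u) { return u.age < 10; })
  // sort: { name: 1 }, assumed here to be a plain string comparison
  .sort(function (a, b) {
    if (a.name < b.name) return -1;
    if (a.name > b.name) return 1;
    return 0;
  })
  // limit: 5
  .slice(0, 5);

// Map and reduce collapsed into a simple per-age count.
const counts = {};
for (const u of selected) {
  counts[u.age] = (counts[u.age] || 0) + 1;
}

console.log(selected.map(function (u) { return u.name; }));
// [ 'user0', 'user1', 'user120', 'user121', 'user122' ]
console.log(counts);
// { '1': 2, '2': 2, '3': 1 }
```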
6. Example
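MapReduce is not limited to "statistics": we can also use this server-side execution mechanism to modify data in batches. Note that the example relies on reduce having side effects and calling db from inside it; the MongoDB documentation advises against reduce functions that access the database, so whether it works depends on the server version.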
> m = function() { emit(this._id, this) }
function () {
emit(this._id, this);
}
> r = function(key, values) {
... update = function(v) {
...     db.users.update({_id:key}, {$inc:{age:1}}, false, false);
... }
... values.forEach(update);
... return key;
... }
function (key, values) {
update = function (v) {db.users.update({_id:key}, {$inc:{age:1}}, false, false);};
values.forEach(update);
return key;
}
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "name" : "user0", "age" : 1, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "name" : "user1", "age" : 2, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "name" : "user2", "age" : 3, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "name" : "user3", "age" : 4, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "name" : "user4", "age" : 5, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "name" : "user5", "age" : 6, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "name" : "user6", "age" : 7, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "name" : "user7", "age" : 8, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "name" : "user8", "age" : 9, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "name" : "user9", "age" : 10, "sex" : 1 }
> res = db.users.mapReduce(m, r, {limit:10})
{
"result" : "tmp.mr.mapreduce_1284098486_27",
"timeMillis" : 28,
"counts" : {
"input" : 10,
"emit" : 10,
"output" : 10
},
"ok" : 1,
}
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "age" : 2, "name" : "user0", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "age" : 3, "name" : "user1", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "age" : 4, "name" : "user2", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "age" : 5, "name" : "user3", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "age" : 6, "name" : "user4", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "age" : 7, "name" : "user5", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "age" : 8, "name" : "user6", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "age" : 9, "name" : "user7", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "age" : 10, "name" : "user8", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "age" : 11, "name" : "user9", "sex" : 1 }
7. PyMongo
Finally, of course, we should call it from Python as well.
In [1]: from pymongo import *
In [2]: conn = Connection()
In [3]: db = conn.test
In [4]: m = "function() { emit(this.age, 1); }"
In [5]: r = "function(key, values) { var x = 0; values.forEach(function(v){ x += v }); return x; }"
In [6]: res = db.users.map_reduce(m, r, True)
In [7]: for k in db[res["result"]].find(): print k
....:
{u'_id': 1.0, u'value': 24.0}
{u'_id': 2.0, u'value': 25.0}
{u'_id': 3.0, u'value': 25.0}
{u'_id': 4.0, u'value': 25.0}
{u'_id': 5.0, u'value': 25.0}
{u'_id': 6.0, u'value': 25.0}
{u'_id': 7.0, u'value': 25.0}
{u'_id': 8.0, u'value': 25.0}
{u'_id': 9.0, u'value': 25.0}
{u'_id': 10.0, u'value': 25.0}
{u'_id': 11.0, u'value': 26.0}
{u'_id': 12.0, u'value': 25.0}
{u'_id': 13.0, u'value': 25.0}
{u'_id': 14.0, u'value': 25.0}
{u'_id': 15.0, u'value': 25.0}
{u'_id': 16.0, u'value': 25.0}
{u'_id': 17.0, u'value': 25.0}
{u'_id': 18.0, u'value': 25.0}
{u'_id': 19.0, u'value': 25.0}
{u'_id': 20.0, u'value': 25.0}
{u'_id': 21.0, u'value': 25.0}
{u'_id': 22.0, u'value': 25.0}
{u'_id': 23.0, u'value': 25.0}
{u'_id': 24.0, u'value': 25.0}
{u'_id': 25.0, u'value': 25.0}
{u'_id': 26.0, u'value': 25.0}
{u'_id': 27.0, u'value': 25.0}
{u'_id': 28.0, u'value': 25.0}
{u'_id': 29.0, u'value': 25.0}
{u'_id': 30.0, u'value': 25.0}
{u'_id': 31.0, u'value': 25.0}
{u'_id': 32.0, u'value': 25.0}
{u'_id': 33.0, u'value': 25.0}
{u'_id': 34.0, u'value': 25.0}
{u'_id': 35.0, u'value': 25.0}
{u'_id': 36.0, u'value': 25.0}
{u'_id': 37.0, u'value': 25.0}
{u'_id': 38.0, u'value': 25.0}
{u'_id': 39.0, u'value': 25.0}
{u'_id': 40.0, u'value': 25.0}
Passing additional parameters is just as easy.
In [10]: res = db.users.map_reduce(m, r, True, limit=10)
In [11]: res
Out[11]:
{u'counts': {u'emit': 10, u'input': 10, u'output': 10},
u'ok': 1.0,
u'result': u'tmp.mr.mapreduce_1284099468_31',
u'timeMillis': 20}
In [12]: for k in db[res["result"]].find(): print k
....:
{u'_id': 2.0, u'value': 1.0}
{u'_id': 3.0, u'value': 1.0}
{u'_id': 4.0, u'value': 1.0}
{u'_id': 5.0, u'value': 1.0}
{u'_id': 6.0, u'value': 1.0}
{u'_id': 7.0, u'value': 1.0}
{u'_id': 8.0, u'value': 1.0}
{u'_id': 9.0, u'value': 1.0}
{u'_id': 10.0, u'value': 1.0}
{u'_id': 11.0, u'value': 1.0}
In [13]: res = db.users.map_reduce(m, r, True, query={"age":{"$lt":20}})
In [14]: res
Out[14]:
{u'counts': {u'emit': 475, u'input': 475, u'output': 19},
u'ok': 1.0,
u'result': u'tmp.mr.mapreduce_1284099533_33',
u'timeMillis': 77}
In [15]: for k in db[res["result"]].find(): print k
....:
{u'_id': 1.0, u'value': 24.0}
{u'_id': 2.0, u'value': 25.0}
{u'_id': 3.0, u'value': 25.0}
{u'_id': 4.0, u'value': 25.0}
{u'_id': 5.0, u'value': 25.0}
{u'_id': 6.0, u'value': 25.0}
{u'_id': 7.0, u'value': 25.0}
{u'_id': 8.0, u'value': 25.0}
{u'_id': 9.0, u'value': 25.0}
{u'_id': 10.0, u'value': 25.0}
{u'_id': 11.0, u'value': 26.0}
{u'_id': 12.0, u'value': 25.0}
{u'_id': 13.0, u'value': 25.0}
{u'_id': 14.0, u'value': 25.0}
{u'_id': 15.0, u'value': 25.0}
{u'_id': 16.0, u'value': 25.0}
{u'_id': 17.0, u'value': 25.0}
{u'_id': 18.0, u'value': 25.0}
{u'_id': 19.0, u'value': 25.0}
For more details, see the official documentation.
http://blog.nosqlfan.com/html/617.html