摘要

   MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 Map 函数处理一个基于 key/value pair 的数据集合,输出中间的基于 key/value pair 的数据集合;然后再创建一个 Reduce 函数用来合并所有的具有相同中间 key 值的中间 value 值。现实世界中有很多满足上述处理模型的例子,本论文将详细描述这个模型。
   MapReduce 架构的程序能够在大量的普通配置的计算机上实现并行化处理。这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度,集群中计算机的错误处理,管理集群中计算机之间必要的通信。采用 MapReduce 架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。
  我们的 MapReduce 实现运行在规模可以灵活调整的由普通机器组成的集群上:一个典型的 MapReduce计算往往由几千台机器组成、处理以 TB 计算的数据。程序员发现这个系统非常好用:已经实现了数以百计的 MapReduce 程序,在 Google 的集群上,每天都有 1000 多个 MapReduce 程序在执行。

1 介绍

  在过去的 5 年里,包括本文作者在内的 Google 的很多程序员,为了处理海量的原始数据,已经实现了数以百计的、专用的计算方法。这些计算方法用来处理大量的原始数据,比如,文档抓取(类似网络爬虫的程序)、Web 请求日志等等;也为了计算处理各种类型的衍生数据,比如倒排索引、Web 文档的图结构的各种表示形势、每台主机上网络爬虫抓取的页面数量的汇总、每天被请求的最多的查询的集合等等。大多数这样的数据处理运算在概念上很容易理解。然而由于输入的数据量巨大,因此要想在可接受的时间内完成运算,只有将这些计算分布在成百上千的主机上。如何处理并行计算、如何分发数据、如何处理错误?所有这些问题综合在一起,需要大量的代码处理,因此也使得原本简单的运算变得难以处理。
  为了解决上述复杂的问题,我们设计一个新的抽象模型,使用这个抽象模型,我们只要表述我们想要执行的简单运算即可,而不必关心并行计算、容错、数据分布、负载均衡等复杂的细节,这些问题都被封装在了一个库里面。设计这个抽象模型的灵感来自 Lisp 和许多其他函数式语言的 Map 和 Reduce 的原语。我们意识到我们大多数的运算都包含这样的操作:在输入数据的“逻辑”记录上应用 Map 操作得出一个中间 key/value pair 集合,然后在所有具有相同 key 值的 value 值上应用 Reduce 操作,从而达到合并中间的数据,得到一个想要的结果的目的。使用 MapReduce 模型,再结合用户实现的 Map 和 Reduce 函数,我们就可以非常容易的实现大规模并行化计算;通过 MapReduce 模型自带的“再次执行”(re-execution)功能,也提供了初级的容灾实现方案。
  这个工作(实现一个 MapReduce 框架模型)的主要贡献是通过简单的接口来实现自动的并行化和大规模的分布式计,通过使用 MapReduce 模型接口实现在大量普通的 PC 机上高性能计算。
  第二部分描述基本的编程模型和一些使用案例。
  第三部分描述了一个经过裁剪的、适合我们的基于集群的计算环境的 MapReduce 实现。
  第四部分描述我们认为在 MapReduce 编程模型中一些实用的技巧。
  第五部分对于各种不同的任务,测量我们 MapReduce 实现的性能。
  第六部分揭示了在 Google 内部如何使用 MapReduce 作为基础重写我们的索引系统产品,包括其它一些使用 MapReduce 的经验。
  第七部分讨论相关的和未来的工作。

2 编程模型

  MapReduce 编程模型的原理是:利用一个输入 key/value pair 集合来产生一个输出的 key/value pair 集合。MapReduce 库的用户用两个函数表达这个计算:Map 和 Reduce。
  用户自定义的 Map 函数接受一个输入的 key/value pair 值,然后产生一个中间 key/value pair 值的集合。
MapReduce 库把所有具有相同中间 key 值 I 的中间 value 值集合在一起后传递给 reduce 函数。
  用户自定义的 Reduce 函数接受一个中间 key 的值 I 和相关的一个 value 值的集合。Reduce 函数合并这些
value 值,形成一个较小的 value 值的集合。一般的,每次 Reduce 函数调用只产生 0 或 1 个输出 value 值。通常我们通过一个迭代器把中间 value 值提供给 Reduce 函数,这样我们就可以处理无法全部放入内存中的大量的 value 值的集合。

2.1 例子

例如,计算一个大的文档集合中每个单词出现的次数,下面是伪代码段:

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, “1″);
reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));

  Map 函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是 1)。Reduce 函数把 Map函数产生的每一个特定的词的计数累加起来。
  另外,用户编写代码,使用输入和输出文件的名字、可选的调节参数来完成一个符合 MapReduce 模型规范的对象,然后调用 MapReduce 函数,并把这个规范对象传递给它。用户的代码和 MapReduce 库链接在一起(用 C++实现)。附录 A 包含了这个实例的全部程序代码。

2.2 类型

  尽管在前面例子的伪代码中使用了以字符串表示的输入输出值,但是在概念上,用户定义的Map和Reduce函数都有相关联的类型:

map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)

  比如,输入的 key 和 value 值与输出的 key 和 value 值在类型上推导的域不同。此外,中间 key 和 value值与输出 key 和 value 值在类型上推导的域相同。
  我们的 C++中使用字符串类型作为用户自定义函数的输入输出,用户在自己的代码中对字符串进行适当的类型转换。

2.3 更多的例子

  这里还有一些有趣的简单例子,可以很容易的使用 MapReduce 模型来表示:
  分布式的 Grep:Map 函数输出匹配某个模式的一行,Reduce 函数是一个恒等函数,即把中间数据复制到输出。
  计算 URL 访问频率:Map 函数处理日志中 web 页面请求的记录,然后输出(URL,1)。Reduce 函数把相同URL 的 value 值都累加起来,产生(URL,记录总数)结果。
  倒转网络链接图:Map 函数在源页面(source)中搜索所有的链接目标(target)并输出为(target,source)。Reduce 函数把给定链接目标(target)的链接组合成一个列表,输出(target,list(source))。
  每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。Map 函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的 URL。Reduce 函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。
  倒排索引:Map 函数分析每个文档输出一个(词,文档号)的列表,Reduce 函数的输入是一个给定词的所有(词,文档号),排序所有的文档号,输出(词,list(文档号))。所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。
  分布式排序:Map 函数从每个记录提取 key,输出(key,record)。Reduce 函数不改变任何的值。这个运算依赖分区机制(在 4.1 描述)和排序属性(在 4.2 描述)。

3 实现

  MapReduce 模型可以有多种不同的实现方式。如何正确选择取决于具体的环境。例如,一种实现方式适用于小型的共享内存方式的机器,另外一种实现方式则适用于大型 NUMA 架构的多处理器的主机,而有的实现方式更适合大型的网络连接集群。
  本章节描述一个适用于 Google 内部广泛使用的运算环境的实现:用以太网交换机连接、由普通 PC 机组成的大型集群。在我们的环境里包括:

  1. x86 架构、运行 Linux 操作系统、双处理器、2-4GB 内存的机器。
  2. 普通的网络硬件设备,每个机器的带宽为百兆或者千兆,但是远小于网络的平均带宽的一半。
  3. 集群中包含成百上千的机器,因此,机器故障是常态。
  4. 存储为廉价的内置 IDE 硬盘。一个内部分布式文件系统用来管理存储在这些磁盘上的数据。文件系统通过数据复制来在不可靠的硬件上保证数据的可靠性和有效性。
  5. 用户提交工作(job)给调度系统。每个工作(job)都包含一系列的任务(task),调度系统将这些任务调度到集群中多台可用的机器上。

3.1 执行概括

  通过将 Map 调用的输入数据自动分割为 M 个数据片段的集合,Map 调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将 Map 调用产生的中间 key 值分成 R 个不同分区(例如,hash(key) mod R),Reduce 调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。
图 1

图 1

  图 1 展示了我们的 MapReduce 实现中操作的全部流程。当用户调用 MapReduce 函数时,将发生下面的一系列动作(下面的序号和图 1 中的序号一一对应):

  1. 用户程序首先调用的 MapReduce 库将输入文件分成 M 个数据片度,每个数据片段的大小一般从16MB 到 64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
  2. 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配任务。有 M 个 Map 任务和 R 个 Reduce 任务将被分配,master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
  3. 被分配了 map 任务的 worker 程序读取相关的输入数据片段,从输入的数据片段中解析出 key/value pair,然后把 key/value pair 传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间 key/value pair,并缓存在内存中。
  4. 缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair 在本地磁盘上的存储位置将被回传给 master,由 master 负责把这些存储位置再传送给Reduce worker。
  5. 当 Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使用 RPC 从 Map worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
  6. Reduce worker 程序遍历排序后的中间数据,对于每一个唯一的中间 key 值,Reduce worker 程序将这个 key 值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。
  7. 当所有的 Map 和 Reduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对MapReduce 调用才返回。

  在成功完成任务之后,MapReduce 的输出存放在 R 个输出文件中(对应每个 Reduce 任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这 R 个输出文件合并成一个文件–他们经常把这些文件作为另外一个 MapReduce 的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。

3.2 Master 数据结构

  Master 持有一些数据结构,它存储每一个 Map 和 Reduce 任务的状态(空闲、工作中或完成),以及 Worker机器(非空闲任务的机器)的标识。
  Master 就像一个数据管道,中间文件存储区域的位置信息通过这个管道从 Map 传递到 Reduce。因此,对于每个已经完成的 Map 任务,master 存储了 Map 任务产生的 R 个中间文件存储区域的大小和位置。当 Map任务完成时,Master 接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的 Reduce 任务。

3.3 容错

  因为 MapReduce 库的设计初衷是使用由成百上千的机器组成的集群来处理超大规模的数据,所以,这个库必须要能很好的处理机器故障。

3.3.1 worker 故障

  master 周期性的 ping 每个 worker。如果在一个约定的时间范围内没有收到 worker 返回的信息,master 将把这个 worker 标记为失效。所有由这个失效的 worker 完成的 Map 任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的 worker。同样的,worker 失效时正在运行的 Map 或 Reduce 任务也将被重新置为空闲状态,等待重新调度。
  当 worker 故障时,由于已经完成的 Map 任务的输出存储在这台机器上,Map 任务的输出已不可访问了,因此必须重新执行。而已经完成的 Reduce 任务的输出存储在全局文件系统上,因此不需要再次执行。
  当一个 Map 任务首先被 worker A 执行,之后由于 worker A 失效了又被调度到 worker B 执行,这个“重新执行”的动作会被通知给所有执行 Reduce 任务的 worker。任何还没有从 worker A 读取数据的 Reduce 任务将从 worker B 读取数据。
  MapReduce 可以处理大规模 worker 失效的情况。比如,在一个 MapReduce 操作执行期间,在正在运行的集群上进行网络维护引起 80 台机器在几分钟内不可访问了,MapReduce master 只需要简单的再次执行那些不可访问的 worker 完成的工作,之后继续执行未完成的任务,直到最终完成这个 MapReduce 操作。

3.3.2 master 失败

  一个简单的解决办法是让 master 周期性的将上面描述的数据结构(alex 注:指 3.2 节)的写入磁盘,即检查点(checkpoint)。如果这个 master 任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master 进程。然而,由于只有一个 master 进程,master 失效后再恢复是比较麻烦的,因此我们现在的实现是如果 master 失效,就中止 MapReduce 运算。客户可以检查到这个状态,并且可以根据需要重新执行 MapReduce操作。

3.3.3 在失效方面的处理机制

(alex 注:原文为”semantics in the presence of failures”)
  当用户提供的 Map 和 Reduce 操作是输入确定性函数(即相同的输入产生相同的输出)时,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。
  我们依赖对 Map 和 Reduce 任务的输出是原子提交的来完成这个特性。每个工作中的任务把它的输出写到私有的临时文件中。每个 Reduce 任务生成一个这样的文件,而每个 Map 任务则生成 R 个这样的文件(一个 Reduce 任务对应一个文)。当一个 Map 任务完成的时,worker 发送一个包含 R 个临时文件名的完成消息给 master。如果 master 从一个已经完成的 Map 任务再次接收到到一个完成消息,master 将忽略这个消息;否则,master 将这 R 个文件的名字记录在数据结构里。
  当 Reduce 任务完成时,Reduce worker 进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个 Reduce 任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个 Reduce 任务产生的数据。
  使用 MapReduce 模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的 Map 和 Reduce操作是确定性的,而且存在这样的一个事实:我们的失效处理机制等价于一个顺序的执行的操作。当 Map 或 /和 Reduce 操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。当使用非确定操作的时候,一个 Reduce 任务 R1 的输出等价于一个非确定性程序顺序执行产生时的输出。但是,另一个 Reduce 任务 R2的输出也许符合一个不同的非确定顺序程序执行产生的 R2 的输出。
  考虑 Map 任务 M 和 Reduce 任务 R1、R2 的情况。我们设定 e(Ri)是 Ri 已经提交的执行过程(有且仅有一个这样的执行过程)。当 e(R1)读取了由 M 一次执行产生的输出,而 e(R2)读取了由 M 的另一次执行产生的输出,导致了较弱的失效处理。

3.4 存储位置

  在我们的计算运行环境中,网络带宽是一个相当匮乏的资源。我们通过尽量把输入数据(由 GFS 管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS 把每个文件按 64MB 一个 Block 分隔,每个 Block 保存在多台机器上,环境中就存放了多份拷贝(一般是 3 个拷贝)。MapReduce 的 master 在调度 Map 任务时会考虑输入文件的位置信息,尽量将一个 Map 任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master 将尝试在保存有输入数据拷贝的机器附近的机器上执行 Map 任务(例如,分配到一个和包含输入数据的机器在一个 switch 里的 worker 机器上执行)。当在一个足够大的 cluster 集群上运行大型 MapReduce 操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。

3.5 任务粒度

  如前所述,我们把 Map 拆分成了 M 个片段、把 Reduce 拆分成 R 个片段执行。理想情况下,M 和 R 应当比集群中 worker 的机器数量要多得多。在每台 worker 机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量 Map 任务都可以分布到所有其他的 worker机器上去执行。
  但是实际上,在我们的具体实现中对 M 和 R 的取值都有一定的客观限制,因为 master 必须执行 O(M+R)次调度,并且在内存中保存 O(MR)个状态(对影响内存使用的因素还是比较小的:O(MR)块状态,大概每对 Map 任务/Reduce 任务 1 个字节就可以了)。
  更进一步,R 值通常是由用户指定的,因为每个 Reduce 任务最终都会生成一个独立的输出文件。实际使用时我们也倾向于选择合适的 M 值,以使得每一个独立任务都是处理大约 16M 到 64M 的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效),另外,我们把 R 值设置为我们想使用的 worker 机器数量的小的倍数。我们通常会用这样的比例来执行MapReduce:M=200000,R=5000,使用 2000 台 worker 机器。

Last modification:December 14th, 2019 at 03:10 pm
如果觉得我的文章对你有用,请随意赞赏