`

Hadoop ChainMap

 
阅读更多

单一MapReduce对一些非常简单的问题提供了很好的支持。但是如果处理过程变得更加复杂,这种复杂性应该体现为更多地MapReduce工作,而不是更加复杂的map函数和reduce函数。

 

在hadoop 中一个Job中可以按顺序运行多个mapper对数据进行前期的处理,再进行reduce,经reduce后的结果可经个经多个按顺序执行的mapper进行后期的处理,这样的Job是不会保存中间结果的,并大大减少了I/O操作。

 

例如:在一个Job中,按顺序执行 Map1->Map2->Reduce->Map3->Map4 ,在这种链式结构中,要将Map2与Reduce看成这个MapReduce的核心部分,partitioning与shuffling(奇迹发生的地方)在此处才会被应用到。所以Map1作为前期处理,而Map3与Map4作为后期处理。

 

代码示例:

 

Configuration conf = getConf();
JobConf job = new JobConf(conf);

job.setJobName(“ChainJob”);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);

JobConf map1Conf = new JobConf(false);

//将map1加入到Job中
ChainMapper.addMapp(job,
                    Map1.class,
                    LongWritable.class, 
                    Text.class,
                    Text.class,
                    Text.class,
                    true,
                    map1Conf);

//将map2加入到Job中
JobConf map2Conf = new JobConf(false);
ChainMapper.addMapper(job,
                      Map.class,
                      Text.class,
                      Text.class, 
                      LongWritable.class,
                      Text.class,
                      true,
                      map2Conf);

//将Reduce加入到Job中
JobConf reduceConf = new JobConf(false);
ChainReducer.setReducer(job,
                        Reduce.class,
                        LongWritable.class,
                        Text.class,
                        Text.class,
                        Text.class,
                        true,
                        reduceConf);

//将map3加入到Job中
JobConf map3Conf = new JobConf(false);
ChainReducer.addMapper(job,
                       Map3.class,
                       Text.class,
                       Text.class,
                       LongWritable.class, 
                       Text.class,
                       true,
                       map3Conf);

//将map4加入到Job中
JobConf map4Conf = new JobConf(false);
ChainReducer.addMapper(job,
                       Map4.class,
                       LongWritable.class,
                       Text.class, 
                       LongWritable.class,
                       Text.class,
                       true,
                       map4Conf);


JobClient.runJob(job);
 

 

 

addMapper方法的签名(setReducer方法与此类似)

 

public static <K1,V1,K2,V2> void

addMapper(JobConf job,

                    Class<? extends Mapper<K1,V1,K2,V2>> klass,

                    Class<? extends K1> inputKeyClass,

                    Class<? extends V1> inputValueClass,

                    Class<? extends K2> outputKeyClass,

                    Class<? extends V2> outputValueClass,

                    boolean byValue,

                    JobConf mapperConf)

 

关于byValue参数:

 

在标准的Mapper模型中,键/值对的输出在序列化之后写入磁盘,等待被shuffer到一个可能完全不同的节点上。形式上认为这个过程采用的是值传递。发送的是键/值对副本。在目前的情况下,我们可以将一个Mapper与另一个相链接,在相同的JVM线程中一起执行。因此,键/值对的发送有可能采用引用传递,初始Mapper的输出放在内存中,后续的Mapper直接引用相同的内存位置。当Map1调用context.write(K k, V v)时,对象k和v直接传递给Map2的map方法。mapper之间可能有大量的数据需要传递,避免复制这些数据可以让性能得以提高。但是这么做违背了Hadoop中MapReduce Model的一个微妙约定:对Context.write(K k, V v)的调用一定不会改变k和v的内容。

如果我们可以确信Map1的map()方法在调用Context.write(k, v)之后不会再使用k和v的内容,或者Map2并不改变k和v在其上的输入值,可以通过设定byValue为false来获得一定的性能提升。如果我们对Mapper的内部代码不太了解,最好依旧采用值传递,确保Mapper会按预期的方式工作。

 

0
0
分享到:
评论
3 楼 brandNewUser 2014-09-18  
楼主你好,问个问题,为什么我写的如下的:
JobConf phase1JobConf = new JobConf(false);
        ChainMapper.addMapper(jobConf, Phase1Mapper.class, Text.class, History.class, Phase1KeyDecorator.class,
                BytesWritable.class, true, phase1JobConf);

        JobConf phase2ReducerConf = new JobConf(false);
        ChainReducer.setReducer(jobConf, Phase1Reducer.class, Phase1KeyDecorator.class, BytesWritable.class,
                Text.class, Text.class, true, phase2ReducerConf);

        JobConf phase3ChainJobConf = new JobConf(false);
        ChainMapper.addMapper(jobConf, Phase3ChainMapper.class, Text.class, Text.class, Phase2KeyDecorator.class,
                BytesWritable.class, true, phase3ChainJobConf);

会抛出错误:


Exception in thread "main" java.lang.IllegalArgumentException: The specified Mapper input key class does not match the previous Mapper's output key class.
at org.apache.hadoop.mapreduce.lib.chain.Chain.validateKeyValueTypes(Chain.java:695)
at org.apache.hadoop.mapred.lib.Chain.addMapper(Chain.java:104)

Hadoop2.2.0版本,提示在检查的时候Key/Value类型错误,难道是中间的Reducer类型没起作用?

2 楼 Genie13 2012-05-29  
有个问题,如何咋0.20.2这个版本实现ChainMApper
好奇怪啊,这个版本Job  Mapper Reducer 是新的api  可是ChainMapper ChainReducer 确是旧的api

0.22.0 这个版本是新的ChainMapper ChainReducer
可是1.0的这个又变成旧的ChainMapper ChainReducer
好奇怪啊
1 楼 chenwq 2012-05-27  
http://www.distream.org/?p=379

相关推荐

    hadoop2.7.3 hadoop.dll

    在windows环境下开发hadoop时,需要配置HADOOP_HOME环境变量,变量值D:\hadoop-common-2.7.3-bin-master,并在Path追加%HADOOP_HOME%\bin,有可能出现如下错误: org.apache.hadoop.io.nativeio.NativeIO$Windows....

    《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf

    《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf...

    Hadoop下载 hadoop-2.9.2.tar.gz

    Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo 的工程师 Doug Cutting 和 Mike Cafarella Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo...

    Hadoop下载 hadoop-3.3.3.tar.gz

    Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不...

    Hadoop权威指南 中文版

    本书从hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍hado叩这一高性能处理海量数据集的理想工具。全书共14章,3个附录,涉及的主题包括:haddoop简介:mapreduce简介:hadoop分布式文件系统;hadoop的i...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop-3.3.4 版本(最新版)

    Apache Hadoop (hadoop-3.3.4.tar.gz)项目为可靠、可扩展的分布式计算开发开源软件。官网下载速度非常缓慢,因此将hadoop-3.3.4 版本放在这里,欢迎大家来下载使用! Hadoop 架构是一个开源的、基于 Java 的编程...

    hadoop配置资源 ,hadoop-3.0.0,hadoop.dll,winutils

    调用保存文件的算子,需要配置Hadoop依赖 将文件夹中的 hadoop-3.0.0 解压到电脑任意位置 在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’ winutils.exe,并放入Hadoop解压...

    Hadoop集群pdf文档

    Hadoop 集群配置详解 Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2期)_机器信息分布表 Hadoop_Hadoop集群(第4期)_SecureCRT使用 Hadoop_Hadoop集群(第5期)_Hadoop安装配置 Hadoop_Hadoop...

    hadoop2.7.3 Winutils.exe hadoop.dll

    hadoop2.7.3 Winutils.exe hadoop.dll

    hadoop的dll文件 hadoop.zip

    hadoop的dll文件 hadoop.zip

    hadoop_tutorial hadoop入门经典

    hadoop_tutorial hadoop入门经典 Hadoop 是一个能够对大量数据进行分布式处理的软件框架。Hadoop 是可靠的,因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保能够针对失败的节点重新分布处理。...

    Hadoop多版本 hadoop.dll和winutils.exe 下载

    支持如下版本的Hadoop hadoop-2.6.0 hadoop-2.6.3 hadoop-2.6.4 hadoop-2.7.1 hadoop-2.8.1 hadoop-2.8.3 hadoop-3.0.0

    hadoop2.6.0 hadoop.dll包括winutils.exe

    hadoop2.6.0 hadoop.dll包括winutils.exe

    Hadoop开发环境的插件hadoop-eclipse-plugin-2.10.1

    Hadoop Eclipse是Hadoop开发环境的插件,用户在创建Hadoop程序时,Eclipse插件会自动导入Hadoop编程接口的jar文件,这样用户就可以在Eclipse插件的图形界面中进行编码、调试和运行Hadop程序,也能通过Eclipse插件...

    hadoop-3.1.3安装包

    Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合...

    hadoop的hadoop.dll和winutils.exe

    hadoop hadoop的hadoop.dll和winutils.exe 解决方法, 把winutils.exe加入你的hadoop-x.x.x/bin下 Could not locate executable null\bin\winutils.exe in the Hadoop binaries

    Hadoop集群程序设计与开发

    《Hadoop集群程序设计与开发(数据科学与大数据技术专业系列规划教材)》系统地介绍了基于Hadoop的大数据处理和系统开发相关技术,包括初识Hadoop、Hadoop基础知识、Hadoop开发环境配置与搭建、Hadoop分布式文件系统、...

    windows64位hadoop2.7.7版本hadoop.dll

    windows下做hadoop入门,会出现hdfs报错,2.7.7版本兼容 windows下做hadoop入门,会出现hdfs报错,2.7.7版本兼容 windows下做hadoop入门,会出现hdfs报错,2.7.7版本兼容

    Hadoop大数据资料集锦

    Hadoop大数据资料集锦Hadoop大数据资料集锦Hadoop大数据资料集锦Hadoop大数据资料集锦

Global site tag (gtag.js) - Google Analytics