L
L

Hadoop Streaming介绍及使用

Hadoop MapReduce和HDFS采用Java实现,默认提供Java编程接口,另外提供了C++编程接口和Streaming框架。Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。

Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用MapReduce Java接口获取key/value对输入,创建一个新的进程启动包装的用户程序,将数据通过管道传递给包装的用户程序处理,然后调用MapReduce Java接口将用户程序的输出切分成key/value对输出。如下图所示,其中Streaming Java Mapper通过管道将key/value输入传递给用户mapper程序的标准输入,并获取用户mapper程序的标准输出;Streaming Java Reducer调用Java接口通过InputFormat从HDFS获取输入数据,从管道将key/value传递给用户 reducer程序的标准输入,获取用户reducer程序的标准输出并调用Java接口通过OutputFormat输出数据;用户mapper和reducer程序负责处理数据,都从标准输入读取数据,向标准输出写入数据。

Streaming有如下一些优点: 
1)开发效率高,很多现有程序(包括脚本)能够方便的移植到hadoop平台上去运行 
2)某些程序运行效率高,对于某些cpu密集型程序,如果map-reduce程序用C++编写,效率有可能提高 
Streaming存在如下一些不足: 

  1. Hadoop Streaming默认只能处理文本数据。 
  2. Streaming中的mapper和reducer默认只能向标准输出写数据,不能方便地处理多路输出。 
  3. 用Java编写的MapReduce程序直接处理框架从输入数据中得到的key/value对,在Streaming中Java程序不直接处理key/value对,而是通过管道写到mapper程序的标准输入,mapper程序再从key/tvalue中解析出key/value对,这个过程多了两次数据拷贝和解析(分割),带来一定的开销。对于reducer也是一样的。

streaming命令参数列表:

-input <path>

输入数据路径

-output <path>

输出数据路径

-mapper <cmd|JavaClassName>

mapper可执行程序或Java类

-reducer <cmd|JavaClassName>

reducer可执行程序或Java类

-file _<file> _Optional

分发本地文件

-cacheFile _<file> _Optional

分发HDFS文件

-cacheArchive _<file> _Optional

分发HDFS压缩文件

-numReduceTasks <num> Optional

reduce任务个数

-jobconf | -D NAME=VALUE Optional

作业配置参数

-combiner <JavaClassName> Optional

Combiner Java类

-partitioner <JavaClassName> Optional

Partitioner Java类

-inputformat _<JavaClassName>_Optional

InputFormat Java类

-outputformat _<JavaClassName>_Optional

OutputFormat Java类

-inputreader <spec> Optional

InputReader配置

-cmdenv <n>=<v> Optional

传给mapper和reducer的环境变量

-mapdebug <path> Optional

mapper失败时运行的debug程序

-reducedebug <path> Optional

reducer失败时运行的debug程序

-verbose Optional

详细输出模式

下面是对各个参数的详细说明:

-input <path>:指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入。&#160;
-output <path>:指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次。&#160;
-mapper:指定mapper可执行程序或Java类,必须指定且唯一。&#160;
-reducer:指定reducer可执行程序或Java类,必须指定且唯一。&#160;
-file, -cacheFile, -cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件。&#160;
-numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出。&#160;
-combiner:指定combiner Java类,对应的Java类文件打包成jar文件后用-file分发。&#160;
-partitioner:指定partitioner Java类,Streaming提供了一些实用的partitioner实现,参考_KeyBasedFiledPartitonerIntHashPartitioner_。&#160;
-inputformat, -outputformat:指定inputformat和outputformat Java类,用于读取输入数据和写入输出数据,分别要实现InputFormat和OutputFormat接口。如果不指定,默认使用TextInputFormat和TextOutputFormat。&#160;
-cmdenv NAME=VALUE:给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。&#160;
-mapdebug, -reducedebug:分别指定mapper和reducer程序失败时运行的debug程序。&#160;
-verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试。&#160;
-jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml。特别建议用-jobconf mapred.job.name=’My Job Name’设置作业名,使用-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级,使用-jobconf mapred.job.map.capacity=M设置同时最多运行M个map任务,使用-jobconf mapred.job.reduce.capacity=N设置同时最多运行N个reduce任务。

常见的作业配置参数如下表所示:

mapred.job.name

作业名

mapred.job.priority

作业优先级

mapred.job.map.capacity

最多同时运行map任务数

mapred.job.reduce.capacity

最多同时运行reduce任务数

hadoop.job.ugi

作业执行权限

mapred.map.tasks

map任务个数

mapred.reduce.tasks

reduce任务个数

mapred.job.groups

作业可运行的计算节点分组

mapred.task.timeout

任务没有响应(输入输出)的最大时间

mapred.compress.map.output

map的输出是否压缩

mapred.map.output.compression.codec

map的输出压缩方式

mapred.output.compress

reduce的输出是否压缩

mapred.output.compression.codec

reduce的输出压缩方式

stream.map.output.field.separator

map输出分隔符

&#160;

Streaming应用实例

write two c++ codes

1. map.cpp

#include <string>
#include <iostream>

using namespace std;

int main()
{
string line;
while (cin>>line)
{
cout<<line<<"\t"<<1<<endl;
}
return 0;
}

2. reduce.cpp

#include <map>
#include <string>
#include <iostream>

using namespace std;
int main()
{
string key;
string value;
map<string,int> word_count;
map<string,int> :: iterator it;
while (cin>>key)
{
cin>>value;
it=word_count.find(key);
if(it!=word_count.end())
{
++(it->second);
}
else
{
word_count.insert(make_pair(key,1));
}
}
for(it=word_count.begin();it!=word_count.end();++it)
cout<<it->first<<"\t"<<it->second<<endl;
return 0;
}

g++ -o map map.cpp

g++ -o reduce reduce.cpp

2. prepare for the input files

creat two txt files

File1.txt:hello hadoop helloworld

File2.txt:this is a firsthadoop

3. start hadoop

format hadoop file system

bin``/hadoop ``namenode -``format

start hadoop

bin``/start-all``.sh

3. upload input files to the Hadoop file system

hadoop fs –put File1.txt File2.txt&#160; ans

4. set input output folders

5. run the program

bin/hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar  

-file /home/hadoop/Documents/c/hadooptest/mapper 
-file /home/hadoop/Documents/c/hadooptest/reduce 
-input /usr/local/hadoop/inputfiles/* 
-output /usr/local/hadoop/outputfiles
-inputformat WholeFileInputFormat        (name of class)

inputformat 可选内置或用户自定义

-mapper /home/hadoop/Documents/c/hadooptest/mapper 
-reducer /home/hadoop/Documents/c/hadooptest/reduce
6\. see the status