我之前写过一篇文章,「 Windows下使用IntelliJ IDEA 远程编写和调试hadoop集群 」,这是基础,我习惯在windows下,而且习惯使用IDEA,因为高版本的hadoop中,eclipse的插件已经gg了。

当然我的集群是hadoop-2.7.1的HA,四台机器搭建的集群,两篇文章配置参考如下(node1/node2上有namenode):

一、前言

WordCount 是hadoop最著名的一个程序,而且应该是hadoop运行最多的一个程序了,当然也有PI的计算等等其他的一些示例程序。

通过WordCount学习mapreduce是入门基础,也是传说中“MapReduce 八股文编程”的入门路径。

记录并简单说明一下wordcount以及mapreduce的八股文编程的简单套路,同时记录一下在IDEA中(windows环境下)提交job任务并运行。

二、Map部分

mapreduce的原理就不写了,都懂,三大部分中先说map部分。

最通俗的说法就是:有一些文件(以文本文件举例),把这些文件依次每行都输入到map中,然后进行map以及后续的shuffle或者是combine/sort等操作,最后输出给reduce进行操作。

上面过程中,我个人理解,hadoop已经做好了将hdfs中的文件每行输入给map这个工作了,我们需要做的就是对每一行数据进行map输出就行了(这里不考虑我们自己进行分组/排序等操作,wordcount也没有这个需求)。

所以,根据wordcount 的需求,我们在每一行中读取每一个单词,然后将其作为1进行输出即可,意思是每个单词出现一次,然后map就输出了,其中我们不进行任何的组合或者是排序操作。

我的map如下:(别人都会使用stringTokenizer,目前不建议了,建议使用正则,我就用了正则,当然我的正则只是空格分开而已,"_"或者是"-"我都没有分开)

对于wordcount来说,比较重要的是输入输出的格式,根据我上面输的,输入是一行数据,不过这一行数据因为要进行某些比较之类的,都会使用LongWritable的格式,其实之后很多map都会使用LongWritable的输入,当然value的输入一般没有什么用处

对于wordcount的输出来说,就是根据程序需求而决定,我们只需要最简单的统计,因此我们将每个单词的标记为出现一次,输出就行了,其他的操作交给reduce去操作这样无疑是最简单的,但是也是效率最低下的。

/**
     *  map area
     *  KEYIN   VALUEIN     KEYOUT  VALUEOUT
     *  输入key   ·输入值    输出key   输出值
     */
    static class TestMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        //设置每个单词出现一次
        private final static IntWritable one = new IntWritable(1);
        //保存每个单词的内容
        private Text word = new Text();

        @Override
        protected void map(LongWritable  key, Text value, Context context) throws IOException, InterruptedException {
            //获得每行的值
            String lineValue = value.toString();
            //分割单词 虽然很多程序示例都会使用StringTokenizar 但是并不建议继续使用
//            StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
            //遍历
//            while (stringTokenizer.hasMoreTokens()) {
//                //获取每个值
//                String wordValue = stringTokenizer.nextToken();
//                //设置map输出的key值
//                word.set(wordValue);
//                //上下文输出map的key和value
//                context.write(word, one);
//            }

            //通过正则分割
            /**
             * 当然 通过正则分割就要写好正则 我就简单的通过空格分割单词了
             * 这只是个简单的示例而已
             */
            String[] words = lineValue.split(" ");
            for(String singleWord:words){
                word.set(singleWord);
                context.write(word,one);
            }

        }
    }

需要特别明白的是,mapreduce在map之后自动进行combine以及一些优化操作不是我们手动去编程实现的,而是计算框架本身的优化策略!比如我们map是每个单词出现1次而设置的,但是在map之后,进行组合优化等,会将同一个单词归到一起,但是map的输出不会改变。比如:postbird:1/postbird:2会组合成posthird:1,2 这样子

三、Reduce部分

上面是map的操作,结束之后,我们很不负责任的将每个单词出现一次这个输出交给了reduce处理。

需要注意的是,map输出之后,要根据map输出的格式来定义reduce的输入格式,根据上卖弄绿字部分的提示,实际虽然我们的map输出的是key是Text类型,value是1(IntWritable类型),但是进入reduce之后,value就变成了iterable的可遍历,当然iterable内还是IntWritable类型。

wordcount既然是count,sum运算还是需要的。

因此reduce最终的计算就变成了遍历每个单词的每个出现次数,将他们累加,然后输出即可

/**
     *  reduce area
     *  将key相同的value放在一起
     */
     static class TestReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        private IntWritable result = new IntWritable();
        @Override
        protected void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
            //累加数值
            int sum=0;
            //循环遍历iterable
            for (IntWritable value:values){
                //累加
                sum+=value.get();
            }
            //总次数
            result.set(sum);
            //上下文输出
            context.write(key,result);
        }
    }

四、客户端部分

我个人习惯称其为客户端部分,因为map和reduce都处理好了,现在要开始接入hdfs文件系统以及yarn的集群上提交job计算了。

为了方便,我还是直接在windows下通过idea直接提交,而不是打包成jar在放到hadoop的集群上去运行。

因此这里设计的问题中就包含如何通过idea远程提交job

1、windows下在idea中提交job配置参数

用我们的wordcount举例,我们打包成jar在集群中运行的时候,使用的是args的参数列表。

因此我们也应当配置idea的args的参数列表,同时通过代码进行获取。

配置的两个参数一个是输入的目录一个是输出的目录,同时应当使用actice的namenode

注意:输出目录应当删除(可以在自己写一个方便的小程序,每次运行前,跑一次删除输出的目录,使用FileSystem API即可,可以参照:http://www.ptbird.cn/hdfs-filesystem-api.html

<img src="http://static.ptbird.cn/usr/uploads/2017/02/1309267011.png" class="center-block" alt="
postbird" />

2、修改参数的读取配置

使用配置的参数,就要去读取该配置,如果没有参数应当进行错误提示

//修改命令行的配置
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

3、完整的客户端代码

至于上面的问题解决之后,后面依旧是固定的格式,包括设置mapper和reducer的类型什么的。

//client
    public static void main(String args[]) throws IOException,InterruptedException,ClassNotFoundException{
        //获取配置
        Configuration conf=new Configuration();

        //修改命令行的配置
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        //创建Job
        Job job=new Job(conf,"wc");

        //1.设置job运行的类
        job.setJarByClass(WordCount.class);
        //2.设置map和reduce的类
        job.setMapperClass(TestMapper.class);
        job.setReducerClass(TestReducer.class);
        //3.设置输入文件的目录和输出文件的目录
        FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
        //4.输出结果的key和value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //5.提交job 等待运行结束并在客户端显示运行信息
        boolean isSuccess=job.waitForCompletion(true);
        //6.结束程序
        System.exit(isSuccess ?0:1);
    }

五、完整代码示例

我在git@osc存了一份完整的代码示例:

地址:https://git.oschina.net/postbird/codes/fi6qshl4oa7wpgt2yz8d089