如果要自己编写mapreduce函数,那么自定义排序、分组、分区是一个非常重要的过程。

如同在「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等 这篇文章中,对温度的处理就用到了自定义的排序、分组以及将数据分区给不同的reduce程序去操作。

如何更好地去编写排序的函数,怎样定义分组或者是分区的前提是需要很好地了解mapreduce的详细过程。

postbird

一、Map端分析

1、数据溢出

每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(2.x之后是128M默认)为一个分片,当然这个分片依旧是可以设置的。

map输出的结果会暂时存放在一个环形内存缓冲区中(缓冲区默认大小100M,由io.sort.mb控制),当该缓冲区将要溢出的时候(默认为缓冲区的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

2、数据分区

在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做的目的是为了避免数据倾斜(有的reduce分到相当大的数据,而有的reduce分到很小部分的数据甚至没有数据)

分区其实就是对数据进行hash的过程。

然后对每个分区中的数据进行排序操作,如果此时设置Combiner,将排序后的结果进行Combiner操作,这样做的目的是为了尽可能少的将数据写入磁盘。比如wordcount中,postbird 1 和 postbird 2 两个输出合并之后成为 postbird:1,2 ,只写入一次即可

postbird

3、合并和数据压缩

从上面可以看到,map任务输出到最后一个记录的时候,可能会有很多的溢出文件,这时候需要将这些文件进行文件合并操作,合并的过程中会不断的进行排序和combine,两个目的:

  1. 尽量减少写入磁盘的数据量
  2. 尽量减少下一阶段网络传输的数据量

最后合并成一个已经分区并且已经排序的文件,为了减少网络的传输数据量,也可以对数据进行压缩,只要配置mapred.compress.map.out为true即可。

4、拷贝至reduce

将分区中的数据拷贝给对应的reduce任务。至于不同的reduce任务如何准确的匹配到map拷贝的数据,则是在整个map阶段,由resourcemanager进行控制和调度。

5、shuffle

准确来说,shuffle并不是map阶段的任务,也不是reduce的任务。

shuffle是map和reduce的中间过程,一个map产生的数据,通过hash过程分区分配给了不同的reduce任务

二、Reduce端分析

1、接收数据

reduce会接收到不同的map任务传来的数据,并且每个map传来的数据都是有序的,如果reduce端接收的数据量相当小,则直接存储在内存中(缓冲区有mapred.job.shuffle.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例,则对数据合并后溢写到磁盘中

2、排序和合并

随着溢写文件的增多,后台线程会将他们合并成一个更大的有序的文件,这样做的目的是为了后面的合并能够节省时间。

其实不管在map或者是在reduce,MapReduce都在反复的执行排序、合并操作。

排序是hadoop的灵魂

3、磁盘写入

合并的过程中会产生许多中间文件(已经写入磁盘),但是Mapreduce会让写入磁盘的数据尽可能的少(减少磁盘IO),并且最后一次合并的记过并没有写入磁盘,而是直接输入到reduce函数。

postbird


对于mapreduce来说,搞懂在什么时候排序,在什么时候分区,在什么时候分组(也就是combine操作),对于自定义排序或者是分组等较为高级的程序有很大的帮助,虽然现在很多实际应用早就不在使用mapreduce计算,但是作为hadoop的两大灵魂(之前)之一,mapredude的离线计算思想依旧非常实用。

三、MapReduce的执行过程的总结

1、Map端

  1. 第一步 读取HDFS文件:每一行解析成一个<key,value>,每一个<key,value>调用一次map函数
  2. 第二步 接收【第一步】中产生的<key,value>,进行处理转换为新的<key,value>输出(map函数的工作
  3. 第三步 对【第二步】输出的<key,value>进行分区。默认一个分区(shuffle的过程,通过hash
  4. 第四步 对不同分区中的数据进行排序(按照key)、分组。分组是指相同的key的value放到一个集合中。(merge成group
  5. 第五步 (可选)对分组后的数据进行规约(combine)(这是本地化的reduce

2、Reduce端

  1. 第一步 多个map任务的输出,按照不同的分区,通过网络拷贝到不同的reduce节点上
  2. 第二步 对多个map的输出进行合并、排序。接收的是分组后的数据,实现自己的业务逻辑,处理后产生新的<key,value>输出。
  3. 第三步 将reduce的输出写到hdfs中

四、实例分析

「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等 这篇文章中,

首先指定了Map函数输出的key的类型由自己定义的KeyPair的类型。

其次通过

public int getPartition(KeyPair key, Text value, int num) {
        //按照年份进行分区 年份相同,返回的是同一个值
        return (key.getYear()*127)%num;
    }

对数据进行了分区,这是一个简单的hash算法,如果年份相同则一定会返回相同的值。

分区之后就要对每个分区中的数据进行排序和合并操作:

    public int compare(WritableComparable a, WritableComparable b) {
        //按照年份升序排序 按照温度降序排序
        KeyPair o1=(KeyPair)a;
        KeyPair o2=(KeyPair)b;
        int result=Integer.compare(o1.getYear(),o2.getYear());
        //比较年份 如果年份不相等
        if(result != 0){
            return result;
        }
        //两个年份相等 对温度进行降序排序
        return -Integer.compare(o1.getTemp(),o2.getTemp());
    }

上面的排序根据程序需求,首先根据年份升序排序,如果年份相等,再根据温度降序排序。

排序过后需要考虑分组:

 public int compare(WritableComparable a, WritableComparable b) {
        //年份相同返回的是0
        KeyPair o1=(KeyPair)a;
        KeyPair o2=(KeyPair)b;
        return Integer.compare(o1.getYear(),o2.getYear());
    }

上面的分组对两个输入的keyPair进行比较,(使用了强制类型转换)

只不过在reduce我只是将数据原样输出来看看三个reduce task的输出后写入hdfs的情况。

【详细可以看看 http://www.ptbird.cn/mapreduce-tempreture.html 这篇文章】