一个分布式并行计算框架
MapReduce作为计算框架,一定会有input和output,它本身完成了一个单一的工能,就是排序,在MapReduce里边,所有的数据被统一成一种格式:<key,value>的形式,这其实也是一种掣肘。MapReduce对读入的数据的key进行排序,然后经过用户自定义的逻辑之后输出到hdfs。
MapReduce的原理及运行流程:
MapReduce借鉴了函数式程序设计语言lisp的思想,是一种列表处理语言:
add(1,2,3,4)(4,3,2,1)=(5,5,5,5);reduce阶段,将key分组计算。
数据在hdfs上的存储方式是将文件按照一定大小切分为block进行存储,这个大小可以自己设置,hadoop1.0的默认大小是64M,hadoop2.0的默认大小是128M。
MapReduce计算框架在运行的时候分为两个进程,map进程和reduce进程,map和reduce在字面意思上看来,map是映射,处理hfds上的数据,reduce是规约,处理map的输出数据,最终将dealed数据写到hdfs。
这里有个地方需要注意:map的input的是hfds的数据,那么它的规则是什么呢,hdfs的文件按照block来存储,map在读取数据的时候按照一个split来读,split指的是一个逻辑单位,可以由系统参数指定,默认256M,也就是两个block块的数据会被一个map处理。但是要指出的是:hfds文件存储的时候,一个block不是一定会占内存128M,一个文件如果不够128M的话,这个block占的内存就是文件的大小。经过试验发现,一个split的大小是内存的实际大小,并不一定是两个block。
MapReduce的运行流程:
map进程:第一步map按照split读取hfds的数据,并将其转化为<key,value>的形式,其中key是每行的偏移量,value是一行的的数据。第二步是对每个<key,value>对按照key进行partition。第三步是对每个partition内的<key,value>对按照key进行排序。第四步:在本地按照key进行combine。
在map和reduce之间有一个过程叫做shuffle,shuffle本意是洗牌。MapReduce的shuffle过程完成的几个步骤是:fetch/copy(将map端的数据通过IO,移动到reduce所在的节点进行计算),sort&merge(对每个map过来的数据相同的partition进行合并和排序)
reduce进程:reduce首先将map过来的数据做group的操作,即将相同key的value写成一个集合,在MapReduce框架中,这个集合是一个迭代器。最后将处理好的数据写出到hfds上。
下边是MapReduce的经典java代码:WordCount
****************************************************************************************
WordCount:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @Author: yhq
*/
public class WordCount {
public static class MyMapper extends Mapper<Object,Text,Text,IntWritable> {
private static Text text = new Text();
private static IntWritable one = new IntWritable(1);
@Override
public void map (Object key,Text value,Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word:words
) {
text.set(word);
context.write(text,one);
}
}
}
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
public void reduce (Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val:values
) {
sum += val.get();
context.write(key,new IntWritable(sum));
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获得集群对象
Configuration conf = new Configuration();
//获得job对象
Job job = Job.getInstance(conf,"word_count");
//设置job运行的主类
job.setJarByClass(WordCount.class);
//设置mapper
job.setMapperClass(MyMapper.class);
//设置reducer
job.setReducerClass(MyReducer.class);
//设置map和reduce端的key输出类型
job.setOutputKeyClass(Text.class);
//设置map和reduce端的value输出类型
job.setOutputValueClass(IntWritable.class);
//设置文件输入路径
FileInputFormat.addInputPath(job,new Path(args[0]));
//设置reduce的输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 指定 job执行模式,等待任务执行完成后,提交任务的客户端才会退出!
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容