Java MapReduce
明白MapReduce 程序的工作原理之后,下一步便是通过代码来实现它。我们需要三样东西:一个map 函数、一个reduce 函数和一些用来运行作业的代码。map函数由Mapper 接口实现来表示,后者声明了一个map()方法。例2-3 显示了我们的map函数实现。
例2-3. 查找最高气温的Mapper
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseIntdoesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { output.collect(new Text(year), new IntWritable(airTemperature)); } } } 该Mapper接口是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键和输出值的类型。就目前的示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型均可在org.apache.hadoop.io包中找到。这里我们使用LongWritable类型(相当于Java中的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java 中的Integer类型)。
map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。
map()方法还提供了OutputCollector实例用于输出内容的写入。在这种情况下,我们将年份数据按Text对象进行读/写 (因为我们把年份当作键),将气温值封装在IntWritable 类型中。
我们只在气温数据不缺失并且所对应质量代码显示为正确的气温读数时,才将其写入输出记录中。
reduce函数通过Reducer进行类似的定义,如例2-4 所示。
例2-4. 查找最高气温的Reducer
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } } 同样,针对reduce函数也有四个形式参数类型用于指定其输入和输出类型。reduce 函数的输入类型必须与map 函数的输出类型相匹配:即Text类型和IntWritable类型。在这种情况下,reduce函数的输出类型也必须是Text和IntWritable这两种类型,分别输出年份和最高气温。该最高气温是通过循环比较当前气温与已看到的最高气温获得的。
第三部分代码负责运行MapReduce 作业(请参见例2-5)。
例2-5. 该应用程序在气象数据集中找出最高气温
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; public class MaxTemperature { public static void main(String[] args) throws IOException { if (args.length != 2) { System.err.println("Usage: MaxTemperature<input path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(MaxTemperature.class); conf.setJobName("Max temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } } JobConf对象指定了作业执行规范。我们可以用它来控制整个作业的运行。在Hadoop 集群上运行这个作业时,我们需要将代码打包成一个JAR文件(Hadoop会在集群上分发这个文件)。我们无需明确指定JAR 文件的名称,而只需在JobConf的构造函数中传递一个类,Hadoop将通过该类查找包含有该类的JAR文件进而找到相关的JAR文件。
构造JobConf对象之后,需要指定输入和输出数据的路径。调用 FileInputFormat类的静态函数addInputPath()来定义输入数据的路径,该路径可以是单个文件、目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一组文件。由函数名可知,可以多次调用addInputPath()实现多路径的输入。
通过调用FileOutputFormat 类中的静态函数 setOutputPath()来指定输出路径。该函数指定了reduce 函数输出文件的写入目录。在运行任务前该目录不应该存在,否则Hadoop 会报错并拒绝运行该任务。这种预防措施是为了防止数据丢失(一个长时间运行任务的结果被意外地覆盖将是非常恼人的)。
接着,通过setMapperClass()和setReducerClass()指定map和reduce类型。
setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型往往相同。如果不同,map函数的输出类型则通过setMapOutputKeyClass()和setMapOutputValueClass()函数来设置。
输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。
在设置定义map 和reduce 函数的类后,便可以开始运行任务。JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。
分享到:
相关推荐
4 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后查看 MapReduce Web 界面。 5. 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后练习 MapReduce Shell 常用命令。 。。
(2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...
MapReduce 初级编程实践 姓名: 实验环境: 操作系统:Linux(建议Ubuntu16.04); Hadoop版本:3.2.2; 实验内容与完成情况: (一)编程实现文件合并和去重操作 对于两个输入文件,即文件 A 和文件 B,请...
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
Mapreduce实验报告 前言和简介 MapReduce是Google提出的一种编程模型,在这个模型的支持下可以实现大规模并行化计 算。在Mapreduce框架下一个计算机群通过统一的任务调度将一个巨型任务分成许多部分 ,分别解决然后...
提出一种基于MapReduce技术的贝叶斯垃圾邮件过滤机制,一方面对传统贝叶斯过滤技术进行改进,另一方面利用MapReduce模型的海量数据处理优势优化邮件集训练与学习。实验,较之目前流行的传统贝叶斯算法、K最近邻(NN算法...
【MapReduce篇07】MapReduce之数据清洗ETL1
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性...
项目的具体内容如下:1:用MapReduce算法实现贝叶斯分类器的训练过程,并输出训练模型; 2:用输出的模型对测试集文档进行分类测试。测试过程可基于单机Java程序,也可以是MapReduce程序。输出每个测试文档的分类...
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含...
Hadoop 用mapreduce实现Wordcount实例,绝对能用
单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数。本节通过单词计数实例来阐述采用 MapReduce 解决...
MapReduce求取行平均值 MapReduce小实例 数据有经过处理已经添加行号的 也有未添加的 行平均值的四种求法
MapReduce is the distribution system that the Hadoop MapReduce engine uses to distribute work around a cluster by working parallel on smaller data sets. It is useful in a wide range of applications, ...
《MapReduce2.0源码分析与编程实战》比较系统地介绍了新一代MapReduce2.0的理论体系、架构和程序设计方法。全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,...
一个自己写的Hadoop MapReduce实例源码,网上看到不少网友在学习MapReduce编程,但是除了wordcount范例外实例比较少,故上传自己的一个。包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作...
MapReduce的重要概念 什么是MapReduce 1 MapReduce核心思想 2 MapReduce特点 3 MapReduce适用的开发场景 4 目 录 一、什么是MapReduce MapReduce起源,在介绍大数据编年史时有提到Google最早在04年发表论文MapReduce...
赠送jar包:hadoop-mapreduce-client-common-2.7.3.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.7.3-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.7.3-sources.jar; 赠送Maven依赖信息...
在hadoop平台上,用mapreduce编程实现大数据的词频统计