由于比较简单,所以直接上代码。代码中需要注意的地方已有明确注释,如有错误之处请指正。
输入数据data.txt:
1950 11950 2211950 2011950 1211950 2011949 1111949 1201949 1341949 13401949 1343
Map类:
package com.wyf.maxtemprature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper that parses one tab-separated input line of the form
 * {@code "<year>\t<temperature>"} and emits {@code (year, temperature)}.
 *
 * <p>Fix over the original: the class now declares its generic type
 * parameters. With the raw {@code implements Mapper}, the typed
 * {@code map(LongWritable, Text, ...)} method did not actually override the
 * interface's erased {@code map(Object, Object, ...)} method, so the
 * framework could not invoke it.
 *
 * @author wyf
 */
public class MaxTempratureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Reads the input line by line (TextInputFormat supplies the byte offset
     * as the key and the line contents as the value).
     *
     * @param key             byte offset of the line within the file
     * @param value           the line text, expected as "year\ttemperature"
     * @param outputCollector collector receiving (year, temperature) pairs
     * @param reporter        progress/status reporter for the task
     * @throws IOException if emitting the output pair fails
     */
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> outputCollector,
                    Reporter reporter) throws IOException {

        // Skip lines that do not contain a tab-separated year/temperature
        // pair instead of crashing the whole task on one bad record.
        String[] split = value.toString().split("\t");
        if (split.length < 2) {
            reporter.incrCounter("MaxTemprature", "MALFORMED_LINES", 1);
            return;
        }

        try {
            Text year = new Text(split[0]);
            IntWritable temperature = new IntWritable(Integer.parseInt(split[1].trim()));
            outputCollector.collect(year, temperature);
        } catch (NumberFormatException e) {
            // Non-numeric temperature: count it and move on.
            reporter.incrCounter("MaxTemprature", "MALFORMED_LINES", 1);
        }
    }

}
Reduce类:
package com.wyf.maxtemprature;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Reducer that emits the maximum temperature observed for each year.
 *
 * <p>Fixes over the original:
 * <ul>
 *   <li>Declares the generic type parameters; with the raw
 *       {@code implements Reducer} the typed {@code reduce(...)} method did
 *       not override the interface's erased method.</li>
 *   <li>{@link #calculateMaxValue(Iterator)} no longer keeps a reference to
 *       the {@code IntWritable} handed out by the iterator. Hadoop reuses a
 *       single {@code IntWritable} instance across {@code next()} calls, so
 *       {@code max = value} aliased the reused object and could report the
 *       last value instead of the maximum. The primitive value is copied out
 *       instead.</li>
 * </ul>
 *
 * @author wyf
 */
public class MaxTempratureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Emits (year, max temperature) for one key group.
     *
     * @param key             the year
     * @param value           iterator over all temperatures for that year
     * @param outputCollector collector receiving the (year, max) pair
     * @param reporter        progress/status reporter for the task
     * @throws IOException if emitting the output pair fails
     */
    @Override
    public void reduce(Text key, Iterator<IntWritable> value,
                       OutputCollector<Text, IntWritable> outputCollector,
                       Reporter reporter) throws IOException {
        outputCollector.collect(key, calculateMaxValue(value));
    }

    /**
     * Computes the maximum of the iterated temperatures.
     *
     * <p>The framework guarantees at least one value per key group, so
     * {@code values.next()} is safe for the first element.
     *
     * @param values iterator over the temperatures; the framework may reuse
     *               the same {@code IntWritable} instance on each call
     * @return a freshly allocated {@code IntWritable} holding the maximum
     */
    protected IntWritable calculateMaxValue(Iterator<IntWritable> values) {

        // Track the primitive int, never a reference to the (reused)
        // IntWritable returned by the iterator.
        int max = values.next().get();

        while (values.hasNext()) {
            int current = values.next().get();
            if (current > max) {
                max = current;
            }
        }

        return new IntWritable(max);
    }

}
应用程序入口类:
package com.wyf.maxtemprature;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

/**
 * Driver that configures and submits the max-temperature MapReduce job.
 *
 * <p>Fix over the original: {@code JobConf} is now constructed with the
 * driver class ({@code MaxTemprature.class}) rather than the mapper class —
 * the argument is only used to locate the containing job jar, and the
 * driver class is the conventional choice.
 *
 * @author wyf
 * @version Jul 10, 2013 11:00:26 AM
 */
public class MaxTemprature {

    /**
     * Program entry point: wires up input/output paths, the mapper,
     * combiner and reducer, and runs the job synchronously.
     *
     * @param args unused; paths are currently hard-coded
     */
    public static void main(String[] args) {

        try {
            Path inputPath = new Path("/hadoopcase/maxtemprature/input/data.txt");
            Path outPath = new Path("/hadoopcase/maxtemprature/output");

            // Pass the driver class so Hadoop can locate the job jar.
            JobConf jobConf = new JobConf(MaxTemprature.class);
            jobConf.setJobName("Calculate Max Temprature");

            FileInputFormat.setInputPaths(jobConf, inputPath);
            FileOutputFormat.setOutputPath(jobConf, outPath);

            jobConf.setMapperClass(MaxTempratureMapper.class);
            // Using the reducer as a combiner is safe here because max()
            // is associative and commutative.
            jobConf.setCombinerClass(MaxTempratureReducer.class);
            jobConf.setReducerClass(MaxTempratureReducer.class);

            // TextInputFormat supplies the byte offset of each line as the
            // key and the line contents as the value.
            jobConf.setInputFormat(TextInputFormat.class);

            // Declare the map output / job output types; without these the
            // framework reports a type-mismatch error at runtime.
            jobConf.setOutputKeyClass(Text.class);
            jobConf.setOutputValueClass(IntWritable.class);

            JobClient.runJob(jobConf);
        } catch (IOException e) {
            // Best-effort CLI tool: report the failure and exit.
            e.printStackTrace();
        }

    }

}