目录
1 编程前总分析
1.1 数据源
1 |
英语,李沐,85,男,20 数学,李沐,54,男,20 音乐,李沐,54,男,20 体育,李沐,34,男,20 语文,李媛,81,女,20 音乐,李媛,85,女,20 体育,李媛,89,女,20 语文,马珂,75,女,19 英语,马珂,85,女,19 音乐,马珂,75,女,19 体育,马珂,65,女,19 语文,潘琴,42,女,20 英语,潘琴,48,女,20 音乐,潘琴,48,女,20 体育,潘琴,78,女,20 英语,秦灿,75,男,19 数学,秦灿,89,男,19 音乐,秦灿,85,男,19 体育,秦灿,99,男,19 语文,王靓,85,女,21 英语,王靓,85,女,21 数学,王靓,48,女,21 音乐,王靓,86,女,21 音乐,王靓,85,女,21 体育,王靓,96,女,21 体育,王靓,87,女,21 英语,吴起,85,男,20 数学,吴起,85,男,20 英语,张翔,96,男,20 数学,张翔,85,男,20 音乐,张翔,85,男,20 体育,张翔,87,男,20 语文,郑虎,85,男,20 数学,郑虎,85,男,20 音乐,郑虎,88,男,20 体育,郑虎,68,男,20 语文,周伟,76,男,19 英语,周伟,85,男,19 数学,周伟,76,男,19 音乐,周伟,99,男,19 体育,周伟,90,男,19 数学,朱鸿,90,男,21 音乐,朱鸿,80,男,21 体育,朱鸿,81,男,21 |
1.2 需要掌握的知识
1.2.1 Hadoop对比java的数据类型
java | Hadoop |
boolean | BooleanWritable |
Integer/int | IntWritable |
Long/long | LongWritable |
Float/float | FloatWritable |
Double/double | DoubleWritable |
String | Text |
NullWritable |
1.2.2 MapReduce流程简介
MapReduce是一种简化的并行计算编程模型框架,主要解决了分布式计算的相关问题。所谓的分布式计算就是将一个文件里的数据内容,一行行的发送给mapper,mapper接收到一行数据使用split分割后接收,并按key分组后传给reducer,reducer将接收到的一组数据进行处理后输出,当所有的组都处理完成结束一个MapReduce。
1.3.3 MapReduce流程细分
功能:统计每门课程中相同分数的人有多少及他们的名字
思考一下,想要统计每门课程中相同分数的人数及名字,我们需要以什么字段为标准对数据进行分组(mapper最主要的功能就是分组)?想要搞明白上面的问题,试着和mysql的分组查询操作做一下类比,具体sql语句如下:
SELECT 姓名 FROM 表 GROUP BY 课程名称,成绩 ;
参考sql语句的分组查询,mapper功能就相当于按课程和成绩两个字段值对数据进行分组并查询学生姓名。mapper里的最后一句context.write(key,value);里的两个参数,key等于GROUP BY后面的字段名–>课程成绩和成绩的拼接字符串,value等于GROUP BY前面的字段名–>姓名。mapper就实现了将所有key值相同的分为一组,value放在迭代器中,一组组传给reducer,reducer使用一个Text类型的key和迭代器value进行接收。
2 编码阶段
mapreduce拆分:每个mapreduce程序都可以拆分成三个小部分mapper类、reducer类、main方法类。每个类都有其固定的框架,需要改变的就只有mapper和reducer类中重写方法的方法体本身,还有main方法里面的各项参数值。
如果说,当然我的读者肯定都是聪明的亚批,我是说如果你朋友的java编程基础不是很好,我的注释表示它完全可以很细。
2.1 导入依赖
MapReduce不需导入的四个依赖(hadoop-client、hadoop-hdfs、hadoop-common、hadoop-mapreduce-client-core)
1 |
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.3</version> </dependency> </dependencies> |
2.2 mapper
1 |
package course_score_same; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* stu[0]:课程名称 stu[1]:学生姓名 stu[2]:成绩 stu[3]:性别 stu[4]:年龄 该功能实现:统计该课程中成绩相同的学生姓名 */ //Mapper的泛型依次为输入文本的第几行,该行的文本,Mapper的输出key,Mapper的输出value public class CssMapper extends Mapper<LongWritable, Text,Text,Text> { //重写方法:在idea的代码区使用快捷键 alt+insert选择鼠标单击override methods选择map方法 @Override //map方法的三个参数,前两个就是输入文本行号,该行的文本,最后一个Context context固定写法 protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //将文件的每一行传递过来,使用split分割后利用字符数组进行接收 String[] stu = value.toString().split(","); //拼接字符串:课程和成绩 String sc = stu[0]+"\t"+stu[2]; //向Reducer传递参数-> Key:课程+成绩 Value:学生名 context.write(new Text(sc),new Text(stu[1])); } } |
2.3 reducer
1 |
package course_score_same; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; //Reducer的泛型依次为Mapper输出的key作为Reducer的输入,Mapper输出的value作为Reducer的输入,Reducer的输出key,Reducer的输出value public class CssReducer extends Reducer <Text,Text,Text,Text>{ //重写方法与Mapper一样 @Override //reduce方法的三个参数:Mapper输出的key作为Reducer的输入,Mapper输出的value作为Reducer的输入,最后一个Context context固定写法 protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //创建StringBuffer用来接收该课程中成绩相同的学生的姓名 StringBuffer sb = new StringBuffer(); //num变量用来计数 int num = 0; //遍历values参数,将所有的value拼接进sb,并统计学生数量 for(Text value:values){ sb.append(value.toString()).append(","); num++; } //如果num=1,则表明该课程的这个成绩只有一个学生,否则就输出 if(num>1){ String names = "一共有" + num + "名学生,他们的名字是:" +sb.toString(); System.out.println("*************************************************"); System.out.println(key.toString() + names); context.write(key,new Text(names)); } } } |
2.4 main
1 |
package course_score_same; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class CssMain { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException { //创建job和“统计相同课程相同分数的人数”任务入口 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(CssMain.class); //设置Mapper和Reducer的入口 job.setMapperClass(CssMapper.class); job.setReducerClass(CssReducer.class); //设置Mapper的输入输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reducer的输入输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //指定输入输出路径 String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv"; String outputPath = "hdfs://localhost:9000/mapreduce/output/该课程中成绩相同的学生姓名.txt"; FileInputFormat.setInputPaths(job,new Path(inputPath)); FileOutputFormat.setOutputPath(job,new Path(outputPath)); //输出路径存在的话就删除,不然就只能手动删除,否则会报该文件已存在的异常 FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf); if (fileSystem.exists(new Path(outputPath))) { fileSystem.delete(new Path(outputPath), true); } //执行job job.waitForCompletion(true); } } |
至此,一个完整的MapReduce的编写就已经完全结束了,如果想要别的功能,只需要修改mapper和reducer类中重写方法的方法体本身,还有main方法里面的各项参数值即可。
为了进一步锻炼大家MapReduce确定mapper输出的key和value,下面再找两个例子练习一下(每个人的想法都不一样,所以说并没有标准的答案,合理即可):
- 统计所有学生的信息—>(key:姓名+性别+年龄;value:课程+成绩)
- 计算每门成绩的最高分、最低分、平均分—>(key:课程名称;value:成绩)
- 统计各性别的人数及他们的姓名—>(key:性别;value:姓名)