MapReduce Programming: Word Count, Deduplication, and Sorting in One Go

Published on 2020-04-26



The task isn't very heavy; there are just two things to watch out for:

  • First, when importing JARs, you need to import the JARs under all four directories (common, hdfs, mapreduce, and yarn), as well as the JARs under their lib subdirectories.
  • Second, pay attention to where the test files and the output files live. The default working directory of hdfs dfs is /user/wangyang/ (for most people this will be /user/hadoop/). When you create a file, a path without a leading / is created under that home directory, while a path with a leading / is created under the HDFS root (see the sketch after this list). In the end I didn't hit any particularly hard errors; the NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException came from importing too few JARs at first and adding the rest later. So import the JARs completely when you create the project; adding JARs after the project has been created can cause problems!

  • Apart from the first exercise (word count), the workflow for deduplication and sorting is basically:
    prepare your test files => start Hadoop => create a directory on HDFS => upload the test files to HDFS => create the project => import the JARs => fill in the code and change the HDFS paths to your own => run the program => cat the results in the terminal
  • Import the relevant JARs from the following directories (some of these are not strictly necessary, but I was too lazy to sort out which, and importing everything always works):
  • /usr/local/hadoop/share/hadoop/common
  • /usr/local/hadoop/share/hadoop/common/lib
  • /usr/local/hadoop/share/hadoop/mapreduce
  • /usr/local/hadoop/share/hadoop/mapreduce/lib
  • /usr/local/hadoop/share/hadoop/hdfs
  • /usr/local/hadoop/share/hadoop/hdfs/lib
  • /usr/local/hadoop/share/hadoop/yarn
  • /usr/local/hadoop/share/hadoop/yarn/lib
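
To make the relative vs. absolute path rule above concrete, here is a minimal sketch using the Hadoop FileSystem API (not part of the assignment; the class name and the paths are just examples). A Path without a leading / is resolved against your HDFS home directory (e.g. /user/wangyang), while a leading / means the HDFS root:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PathDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Prints something like hdfs://localhost:9000/user/wangyang
        System.out.println("Home directory: " + fs.getHomeDirectory());
        // Relative path: created under the home directory, e.g. /user/wangyang/input
        fs.mkdirs(new Path("input"));
        // Absolute path: created directly under the HDFS root, i.e. /input
        fs.mkdirs(new Path("/input"));
    }
}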

1. WordCount Exercise

Write the WordCount program, package it into a .jar file, and run it from the terminal:
Program code:

package exp4;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Exp4 {
    public Exp4() {
    }
     public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if(otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(Exp4.class);
        job.setMapperClass(Exp4.TokenizerMapper.class);
        job.setCombinerClass(Exp4.IntSumReducer.class);
        job.setReducerClass(Exp4.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class); 
        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public TokenizerMapper() {
        }
        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString()); 
            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public IntSumReducer() {
        }
        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }
}

The assignment requires packaging the program into a JAR (look up how to do that yourself), then run your JAR:

start-dfs.sh
hadoop jar /usr/local/hadoop/myapp/WordCount.jar /user/wangyang/input/text5.abc /user/wangyang/output

/usr/local/hadoop/myapp/WordCount.jar is the path to the JAR, /user/wangyang/input/text5.abc is the path to the test file, and /user/wangyang/output is the path of the output directory.
View the results:

hdfs dfs -cat /user/wangyang/output/*
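
If you prefer, the result files can also be read through the FileSystem API instead of hdfs dfs -cat. A minimal sketch, assuming the same output path as above (the class name ReadOutput is just an example):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outDir = new Path("hdfs://localhost:9000/user/wangyang/output");
        FileSystem fs = outDir.getFileSystem(conf);
        for (FileStatus status : fs.listStatus(outDir)) {
            // Only read the actual result files (part-r-00000, ...), skip _SUCCESS
            if (!status.getPath().getName().startsWith("part-")) {
                continue;
            }
            try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(fs.open(status.getPath())))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
}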

2. Data Deduplication

No need to package a JAR here; just run the program. The workflow is the same as the one at the beginning:

hdfs dfs -mkdir dedup_in
hdfs dfs -put ~/file1 dedup_in
hdfs dfs -put ~/file2 dedup_in
# View the results:
hdfs dfs -cat qc/*

Program code:

package demo2;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Demo2 {
    static String INPUT_PATH = "hdfs://localhost:9000/user/wangyang/dedup_in"; // put file1 and file2 in this directory
    static String OUTPUT_PATH = "hdfs://localhost:9000/user/wangyang/qc";
    static class MyMapper extends Mapper<Object, Text, Text, Text> { // input and output are strings, so the Text type is used
        private static Text line = new Text(); // each line is one record
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            line = value;
            context.write(line, new Text(",")); // keys are unique, so writing each line as the key achieves the deduplication
        }
        }
    }
    static class MyReduce extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(" ")); // the data handed from map to reduce is already deduplicated, so just write it out
        }
    }
    public static void main(String[] args) throws Exception {
        Path outputpath = new Path(OUTPUT_PATH);
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputpath);
        job.waitForCompletion(true);

    }

}
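
One caveat with this driver: if the output directory qc already exists (for example from an earlier run), FileOutputFormat will refuse to start the job. The original code does not handle this; a small hypothetical helper like the one below, called in main() after creating conf as OutputCleaner.deleteIfExists(conf, OUTPUT_PATH), removes the old output first:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, not part of the original Demo2 code.
public class OutputCleaner {
    public static void deleteIfExists(Configuration conf, String pathStr) throws Exception {
        Path path = new Path(pathStr);
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = delete recursively
        }
    }
}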

3. Data Sorting

Same as the deduplication above, so I won't repeat the steps.
Straight to the code:

package demo3;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Demo3 {
    static String INPUT_PATH = "hdfs://localhost:9000/user/wangyang/sortin";
    static String OUTPUT_PATH = "hdfs://localhost:9000/user/wangyang/sortout";
    static class MyMapper extends Mapper<Object, Object, IntWritable, NullWritable> { // the key is an int; the value can be anything
        IntWritable output_key = new IntWritable();
        NullWritable output_value = NullWritable.get();
        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
            int val = Integer.parseUnsignedInt(value.toString().trim()); // convert the text to an int
            output_key.set(val);
            context.write(output_key, output_value); // the number itself becomes the key
        }
        }
    }
    static class MyReduce extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> { // input is the map output; output is the rank and the number, both as int
        IntWritable output_key = new IntWritable();
        int num = 1;
        protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            output_key.set(num++); // incrementing counter used as the rank (line number)
            context.write(output_key, key); // key is the number passed in from map
        }
    }
    public static void main(String[] args) throws Exception {
        Path outputpath = new Path(OUTPUT_PATH);
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputpath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        job.setMapOutputKeyClass(IntWritable.class); // needed because the map and reduce output types differ
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);

    }

}
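
A note on why this works: the ascending order comes entirely from the shuffle phase, which sorts the IntWritable keys before they reach the reducer, and with the default single reduce task the output is therefore globally sorted. If you ever wanted descending order instead, one option (a sketch, not part of the assignment) is to plug in a custom sort comparator via job.setSortComparatorClass(DescendingIntComparator.class):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch of a comparator that reverses the default ascending key order.
public class DescendingIntComparator extends WritableComparator {
    public DescendingIntComparator() {
        super(IntWritable.class, true); // true = deserialize keys so compare(...) gets real IntWritable objects
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -a.compareTo(b); // flip IntWritable's natural ascending order
    }
}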
  • If you found this useful, remember to give it a like!!!!