MapReduce经典案例——倒排索引(2)

二、案例实现

1、Map阶段实现

首先,使用idea开发工具打开之前创建的Maven项目HadoopDemo(如何导入Maven包),并且新创建cn.itcast.mr.invertedIndex包,在该路径下编写自定义Mapper类InvertedIndexMapper,

package cn.itcast.mr.invertedIndex;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable,Text,Text,Text>{
    private static Text keyInfo=new Text();      //存储单词和文档名称
    private static final Text valueInfo=new Text("1");     //存储词频,初始化为1
    @Override
    protected void map(LongWritable key,Text value,Context context)
                         throws IOException,InterruptedException{
        String line=value.toString();
        String[]fields= StringUtils.split(line,"");
        //得到这行数据所在的文件切片
        FileSplit fileSplit=(FileSplit) context.getInputSplit();
        //根据文件切片得到文件名
        String fileName=fileSplit.getPath().getName();
        for (String field : fields){
              //key值由单词和文档名称组成,如“MapReduce:file1.txt"
            keyInfo.set(field+":"+fileName);
            context.write(keyInfo,valueInfo);

        }
    }
}

以上代码的作用是将文本中的单词按照空格进行切割,并以冒号拼接,“单词:文档名称”作为key,单词次数作为value,都以文本方式输出至Combine阶段。

2、Combine阶段实现

接着,根据Map阶段的输出结果,在cn.itcast.mr.invertedIndex包下,自定义实现Combine阶段的类InvertedlndexCombiner对每个文档的单词进行词频统计。

package cn.itcast.mr.invertedIndex;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text> {
    private static Text info=new Text();
    //输入:<MapReduce:file3.txt{1,1,…}>
    //输入:<MapReduce file3.txt:2>
    @Override
    protected void reduce(Text key,Iterable<Text>values,Context context) throws IOException,InterruptedException{
        int sum=0;     //统计词频
        for(Text value : values){
            sum +=Integer.parseInt(value.toString());
        }
        int splitIndex=key.toString().indexOf(":");
        //重新设置value值由文档名称和词频组成
        info.set(key.toString().substring(splitIndex+1) +":"+sum);
        //重新设置key值为单词
        key.set(key.toString().substring(0,splitIndex));
        context.write(key,info);
    }
}

以上代码的作用是对Map阶段的单词次数聚合处理,并重新设置key值为单词,value值由文档名称和词频组成。

3、Reduce阶段实现

根据Combine阶段的输出结果形式,同样在cn.itcast.mr.InvertedIndex包下,自定义Reduce类InvertedIndexMapper。

package cn.itcast.mr.invertedIndex;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class InvertedIndexReducer extends Reducer<Text,Text,Text,Text> {
    private static Text result=new Text();
    //输入:<MapReduce file3.txt3.txt:2>
    //输出:<MapReduce file1.txt:1;file2.txt:1;file3.txt:2;>
    @Override
    protected void reduce(Text key,Iterable<Text>values,
            Context context) throws IOException,InterruptedException{
    //生成文档列表
    String fileList=new String();
    for(Text value : values){
           fileList +=value.toString() +";";
        }
    result.set(fileList);
    context.write(key,result);
  }
}

以上代码的作用是接收Combine阶段输出的数据,并最终案例倒排索引文件需求的样式,将单词作为key,多个文档名称和词频连接作为value,输出到目标目录。

4、Driver程序主类实现

最后编写MapReduce程序运行主类InvertedIndexDriver

package cn.itcast.mr.invertedIndex;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;

public class InvertedIndexDriver {
    public static void main (String[] args) throws IOException,ClassNotFoundException,InterruptedException{
        Configuration conf=new Configuration();
        Job job= Job.getInstance(conf);
        job.setJarByClass(InvertedIndexDriver.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job,new Path("/export/data/EX3"));   \指定处理之前输入文件的位置
        FileOutputFormat.setOutputPath(job,new Path("/export/data/EX3/output"));   \指定处理之后输出的结果所保存的位置,注意output目录不能存在
        boolean res=job.waitForCompletion(true);   \ 向YARN集群提交这个job
        System.exit(res ? 0 : 1);

        }
    }

设置MapReduce工作任务的相关参数,由于本次演示的数据量较小,为了方便、快速进行案例演示,本案例采用了本地运行模式,对指定的本地D:\Inverted Index\input目录下的源文件(需要提前准备) 实现倒排索引,并将结果输人到本地D:\Inverted Index\output目录下, 设置完毕后, 运行程序即可。

5.效果测试
为了保证MapReduce程序正常执行, 需要先在本地D:\Inverted Index\input目录下创建filel.txt、file2.txt和file3.txt, 下图所示(即file1.txt输人”MapReduce is simple”,file2.txt输人”MapReduce is powerful is simple”,file3.txt输人”Hello MapReduce bye MapReduce”)接着,执行MapReduce程序的程序人口InvertedIndexDriver类,常执行完成后, 会在指定的D:\InvertedIndex\output下生成结果文件。

Hello   file3.tat:1;
MapReduce   file3.txt:2;file1.txt:1;file2.txt:1;
bye   file3.txt:1;
is   file1.txt:1;file2.txt:2;
powerful   file2.txt:1;
simple   file2.txt:1;filel.txt:1;

至此,倒排索引的案例已实现,从以上可看出各个单词所对应的文档以及出现次数,谢谢观看

如果有什么问题可以联系我,或者在文章评论区留下你的问题或联系方式,一起交流解决! 谢谢观看此文章^_^

欢迎登录https://www.jilespace.net/进极乐的博客查看更多内容!

人已赞赏
Hadoop开发

MapReduce经典案例——倒排索引(1)

2020-7-16 0:49:07

Linux

Linux虚拟机安装详细图文教程

2020-7-7 12:42:03

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索