博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mapreduce几种常见的inputformat实现类
阅读量:3968 次
发布时间:2019-05-24

本文共 6889 字,大约阅读时间需要 22 分钟。

我们在使用mapreudce来处理数据的时候会遇到许多类型的数据,如文本数据,二进制数据,数据库表等文件,mapreduce中对应其中常用的数据格式有许多输入类来对应实现这些输入格式数据的输入。

注意这几种输入格式实现类的切片

**TextInputFormat:**按文件切片,一个文件大了切片,但是再小也是一个切片

**KeyValue:**同TextInputFormat

**NLineInputformat:**按文件行数切片,N行切一片。

**CombineTextInputFormat:**设置文件最小值,文件小于最小值,合并切片。

**自定义Inputformat:**同TextInputFormat,只不过自定义设置了Kv。

TextInputFormat

这大概是最为常用的inputformat输入类了,用来读取文本数据。

按行读取每条记录键值为在整个文件中的字节偏移量,为Longwriteable类型。值为这行的内容,不包括任何终止符,为Text类型。

KeyValueTextInputFormat

每一行均为一条记录,被分割符分割为键和值,可以在驱动类中自定义分隔符使用:

conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," ");

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nwi5lFby-1603113769548)(https://s1.ax1x.com/2020/10/18/0juD10.png)]

NLineInputFormat

使用NLineInputFormat来指定用来分配给每个maptask任务的文本行数,即指每个inpusplit不在按照blocksize来划分,而是按照指定的行数及逆行划分。

即输入文件的总行数/N=切片数,如若不整除,切片数 = 商+1

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-C6Chs3Gq-1603113769551)(https://s1.ax1x.com/2020/10/18/0juONd.png)]

这里的键和值和TextInputFormat生成的一样。

KeyValueInputFormat实例

统计输入文件中每一行的第一个单词相同的行数。

输入数据:

banzhang ni haoxihuan hadoop banzhangbanzhang ni haoxihuan hadoop banzhang

期望:

banzhang	2xihuan	2

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Jc5wCn6x-1603113769554)(https://s1.ax1x.com/2020/10/18/0jl1Ug.png)]

mapper

package com.map.kv;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class KVTextMapper extends Mapper
{
LongWritable v = new LongWritable(1); @Override protected void map(Text key,Text value,Context context) throws IOException, InterruptedException {
context.write(key,v); }}

Reducer

package com.map.kv;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class KVTextReducer extends Reducer
{
LongWritable v = new LongWritable(); @Override protected void reduce(Text key,Iterable
values,Context context) throws IOException, InterruptedException {
long sum = 0L; for (LongWritable value : values) {
sum+=value.get(); } v.set(sum); context.write(key,v); }}

driver

package com.map.kv;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class KVTextDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{
"D:/mapreduceinput/input1","D:/mapreduceoutput/output1"}; Configuration conf = new Configuration(); conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," "); Job job = Job.getInstance(conf); job.setJarByClass(KVTextDriver.class); job.setMapperClass(KVTextMapper.class); job.setReducerClass(KVTextReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); job.setInputFormatClass(KeyValueTextInputFormat.class); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); }}

NLineInputFormat实例

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Qaqhxxy4-1603113769556)(https://s1.ax1x.com/2020/10/18/0jlsPJ.png)]

mapper

package com.atguigu.mapreduce.nline;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class NLineMapper extends Mapper
{
private Text k = new Text(); private LongWritable v = new LongWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行 String line = value.toString(); // 2 切割 String[] splited = line.split(" "); // 3 循环写出 for (int i = 0; i < splited.length; i++) {
k.set(splited[i]); context.write(k, v); } }}

reducer

package com.atguigu.mapreduce.nline;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class NLineReducer extends Reducer
{
LongWritable v = new LongWritable(); @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
long sum = 0l; // 1 汇总 for (LongWritable value : values) {
sum += value.get(); } v.set(sum); // 2 输出 context.write(key, v); }}

driver

package com.atguigu.mapreduce.nline;import java.io.IOException;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class NLineDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] {
"e:/input/inputword", "e:/output1" }; // 1 获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 7设置每个切片InputSplit中划分三条记录 NLineInputFormat.setNumLinesPerSplit(job, 3); // 8使用NLineInputFormat处理记录数 job.setInputFormatClass(NLineInputFormat.class); // 2设置jar包位置,关联mapper和reducer job.setJarByClass(NLineDriver.class); job.setMapperClass(NLineMapper.class); job.setReducerClass(NLineReducer.class); // 3设置map输出kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 4设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 5设置输入输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6提交job job.waitForCompletion(true); }}

转载地址:http://cicki.baihongyu.com/

你可能感兴趣的文章
LDDR3中scull编译问题
查看>>
内核模块转
查看>>
内核模块转
查看>>
ARM中断原理,&nbsp;中断嵌套的误区,中…
查看>>
ARM中断原理,&nbsp;中断嵌套的误区,中…
查看>>
struct&nbsp;device&nbsp;中的dev_id哪里去了…
查看>>
struct&nbsp;device&nbsp;中的dev_id哪里去了…
查看>>
Platform总线
查看>>
Platform总线
查看>>
Linux驱动程序中的platform总线详…
查看>>
Linux驱动程序中的platform总线详…
查看>>
按键驱动--platform设备的例子
查看>>
按键驱动--platform设备的例子
查看>>
mini2440按键驱动及详细解释(转)
查看>>
mini2440按键驱动及详细解释(转)
查看>>
在中断上下文使用disable_irq()的…
查看>>
在中断上下文使用disable_irq()的…
查看>>
内核定时器
查看>>
内核定时器
查看>>
中断与内核定时器
查看>>