When processing data with MapReduce we encounter many kinds of input, such as plain text, binary data, and database tables. MapReduce ships with a number of commonly used InputFormat classes that handle reading these input formats.
Note how each of these input format classes computes its input splits:

**TextInputFormat:** splits by file; a large file is divided into multiple splits, but even the smallest file still occupies at least one split of its own.

**KeyValueTextInputFormat:** same as TextInputFormat.

**NLineInputFormat:** splits by line count; every N lines form one split.

**CombineTextInputFormat:** a split size threshold is configured; files smaller than it are merged together into combined splits instead of each producing its own split.

**Custom InputFormat:** same splitting behavior as TextInputFormat, only with user-defined key/value types.
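The merging idea behind CombineTextInputFormat can be illustrated with a simplified sketch (this is not Hadoop's actual algorithm, which also considers node and rack locality; the greedy packing below only demonstrates why four small files can end up as two splits):

```java
import java.util.ArrayList;
import java.util.List;

public class CombineSplitSketch {
    /** Greedily packs file sizes (bytes) into splits of at most maxSplitSize. */
    public static List<Long> combineSplits(long[] fileSizes, long maxSplitSize) {
        List<Long> splits = new ArrayList<>();
        long current = 0;
        for (long size : fileSizes) {
            // Close the current split when adding this file would exceed the limit.
            if (current + size > maxSplitSize && current > 0) {
                splits.add(current);
                current = 0;
            }
            current += size;
        }
        if (current > 0) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        // Four small files of 1 MB each with a 2 MB limit -> two splits instead of four.
        long mb = 1024 * 1024;
        System.out.println(combineSplits(new long[]{mb, mb, mb, mb}, 2 * mb));
    }
}
```

With plain TextInputFormat the same four files would produce four splits and four map tasks; combining them halves the task count.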
TextInputFormat is probably the most commonly used InputFormat class; it is used to read plain text data.

Records are read line by line. The key is the byte offset of the line within the whole file, of type LongWritable; the value is the content of the line, excluding any line terminator, of type Text.
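A small illustrative sketch (plain Java, no Hadoop dependency) of how those byte-offset keys come about; it mimics the (LongWritable, Text) records TextInputFormat would produce for a file's lines, assuming `\n` terminators and UTF-8 content:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetKeySketch {
    /** Returns (byteOffset -> lineContent) pairs, like TextInputFormat's records. */
    public static Map<Long, String> lineOffsets(String fileContent) {
        Map<Long, String> records = new LinkedHashMap<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            records.put(offset, line);
            // Advance past the line's bytes plus the '\n' terminator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(lineOffsets("hello world\nhadoop mapreduce"));
        // {0=hello world, 12=hadoop mapreduce}
    }
}
```

Note that the second key is 12, not 1: keys are byte offsets, not line numbers.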
With KeyValueTextInputFormat, each line is one record, split by a separator into a key and a value. The separator can be customized in the driver class:
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," ");
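How a line gets split can be sketched in plain Java (illustrative only, not Hadoop's implementation): everything before the first occurrence of the separator becomes the key and the rest becomes the value; if the separator is absent, the whole line is the key and the value is empty.

```java
public class KeyValueSplitSketch {
    /** Splits a line at the FIRST occurrence of the separator, like KeyValueLineRecordReader. */
    public static String[] splitLine(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            // No separator: whole line is the key, value is empty.
            return new String[]{line, ""};
        }
        return new String[]{line.substring(0, idx), line.substring(idx + separator.length())};
    }

    public static void main(String[] args) {
        String[] kv = splitLine("banzhang ni hao", " ");
        System.out.println(kv[0] + " -> " + kv[1]); // banzhang -> ni hao
    }
}
```

This is why, with a space separator, the key of each record is exactly the first word of the line.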
![](https://s1.ax1x.com/2020/10/18/0juD10.png)
NLineInputFormat specifies how many lines of text are assigned to each map task: each InputSplit is no longer determined by the block size, but by the configured number of lines.

That is, number of splits = total lines in the input file / N; if this does not divide evenly, number of splits = quotient + 1.
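The rule above is just an integer ceiling; as a quick sketch:

```java
public class NLineSplitCount {
    /** splits = ceil(totalLines / linesPerSplit), as NLineInputFormat computes it. */
    public static int numSplits(int totalLines, int linesPerSplit) {
        return (totalLines + linesPerSplit - 1) / linesPerSplit; // integer ceiling
    }

    public static void main(String[] args) {
        // 11 lines with N = 3: 11 / 3 = 3 remainder 2, so 3 + 1 = 4 splits.
        System.out.println(numSplits(11, 3)); // 4
    }
}
```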
![](https://s1.ax1x.com/2020/10/18/0juONd.png)
The keys and values produced here are the same as those produced by TextInputFormat.
KeyValueTextInputFormat case study: count the number of lines in the input file whose first word is the same.

Input data:

```
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
```

Expected output:

```
banzhang	2
xihuan	2
```
![](https://s1.ax1x.com/2020/10/18/0jl1Ug.png)
```java
package com.map.kv;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> {

    private final LongWritable v = new LongWritable(1);

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // KeyValueTextInputFormat already made the first word the key,
        // so just emit (firstWord, 1).
        context.write(key, v);
    }
}
```
```java
package com.map.kv;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    private final LongWritable v = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper for this first word.
        long sum = 0L;
        for (LongWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}
```
```java
package com.map.kv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class KVTextDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"D:/mapreduceinput/input1", "D:/mapreduceoutput/output1"};

        Configuration conf = new Configuration();
        // Use a space as the key/value separator.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
        Job job = Job.getInstance(conf);

        job.setJarByClass(KVTextDriver.class);
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
```
![](https://s1.ax1x.com/2020/10/18/0jlsPJ.png)
```java
package com.atguigu.mapreduce.nline;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private final Text k = new Text();
    private final LongWritable v = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Get one line
        String line = value.toString();
        // 2. Split it on spaces
        String[] splited = line.split(" ");
        // 3. Emit (word, 1) for every word
        for (String word : splited) {
            k.set(word);
            context.write(k, v);
        }
    }
}
```
```java
package com.atguigu.mapreduce.nline;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    private final LongWritable v = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1. Sum the counts
        long sum = 0L;
        for (LongWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        // 2. Write the result
        context.write(key, v);
    }
}
```
```java
package com.atguigu.mapreduce.nline;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLineDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // Adjust the input/output paths to your own machine
        args = new String[]{"e:/input/inputword", "e:/output1"};

        // 1. Get a Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 7. Put three records into each InputSplit
        NLineInputFormat.setNumLinesPerSplit(job, 3);
        // 8. Use NLineInputFormat to read the records
        job.setInputFormatClass(NLineInputFormat.class);

        // 2. Set the jar location and wire up the mapper and reducer
        job.setJarByClass(NLineDriver.class);
        job.setMapperClass(NLineMapper.class);
        job.setReducerClass(NLineReducer.class);

        // 3. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 4. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Submit the job
        job.waitForCompletion(true);
    }
}
```
Reposted from: http://cicki.baihongyu.com/