I think you can use NullWritable as the key. http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html
Regards, Shahab On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez < [email protected]> wrote: > I did a MapReduce program to execute a Grep function. I know there is a > Grep function at hadoop examples, but I want to make my Grep MapReduce to > explain to other. > My problem is that my out put shows the key/value. I want to show only the > value, since I saved the line number at this value. Example: > > 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0: > associated ] > > Here is my code. Thanks, > Felipe > > package grep; > > import java.io.File; > import java.io.FileReader; > import java.io.LineNumberReader; > > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.mapred.FileInputFormat; > import org.apache.hadoop.mapred.FileOutputFormat; > import org.apache.hadoop.mapred.JobClient; > import org.apache.hadoop.mapred.JobConf; > import org.apache.hadoop.mapred.TextInputFormat; > import org.apache.hadoop.mapred.TextOutputFormat; > > public class Main { > > public static void main(String[] args) throws Exception { > > if (args == null || args.length != 3) { > System.err.println("Usage: Main <in> <out> <regex>"); > System.exit(-1); > } > > JobConf conf = new JobConf(Main.class); > conf.setJobName("grep"); > > String input = args[0]; > String output = args[1]; > String regex = args[2]; > > File arquivoLeitura = new File(input); > LineNumberReader linhaLeitura = new LineNumberReader(new FileReader( > arquivoLeitura)); > linhaLeitura.skip(arquivoLeitura.length()); > String lines = String.valueOf(linhaLeitura.getLineNumber() + 1); > conf.set("grep.regex", regex); > conf.set("grep.lines", lines); > > conf.setOutputKeyClass(Text.class); > conf.setOutputValueClass(MyWritable.class); > > conf.setMapperClass(GrepMapper.class); > conf.setCombinerClass(GrepReducer.class); > conf.setReducerClass(GrepReducer.class); > > conf.setInputFormat(TextInputFormat.class); > conf.setOutputFormat(TextOutputFormat.class); > > 
FileInputFormat.setInputPaths(conf, new Path(input)); > FileOutputFormat.setOutputPath(conf, new Path(output)); > > JobClient.runJob(conf); > } > } > > package grep; > > import java.io.IOException; > import java.text.DecimalFormat; > > import org.apache.hadoop.io.LongWritable; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.mapred.JobConf; > import org.apache.hadoop.mapred.MapReduceBase; > import org.apache.hadoop.mapred.Mapper; > import org.apache.hadoop.mapred.OutputCollector; > import org.apache.hadoop.mapred.Reporter; > > public class GrepMapper extends MapReduceBase implements > Mapper<LongWritable, Text, Text, MyWritable> { > > private static long line = 1; > private static long n = 0; > private static long divisor = 1; > private static long qtdLines = 0; > private Text k = new Text(); > > public void map(LongWritable key, Text value, > OutputCollector<Text, MyWritable> output, Reporter reporter) > throws IOException { > String str = value.toString(); > MyWritable text = new MyWritable("line " + line + " : " + str); > if ((line % divisor) == 0) { > n++; > } > k.set(customFormat("00000", n)); > output.collect(k, text); > line++; > } > > @Override > public void configure(JobConf job) { > qtdLines = Long.parseLong(job.get("grep.lines")); > if (qtdLines <= 500) { > divisor = 10; > } else if (qtdLines <= 1000) { > divisor = 20; > } else if (qtdLines <= 1500) { > divisor = 30; > } else if (qtdLines <= 2000) { > divisor = 40; > } else if (qtdLines <= 2500) { > divisor = 50; > } else if (qtdLines <= 3000) { > divisor = 60; > } else if (qtdLines <= 3500) { > divisor = 70; > } else if (qtdLines <= 4000) { > divisor = 80; > } else if (qtdLines <= 4500) { > divisor = 90; > } else if (qtdLines <= 5000) { > divisor = 100; > } else if (qtdLines <= 5500) { > divisor = 110; > } else if (qtdLines <= 6000) { > divisor = 120; > } else if (qtdLines <= 6500) { > divisor = 130; > } else if (qtdLines <= 7000) { > divisor = 140; > } > } > > static public String 
customFormat(String pattern, double value) { > DecimalFormat myFormatter = new DecimalFormat(pattern); > return myFormatter.format(value); > } > } > > package grep; > > import java.io.IOException; > import java.util.Iterator; > import java.util.regex.Matcher; > import java.util.regex.Pattern; > > import org.apache.hadoop.io.Text; > import org.apache.hadoop.mapred.JobConf; > import org.apache.hadoop.mapred.MapReduceBase; > import org.apache.hadoop.mapred.OutputCollector; > import org.apache.hadoop.mapred.Reducer; > import org.apache.hadoop.mapred.Reporter; > > public class GrepReducer extends MapReduceBase implements > Reducer<Text, MyWritable, Text, MyWritable> { > > private Pattern pattern; > > @Override > public void configure(JobConf job) { > pattern = Pattern.compile(job.get("grep.regex")); > } > > public void reduce(Text key, Iterator<MyWritable> values, > OutputCollector<Text, MyWritable> output, Reporter reporter) > throws IOException { > > while (values.hasNext()) { > String text = (String) values.next().get(); > Matcher matcher = pattern.matcher(text); > while (matcher.find()) { > output.collect(key, new MyWritable(text)); > } > } > } > } > > package grep; > > import java.io.DataInput; > import java.io.DataOutput; > import java.io.IOException; > import java.lang.reflect.Array; > import java.util.HashMap; > import java.util.Map; > > import org.apache.hadoop.conf.Configurable; > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.conf.Configured; > import org.apache.hadoop.io.UTF8; > import org.apache.hadoop.io.Writable; > import org.apache.hadoop.io.WritableFactories; > > public class MyWritable implements Writable, Configurable { > > private Class declaredClass; > private Object instance; > private Configuration conf; > > public MyWritable() { > } > > public MyWritable(Object instance) { > set(instance); > } > > public MyWritable(Class declaredClass, Object instance) { > this.declaredClass = declaredClass; > this.instance = instance; > 
} > > /** Return the instance, or null if none. */ > public Object get() { > return instance; > } > > /** Return the class this is meant to be. */ > public Class getDeclaredClass() { > return declaredClass; > } > > /** Reset the instance. */ > public void set(Object instance) { > this.declaredClass = instance.getClass(); > this.instance = instance; > } > > public String toString() { > return "[ " + instance + " ]"; > } > > public void readFields(DataInput in) throws IOException { > readObject(in, this, this.conf); > } > > public void write(DataOutput out) throws IOException { > writeObject(out, instance, declaredClass, conf); > } > > private static final Map<String, Class<?>> PRIMITIVE_NAMES = new > HashMap<String, Class<?>>(); > static { > PRIMITIVE_NAMES.put("boolean", Boolean.TYPE); > PRIMITIVE_NAMES.put("byte", Byte.TYPE); > PRIMITIVE_NAMES.put("char", Character.TYPE); > PRIMITIVE_NAMES.put("short", Short.TYPE); > PRIMITIVE_NAMES.put("int", Integer.TYPE); > PRIMITIVE_NAMES.put("long", Long.TYPE); > PRIMITIVE_NAMES.put("float", Float.TYPE); > PRIMITIVE_NAMES.put("double", Double.TYPE); > PRIMITIVE_NAMES.put("void", Void.TYPE); > } > > private static class NullInstance extends Configured implements Writable { > private Class<?> declaredClass; > > public NullInstance() { > super(null); > } > > public NullInstance(Class declaredClass, Configuration conf) { > super(conf); > this.declaredClass = declaredClass; > } > > public void readFields(DataInput in) throws IOException { > String className = UTF8.readString(in); > declaredClass = PRIMITIVE_NAMES.get(className); > if (declaredClass == null) { > try { > declaredClass = getConf().getClassByName(className); > } catch (ClassNotFoundException e) { > throw new RuntimeException(e.toString()); > } > } > } > > public void write(DataOutput out) throws IOException { > UTF8.writeString(out, declaredClass.getName()); > } > } > > /** > * Write a {@link Writable}, {@link String}, primitive type, or an array of > * the preceding. 
> */ > public static void writeObject(DataOutput out, Object instance, > Class declaredClass, Configuration conf) throws IOException { > > if (instance == null) { // null > instance = new NullInstance(declaredClass, conf); > declaredClass = Writable.class; > } > > UTF8.writeString(out, declaredClass.getName()); // always write declared > > if (declaredClass.isArray()) { // array > int length = Array.getLength(instance); > out.writeInt(length); > for (int i = 0; i < length; i++) { > writeObject(out, Array.get(instance, i), > declaredClass.getComponentType(), conf); > } > > } else if (declaredClass == String.class) { // String > UTF8.writeString(out, (String) instance); > > } else if (declaredClass.isPrimitive()) { // primitive type > > if (declaredClass == Boolean.TYPE) { // boolean > out.writeBoolean(((Boolean) instance).booleanValue()); > } else if (declaredClass == Character.TYPE) { // char > out.writeChar(((Character) instance).charValue()); > } else if (declaredClass == Byte.TYPE) { // byte > out.writeByte(((Byte) instance).byteValue()); > } else if (declaredClass == Short.TYPE) { // short > out.writeShort(((Short) instance).shortValue()); > } else if (declaredClass == Integer.TYPE) { // int > out.writeInt(((Integer) instance).intValue()); > } else if (declaredClass == Long.TYPE) { // long > out.writeLong(((Long) instance).longValue()); > } else if (declaredClass == Float.TYPE) { // float > out.writeFloat(((Float) instance).floatValue()); > } else if (declaredClass == Double.TYPE) { // double > out.writeDouble(((Double) instance).doubleValue()); > } else if (declaredClass == Void.TYPE) { // void > } else { > throw new IllegalArgumentException("Not a primitive: " > + declaredClass); > } > } else if (declaredClass.isEnum()) { // enum > UTF8.writeString(out, ((Enum) instance).name()); > } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable > UTF8.writeString(out, instance.getClass().getName()); > ((Writable) instance).write(out); > > } else { > 
throw new IOException("Can't write: " + instance + " as " > + declaredClass); > } > } > > /** > * Read a {@link Writable}, {@link String}, primitive type, or an array of > * the preceding. > */ > public static Object readObject(DataInput in, Configuration conf) > throws IOException { > return readObject(in, null, conf); > } > > /** > * Read a {@link Writable}, {@link String}, primitive type, or an array of > * the preceding. > */ > @SuppressWarnings("unchecked") > public static Object readObject(DataInput in, MyWritable objectWritable, > Configuration conf) throws IOException { > String className = UTF8.readString(in); > Class<?> declaredClass = PRIMITIVE_NAMES.get(className); > if (declaredClass == null) { > try { > declaredClass = conf.getClassByName(className); > } catch (ClassNotFoundException e) { > throw new RuntimeException("readObject can't find class " > + className, e); > } > } > > Object instance; > > if (declaredClass.isPrimitive()) { // primitive types > > if (declaredClass == Boolean.TYPE) { // boolean > instance = Boolean.valueOf(in.readBoolean()); > } else if (declaredClass == Character.TYPE) { // char > instance = Character.valueOf(in.readChar()); > } else if (declaredClass == Byte.TYPE) { // byte > instance = Byte.valueOf(in.readByte()); > } else if (declaredClass == Short.TYPE) { // short > instance = Short.valueOf(in.readShort()); > } else if (declaredClass == Integer.TYPE) { // int > instance = Integer.valueOf(in.readInt()); > } else if (declaredClass == Long.TYPE) { // long > instance = Long.valueOf(in.readLong()); > } else if (declaredClass == Float.TYPE) { // float > instance = Float.valueOf(in.readFloat()); > } else if (declaredClass == Double.TYPE) { // double > instance = Double.valueOf(in.readDouble()); > } else if (declaredClass == Void.TYPE) { // void > instance = null; > } else { > throw new IllegalArgumentException("Not a primitive: " > + declaredClass); > } > > } else if (declaredClass.isArray()) { // array > int length = 
in.readInt(); > instance = Array.newInstance(declaredClass.getComponentType(), > length); > for (int i = 0; i < length; i++) { > Array.set(instance, i, readObject(in, conf)); > } > > } else if (declaredClass == String.class) { // String > instance = UTF8.readString(in); > } else if (declaredClass.isEnum()) { // enum > instance = Enum.valueOf((Class<? extends Enum>) declaredClass, > UTF8.readString(in)); > } else { // Writable > Class instanceClass = null; > String str = ""; > try { > str = UTF8.readString(in); > instanceClass = conf.getClassByName(str); > } catch (ClassNotFoundException e) { > throw new RuntimeException( > "readObject can't find class " + str, e); > } > > Writable writable = WritableFactories.newInstance(instanceClass, > conf); > writable.readFields(in); > instance = writable; > > if (instanceClass == NullInstance.class) { // null > declaredClass = ((NullInstance) instance).declaredClass; > instance = null; > } > } > > if (objectWritable != null) { // store values > objectWritable.declaredClass = declaredClass; > objectWritable.instance = instance; > } > > return instance; > > } > > public void setConf(Configuration conf) { > this.conf = conf; > } > > public Configuration getConf() { > return this.conf; > } > } > > > -- > *-- > -- Felipe Oliveira Gutierrez > -- [email protected] > -- https://sites.google.com/site/lipe82/Home/diaadia* >
