@Shahab : Please correct me if I misunderstood you. Warm Regards, Tariq cloudfront.blogspot.com
On Fri, Jul 26, 2013 at 12:56 AM, Mohammad Tariq <[email protected]> wrote: > Probably by that Shahab means that you can use NullWritable as your key > from the Reducer. If you do so your Reducer will just emit the value > without the key. Something like this : > > output.collect(NullWritable.get(), new MyWritable(text)); > > Warm Regards, > Tariq > cloudfront.blogspot.com > > > On Fri, Jul 26, 2013 at 12:48 AM, Felipe Gutierrez < > [email protected]> wrote: > >> Sorry, I think I didn't understand, >> Does NullWritable go to replace MyWritable? But this is my value. My key >> is a Text. >> Regards, >> Felipe >> >> >> >> On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus <[email protected]>wrote: >> >>> I think you can use NullWritable as key. >>> >>> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html >>> >>> >>> Regards, >>> Shahab >>> >>> >>> On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez < >>> [email protected]> wrote: >>> >>>> I did a MapReduce program to execute a Grep function. I know there is a >>>> Grep function at hadoop examples, but I want to make my Grep MapReduce to >>>> explain to others. >>>> My problem is that my output shows the key/value. I want to show only >>>> the value, since I saved the line number at this value. Example: >>>> >>>> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] >>>> wlan0: associated ] >>>> >>>> Here is my code. 
Thanks, >>>> Felipe >>>> >>>> package grep; >>>> >>>> import java.io.File; >>>> import java.io.FileReader; >>>> import java.io.LineNumberReader; >>>> >>>> import org.apache.hadoop.fs.Path; >>>> import org.apache.hadoop.io.Text; >>>> import org.apache.hadoop.mapred.FileInputFormat; >>>> import org.apache.hadoop.mapred.FileOutputFormat; >>>> import org.apache.hadoop.mapred.JobClient; >>>> import org.apache.hadoop.mapred.JobConf; >>>> import org.apache.hadoop.mapred.TextInputFormat; >>>> import org.apache.hadoop.mapred.TextOutputFormat; >>>> >>>> public class Main { >>>> >>>> public static void main(String[] args) throws Exception { >>>> >>>> if (args == null || args.length != 3) { >>>> System.err.println("Usage: Main <in> <out> <regex>"); >>>> System.exit(-1); >>>> } >>>> >>>> JobConf conf = new JobConf(Main.class); >>>> conf.setJobName("grep"); >>>> >>>> String input = args[0]; >>>> String output = args[1]; >>>> String regex = args[2]; >>>> >>>> File arquivoLeitura = new File(input); >>>> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader( >>>> arquivoLeitura)); >>>> linhaLeitura.skip(arquivoLeitura.length()); >>>> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1); >>>> conf.set("grep.regex", regex); >>>> conf.set("grep.lines", lines); >>>> >>>> conf.setOutputKeyClass(Text.class); >>>> conf.setOutputValueClass(MyWritable.class); >>>> >>>> conf.setMapperClass(GrepMapper.class); >>>> conf.setCombinerClass(GrepReducer.class); >>>> conf.setReducerClass(GrepReducer.class); >>>> >>>> conf.setInputFormat(TextInputFormat.class); >>>> conf.setOutputFormat(TextOutputFormat.class); >>>> >>>> FileInputFormat.setInputPaths(conf, new Path(input)); >>>> FileOutputFormat.setOutputPath(conf, new Path(output)); >>>> >>>> JobClient.runJob(conf); >>>> } >>>> } >>>> >>>> package grep; >>>> >>>> import java.io.IOException; >>>> import java.text.DecimalFormat; >>>> >>>> import org.apache.hadoop.io.LongWritable; >>>> import org.apache.hadoop.io.Text; >>>> 
import org.apache.hadoop.mapred.JobConf; >>>> import org.apache.hadoop.mapred.MapReduceBase; >>>> import org.apache.hadoop.mapred.Mapper; >>>> import org.apache.hadoop.mapred.OutputCollector; >>>> import org.apache.hadoop.mapred.Reporter; >>>> >>>> public class GrepMapper extends MapReduceBase implements >>>> Mapper<LongWritable, Text, Text, MyWritable> { >>>> >>>> private static long line = 1; >>>> private static long n = 0; >>>> private static long divisor = 1; >>>> private static long qtdLines = 0; >>>> private Text k = new Text(); >>>> >>>> public void map(LongWritable key, Text value, >>>> OutputCollector<Text, MyWritable> output, Reporter reporter) >>>> throws IOException { >>>> String str = value.toString(); >>>> MyWritable text = new MyWritable("line " + line + " : " + str); >>>> if ((line % divisor) == 0) { >>>> n++; >>>> } >>>> k.set(customFormat("00000", n)); >>>> output.collect(k, text); >>>> line++; >>>> } >>>> >>>> @Override >>>> public void configure(JobConf job) { >>>> qtdLines = Long.parseLong(job.get("grep.lines")); >>>> if (qtdLines <= 500) { >>>> divisor = 10; >>>> } else if (qtdLines <= 1000) { >>>> divisor = 20; >>>> } else if (qtdLines <= 1500) { >>>> divisor = 30; >>>> } else if (qtdLines <= 2000) { >>>> divisor = 40; >>>> } else if (qtdLines <= 2500) { >>>> divisor = 50; >>>> } else if (qtdLines <= 3000) { >>>> divisor = 60; >>>> } else if (qtdLines <= 3500) { >>>> divisor = 70; >>>> } else if (qtdLines <= 4000) { >>>> divisor = 80; >>>> } else if (qtdLines <= 4500) { >>>> divisor = 90; >>>> } else if (qtdLines <= 5000) { >>>> divisor = 100; >>>> } else if (qtdLines <= 5500) { >>>> divisor = 110; >>>> } else if (qtdLines <= 6000) { >>>> divisor = 120; >>>> } else if (qtdLines <= 6500) { >>>> divisor = 130; >>>> } else if (qtdLines <= 7000) { >>>> divisor = 140; >>>> } >>>> } >>>> >>>> static public String customFormat(String pattern, double value) { >>>> DecimalFormat myFormatter = new DecimalFormat(pattern); >>>> return 
myFormatter.format(value); >>>> } >>>> } >>>> >>>> package grep; >>>> >>>> import java.io.IOException; >>>> import java.util.Iterator; >>>> import java.util.regex.Matcher; >>>> import java.util.regex.Pattern; >>>> >>>> import org.apache.hadoop.io.Text; >>>> import org.apache.hadoop.mapred.JobConf; >>>> import org.apache.hadoop.mapred.MapReduceBase; >>>> import org.apache.hadoop.mapred.OutputCollector; >>>> import org.apache.hadoop.mapred.Reducer; >>>> import org.apache.hadoop.mapred.Reporter; >>>> >>>> public class GrepReducer extends MapReduceBase implements >>>> Reducer<Text, MyWritable, Text, MyWritable> { >>>> >>>> private Pattern pattern; >>>> >>>> @Override >>>> public void configure(JobConf job) { >>>> pattern = Pattern.compile(job.get("grep.regex")); >>>> } >>>> >>>> public void reduce(Text key, Iterator<MyWritable> values, >>>> OutputCollector<Text, MyWritable> output, Reporter reporter) >>>> throws IOException { >>>> >>>> while (values.hasNext()) { >>>> String text = (String) values.next().get(); >>>> Matcher matcher = pattern.matcher(text); >>>> while (matcher.find()) { >>>> output.collect(key, new MyWritable(text)); >>>> } >>>> } >>>> } >>>> } >>>> >>>> package grep; >>>> >>>> import java.io.DataInput; >>>> import java.io.DataOutput; >>>> import java.io.IOException; >>>> import java.lang.reflect.Array; >>>> import java.util.HashMap; >>>> import java.util.Map; >>>> >>>> import org.apache.hadoop.conf.Configurable; >>>> import org.apache.hadoop.conf.Configuration; >>>> import org.apache.hadoop.conf.Configured; >>>> import org.apache.hadoop.io.UTF8; >>>> import org.apache.hadoop.io.Writable; >>>> import org.apache.hadoop.io.WritableFactories; >>>> >>>> public class MyWritable implements Writable, Configurable { >>>> >>>> private Class declaredClass; >>>> private Object instance; >>>> private Configuration conf; >>>> >>>> public MyWritable() { >>>> } >>>> >>>> public MyWritable(Object instance) { >>>> set(instance); >>>> } >>>> >>>> public MyWritable(Class 
declaredClass, Object instance) { >>>> this.declaredClass = declaredClass; >>>> this.instance = instance; >>>> } >>>> >>>> /** Return the instance, or null if none. */ >>>> public Object get() { >>>> return instance; >>>> } >>>> >>>> /** Return the class this is meant to be. */ >>>> public Class getDeclaredClass() { >>>> return declaredClass; >>>> } >>>> >>>> /** Reset the instance. */ >>>> public void set(Object instance) { >>>> this.declaredClass = instance.getClass(); >>>> this.instance = instance; >>>> } >>>> >>>> public String toString() { >>>> return "[ " + instance + " ]"; >>>> } >>>> >>>> public void readFields(DataInput in) throws IOException { >>>> readObject(in, this, this.conf); >>>> } >>>> >>>> public void write(DataOutput out) throws IOException { >>>> writeObject(out, instance, declaredClass, conf); >>>> } >>>> >>>> private static final Map<String, Class<?>> PRIMITIVE_NAMES = new >>>> HashMap<String, Class<?>>(); >>>> static { >>>> PRIMITIVE_NAMES.put("boolean", Boolean.TYPE); >>>> PRIMITIVE_NAMES.put("byte", Byte.TYPE); >>>> PRIMITIVE_NAMES.put("char", Character.TYPE); >>>> PRIMITIVE_NAMES.put("short", Short.TYPE); >>>> PRIMITIVE_NAMES.put("int", Integer.TYPE); >>>> PRIMITIVE_NAMES.put("long", Long.TYPE); >>>> PRIMITIVE_NAMES.put("float", Float.TYPE); >>>> PRIMITIVE_NAMES.put("double", Double.TYPE); >>>> PRIMITIVE_NAMES.put("void", Void.TYPE); >>>> } >>>> >>>> private static class NullInstance extends Configured implements >>>> Writable { >>>> private Class<?> declaredClass; >>>> >>>> public NullInstance() { >>>> super(null); >>>> } >>>> >>>> public NullInstance(Class declaredClass, Configuration conf) { >>>> super(conf); >>>> this.declaredClass = declaredClass; >>>> } >>>> >>>> public void readFields(DataInput in) throws IOException { >>>> String className = UTF8.readString(in); >>>> declaredClass = PRIMITIVE_NAMES.get(className); >>>> if (declaredClass == null) { >>>> try { >>>> declaredClass = getConf().getClassByName(className); >>>> } catch 
(ClassNotFoundException e) { >>>> throw new RuntimeException(e.toString()); >>>> } >>>> } >>>> } >>>> >>>> public void write(DataOutput out) throws IOException { >>>> UTF8.writeString(out, declaredClass.getName()); >>>> } >>>> } >>>> >>>> /** >>>> * Write a {@link Writable}, {@link String}, primitive type, or an >>>> array of >>>> * the preceding. >>>> */ >>>> public static void writeObject(DataOutput out, Object instance, >>>> Class declaredClass, Configuration conf) throws IOException { >>>> >>>> if (instance == null) { // null >>>> instance = new NullInstance(declaredClass, conf); >>>> declaredClass = Writable.class; >>>> } >>>> >>>> UTF8.writeString(out, declaredClass.getName()); // always write declared >>>> >>>> if (declaredClass.isArray()) { // array >>>> int length = Array.getLength(instance); >>>> out.writeInt(length); >>>> for (int i = 0; i < length; i++) { >>>> writeObject(out, Array.get(instance, i), >>>> declaredClass.getComponentType(), conf); >>>> } >>>> >>>> } else if (declaredClass == String.class) { // String >>>> UTF8.writeString(out, (String) instance); >>>> >>>> } else if (declaredClass.isPrimitive()) { // primitive type >>>> >>>> if (declaredClass == Boolean.TYPE) { // boolean >>>> out.writeBoolean(((Boolean) instance).booleanValue()); >>>> } else if (declaredClass == Character.TYPE) { // char >>>> out.writeChar(((Character) instance).charValue()); >>>> } else if (declaredClass == Byte.TYPE) { // byte >>>> out.writeByte(((Byte) instance).byteValue()); >>>> } else if (declaredClass == Short.TYPE) { // short >>>> out.writeShort(((Short) instance).shortValue()); >>>> } else if (declaredClass == Integer.TYPE) { // int >>>> out.writeInt(((Integer) instance).intValue()); >>>> } else if (declaredClass == Long.TYPE) { // long >>>> out.writeLong(((Long) instance).longValue()); >>>> } else if (declaredClass == Float.TYPE) { // float >>>> out.writeFloat(((Float) instance).floatValue()); >>>> } else if (declaredClass == Double.TYPE) { // double >>>> 
out.writeDouble(((Double) instance).doubleValue()); >>>> } else if (declaredClass == Void.TYPE) { // void >>>> } else { >>>> throw new IllegalArgumentException("Not a primitive: " >>>> + declaredClass); >>>> } >>>> } else if (declaredClass.isEnum()) { // enum >>>> UTF8.writeString(out, ((Enum) instance).name()); >>>> } else if (Writable.class.isAssignableFrom(declaredClass)) { // >>>> Writable >>>> UTF8.writeString(out, instance.getClass().getName()); >>>> ((Writable) instance).write(out); >>>> >>>> } else { >>>> throw new IOException("Can't write: " + instance + " as " >>>> + declaredClass); >>>> } >>>> } >>>> >>>> /** >>>> * Read a {@link Writable}, {@link String}, primitive type, or an >>>> array of >>>> * the preceding. >>>> */ >>>> public static Object readObject(DataInput in, Configuration conf) >>>> throws IOException { >>>> return readObject(in, null, conf); >>>> } >>>> >>>> /** >>>> * Read a {@link Writable}, {@link String}, primitive type, or an array >>>> of >>>> * the preceding. 
>>>> */ >>>> @SuppressWarnings("unchecked") >>>> public static Object readObject(DataInput in, MyWritable >>>> objectWritable, >>>> Configuration conf) throws IOException { >>>> String className = UTF8.readString(in); >>>> Class<?> declaredClass = PRIMITIVE_NAMES.get(className); >>>> if (declaredClass == null) { >>>> try { >>>> declaredClass = conf.getClassByName(className); >>>> } catch (ClassNotFoundException e) { >>>> throw new RuntimeException("readObject can't find class " >>>> + className, e); >>>> } >>>> } >>>> >>>> Object instance; >>>> >>>> if (declaredClass.isPrimitive()) { // primitive types >>>> >>>> if (declaredClass == Boolean.TYPE) { // boolean >>>> instance = Boolean.valueOf(in.readBoolean()); >>>> } else if (declaredClass == Character.TYPE) { // char >>>> instance = Character.valueOf(in.readChar()); >>>> } else if (declaredClass == Byte.TYPE) { // byte >>>> instance = Byte.valueOf(in.readByte()); >>>> } else if (declaredClass == Short.TYPE) { // short >>>> instance = Short.valueOf(in.readShort()); >>>> } else if (declaredClass == Integer.TYPE) { // int >>>> instance = Integer.valueOf(in.readInt()); >>>> } else if (declaredClass == Long.TYPE) { // long >>>> instance = Long.valueOf(in.readLong()); >>>> } else if (declaredClass == Float.TYPE) { // float >>>> instance = Float.valueOf(in.readFloat()); >>>> } else if (declaredClass == Double.TYPE) { // double >>>> instance = Double.valueOf(in.readDouble()); >>>> } else if (declaredClass == Void.TYPE) { // void >>>> instance = null; >>>> } else { >>>> throw new IllegalArgumentException("Not a primitive: " >>>> + declaredClass); >>>> } >>>> >>>> } else if (declaredClass.isArray()) { // array >>>> int length = in.readInt(); >>>> instance = Array.newInstance(declaredClass.getComponentType(), >>>> length); >>>> for (int i = 0; i < length; i++) { >>>> Array.set(instance, i, readObject(in, conf)); >>>> } >>>> >>>> } else if (declaredClass == String.class) { // String >>>> instance = UTF8.readString(in); >>>> } 
else if (declaredClass.isEnum()) { // enum >>>> instance = Enum.valueOf((Class<? extends Enum>) declaredClass, >>>> UTF8.readString(in)); >>>> } else { // Writable >>>> Class instanceClass = null; >>>> String str = ""; >>>> try { >>>> str = UTF8.readString(in); >>>> instanceClass = conf.getClassByName(str); >>>> } catch (ClassNotFoundException e) { >>>> throw new RuntimeException( >>>> "readObject can't find class " + str, e); >>>> } >>>> >>>> Writable writable = WritableFactories.newInstance(instanceClass, >>>> conf); >>>> writable.readFields(in); >>>> instance = writable; >>>> >>>> if (instanceClass == NullInstance.class) { // null >>>> declaredClass = ((NullInstance) instance).declaredClass; >>>> instance = null; >>>> } >>>> } >>>> >>>> if (objectWritable != null) { // store values >>>> objectWritable.declaredClass = declaredClass; >>>> objectWritable.instance = instance; >>>> } >>>> >>>> return instance; >>>> >>>> } >>>> >>>> public void setConf(Configuration conf) { >>>> this.conf = conf; >>>> } >>>> >>>> public Configuration getConf() { >>>> return this.conf; >>>> } >>>> } >>>> >>>> >>>> -- >>>> *-- >>>> -- Felipe Oliveira Gutierrez >>>> -- [email protected] >>>> -- https://sites.google.com/site/lipe82/Home/diaadia* >>>> >>> >>> >> >> >> -- >> *-- >> -- Felipe Oliveira Gutierrez >> -- [email protected] >> -- https://sites.google.com/site/lipe82/Home/diaadia* >> > >
