Sorry, I think I didn't understand. Is NullWritable meant to replace MyWritable? But MyWritable is my value; my key is a Text. Regards, Felipe
On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus <[email protected]>wrote: > I think uou can use NullWritable as key. > > http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html > > > Regards, > Shahab > > > On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez < > [email protected]> wrote: > >> I did a MapReduce program to execute a Grep function. I know there is a >> Grep function at hadoop examples, but I want to make my Grep MapReduce to >> explain to other. >> My problem is that my out put shows the key/value. I want to show only >> the value, since I saved the line number at this value. Example: >> >> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0: >> associated ] >> >> Here is my code. Thanks, >> Felipe >> >> package grep; >> >> import java.io.File; >> import java.io.FileReader; >> import java.io.LineNumberReader; >> >> import org.apache.hadoop.fs.Path; >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.mapred.FileInputFormat; >> import org.apache.hadoop.mapred.FileOutputFormat; >> import org.apache.hadoop.mapred.JobClient; >> import org.apache.hadoop.mapred.JobConf; >> import org.apache.hadoop.mapred.TextInputFormat; >> import org.apache.hadoop.mapred.TextOutputFormat; >> >> public class Main { >> >> public static void main(String[] args) throws Exception { >> >> if (args == null || args.length != 3) { >> System.err.println("Usage: Main <in> <out> <regex>"); >> System.exit(-1); >> } >> >> JobConf conf = new JobConf(Main.class); >> conf.setJobName("grep"); >> >> String input = args[0]; >> String output = args[1]; >> String regex = args[2]; >> >> File arquivoLeitura = new File(input); >> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader( >> arquivoLeitura)); >> linhaLeitura.skip(arquivoLeitura.length()); >> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1); >> conf.set("grep.regex", regex); >> conf.set("grep.lines", lines); >> >> conf.setOutputKeyClass(Text.class); >> 
conf.setOutputValueClass(MyWritable.class); >> >> conf.setMapperClass(GrepMapper.class); >> conf.setCombinerClass(GrepReducer.class); >> conf.setReducerClass(GrepReducer.class); >> >> conf.setInputFormat(TextInputFormat.class); >> conf.setOutputFormat(TextOutputFormat.class); >> >> FileInputFormat.setInputPaths(conf, new Path(input)); >> FileOutputFormat.setOutputPath(conf, new Path(output)); >> >> JobClient.runJob(conf); >> } >> } >> >> package grep; >> >> import java.io.IOException; >> import java.text.DecimalFormat; >> >> import org.apache.hadoop.io.LongWritable; >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.mapred.JobConf; >> import org.apache.hadoop.mapred.MapReduceBase; >> import org.apache.hadoop.mapred.Mapper; >> import org.apache.hadoop.mapred.OutputCollector; >> import org.apache.hadoop.mapred.Reporter; >> >> public class GrepMapper extends MapReduceBase implements >> Mapper<LongWritable, Text, Text, MyWritable> { >> >> private static long line = 1; >> private static long n = 0; >> private static long divisor = 1; >> private static long qtdLines = 0; >> private Text k = new Text(); >> >> public void map(LongWritable key, Text value, >> OutputCollector<Text, MyWritable> output, Reporter reporter) >> throws IOException { >> String str = value.toString(); >> MyWritable text = new MyWritable("line " + line + " : " + str); >> if ((line % divisor) == 0) { >> n++; >> } >> k.set(customFormat("00000", n)); >> output.collect(k, text); >> line++; >> } >> >> @Override >> public void configure(JobConf job) { >> qtdLines = Long.parseLong(job.get("grep.lines")); >> if (qtdLines <= 500) { >> divisor = 10; >> } else if (qtdLines <= 1000) { >> divisor = 20; >> } else if (qtdLines <= 1500) { >> divisor = 30; >> } else if (qtdLines <= 2000) { >> divisor = 40; >> } else if (qtdLines <= 2500) { >> divisor = 50; >> } else if (qtdLines <= 3000) { >> divisor = 60; >> } else if (qtdLines <= 3500) { >> divisor = 70; >> } else if (qtdLines <= 4000) { >> divisor = 
80; >> } else if (qtdLines <= 4500) { >> divisor = 90; >> } else if (qtdLines <= 5000) { >> divisor = 100; >> } else if (qtdLines <= 5500) { >> divisor = 110; >> } else if (qtdLines <= 6000) { >> divisor = 120; >> } else if (qtdLines <= 6500) { >> divisor = 130; >> } else if (qtdLines <= 7000) { >> divisor = 140; >> } >> } >> >> static public String customFormat(String pattern, double value) { >> DecimalFormat myFormatter = new DecimalFormat(pattern); >> return myFormatter.format(value); >> } >> } >> >> package grep; >> >> import java.io.IOException; >> import java.util.Iterator; >> import java.util.regex.Matcher; >> import java.util.regex.Pattern; >> >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.mapred.JobConf; >> import org.apache.hadoop.mapred.MapReduceBase; >> import org.apache.hadoop.mapred.OutputCollector; >> import org.apache.hadoop.mapred.Reducer; >> import org.apache.hadoop.mapred.Reporter; >> >> public class GrepReducer extends MapReduceBase implements >> Reducer<Text, MyWritable, Text, MyWritable> { >> >> private Pattern pattern; >> >> @Override >> public void configure(JobConf job) { >> pattern = Pattern.compile(job.get("grep.regex")); >> } >> >> public void reduce(Text key, Iterator<MyWritable> values, >> OutputCollector<Text, MyWritable> output, Reporter reporter) >> throws IOException { >> >> while (values.hasNext()) { >> String text = (String) values.next().get(); >> Matcher matcher = pattern.matcher(text); >> while (matcher.find()) { >> output.collect(key, new MyWritable(text)); >> } >> } >> } >> } >> >> package grep; >> >> import java.io.DataInput; >> import java.io.DataOutput; >> import java.io.IOException; >> import java.lang.reflect.Array; >> import java.util.HashMap; >> import java.util.Map; >> >> import org.apache.hadoop.conf.Configurable; >> import org.apache.hadoop.conf.Configuration; >> import org.apache.hadoop.conf.Configured; >> import org.apache.hadoop.io.UTF8; >> import org.apache.hadoop.io.Writable; >> import 
org.apache.hadoop.io.WritableFactories; >> >> public class MyWritable implements Writable, Configurable { >> >> private Class declaredClass; >> private Object instance; >> private Configuration conf; >> >> public MyWritable() { >> } >> >> public MyWritable(Object instance) { >> set(instance); >> } >> >> public MyWritable(Class declaredClass, Object instance) { >> this.declaredClass = declaredClass; >> this.instance = instance; >> } >> >> /** Return the instance, or null if none. */ >> public Object get() { >> return instance; >> } >> >> /** Return the class this is meant to be. */ >> public Class getDeclaredClass() { >> return declaredClass; >> } >> >> /** Reset the instance. */ >> public void set(Object instance) { >> this.declaredClass = instance.getClass(); >> this.instance = instance; >> } >> >> public String toString() { >> return "[ " + instance + " ]"; >> } >> >> public void readFields(DataInput in) throws IOException { >> readObject(in, this, this.conf); >> } >> >> public void write(DataOutput out) throws IOException { >> writeObject(out, instance, declaredClass, conf); >> } >> >> private static final Map<String, Class<?>> PRIMITIVE_NAMES = new >> HashMap<String, Class<?>>(); >> static { >> PRIMITIVE_NAMES.put("boolean", Boolean.TYPE); >> PRIMITIVE_NAMES.put("byte", Byte.TYPE); >> PRIMITIVE_NAMES.put("char", Character.TYPE); >> PRIMITIVE_NAMES.put("short", Short.TYPE); >> PRIMITIVE_NAMES.put("int", Integer.TYPE); >> PRIMITIVE_NAMES.put("long", Long.TYPE); >> PRIMITIVE_NAMES.put("float", Float.TYPE); >> PRIMITIVE_NAMES.put("double", Double.TYPE); >> PRIMITIVE_NAMES.put("void", Void.TYPE); >> } >> >> private static class NullInstance extends Configured implements Writable { >> private Class<?> declaredClass; >> >> public NullInstance() { >> super(null); >> } >> >> public NullInstance(Class declaredClass, Configuration conf) { >> super(conf); >> this.declaredClass = declaredClass; >> } >> >> public void readFields(DataInput in) throws IOException { >> String 
className = UTF8.readString(in); >> declaredClass = PRIMITIVE_NAMES.get(className); >> if (declaredClass == null) { >> try { >> declaredClass = getConf().getClassByName(className); >> } catch (ClassNotFoundException e) { >> throw new RuntimeException(e.toString()); >> } >> } >> } >> >> public void write(DataOutput out) throws IOException { >> UTF8.writeString(out, declaredClass.getName()); >> } >> } >> >> /** >> * Write a {@link Writable}, {@link String}, primitive type, or an array >> of >> * the preceding. >> */ >> public static void writeObject(DataOutput out, Object instance, >> Class declaredClass, Configuration conf) throws IOException { >> >> if (instance == null) { // null >> instance = new NullInstance(declaredClass, conf); >> declaredClass = Writable.class; >> } >> >> UTF8.writeString(out, declaredClass.getName()); // always write declared >> >> if (declaredClass.isArray()) { // array >> int length = Array.getLength(instance); >> out.writeInt(length); >> for (int i = 0; i < length; i++) { >> writeObject(out, Array.get(instance, i), >> declaredClass.getComponentType(), conf); >> } >> >> } else if (declaredClass == String.class) { // String >> UTF8.writeString(out, (String) instance); >> >> } else if (declaredClass.isPrimitive()) { // primitive type >> >> if (declaredClass == Boolean.TYPE) { // boolean >> out.writeBoolean(((Boolean) instance).booleanValue()); >> } else if (declaredClass == Character.TYPE) { // char >> out.writeChar(((Character) instance).charValue()); >> } else if (declaredClass == Byte.TYPE) { // byte >> out.writeByte(((Byte) instance).byteValue()); >> } else if (declaredClass == Short.TYPE) { // short >> out.writeShort(((Short) instance).shortValue()); >> } else if (declaredClass == Integer.TYPE) { // int >> out.writeInt(((Integer) instance).intValue()); >> } else if (declaredClass == Long.TYPE) { // long >> out.writeLong(((Long) instance).longValue()); >> } else if (declaredClass == Float.TYPE) { // float >> out.writeFloat(((Float) 
instance).floatValue()); >> } else if (declaredClass == Double.TYPE) { // double >> out.writeDouble(((Double) instance).doubleValue()); >> } else if (declaredClass == Void.TYPE) { // void >> } else { >> throw new IllegalArgumentException("Not a primitive: " >> + declaredClass); >> } >> } else if (declaredClass.isEnum()) { // enum >> UTF8.writeString(out, ((Enum) instance).name()); >> } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable >> UTF8.writeString(out, instance.getClass().getName()); >> ((Writable) instance).write(out); >> >> } else { >> throw new IOException("Can't write: " + instance + " as " >> + declaredClass); >> } >> } >> >> /** >> * Read a {@link Writable}, {@link String}, primitive type, or an array >> of >> * the preceding. >> */ >> public static Object readObject(DataInput in, Configuration conf) >> throws IOException { >> return readObject(in, null, conf); >> } >> >> /** >> * Read a {@link Writable}, {@link String}, primitive type, or an array of >> * the preceding. 
>> */ >> @SuppressWarnings("unchecked") >> public static Object readObject(DataInput in, MyWritable objectWritable, >> Configuration conf) throws IOException { >> String className = UTF8.readString(in); >> Class<?> declaredClass = PRIMITIVE_NAMES.get(className); >> if (declaredClass == null) { >> try { >> declaredClass = conf.getClassByName(className); >> } catch (ClassNotFoundException e) { >> throw new RuntimeException("readObject can't find class " >> + className, e); >> } >> } >> >> Object instance; >> >> if (declaredClass.isPrimitive()) { // primitive types >> >> if (declaredClass == Boolean.TYPE) { // boolean >> instance = Boolean.valueOf(in.readBoolean()); >> } else if (declaredClass == Character.TYPE) { // char >> instance = Character.valueOf(in.readChar()); >> } else if (declaredClass == Byte.TYPE) { // byte >> instance = Byte.valueOf(in.readByte()); >> } else if (declaredClass == Short.TYPE) { // short >> instance = Short.valueOf(in.readShort()); >> } else if (declaredClass == Integer.TYPE) { // int >> instance = Integer.valueOf(in.readInt()); >> } else if (declaredClass == Long.TYPE) { // long >> instance = Long.valueOf(in.readLong()); >> } else if (declaredClass == Float.TYPE) { // float >> instance = Float.valueOf(in.readFloat()); >> } else if (declaredClass == Double.TYPE) { // double >> instance = Double.valueOf(in.readDouble()); >> } else if (declaredClass == Void.TYPE) { // void >> instance = null; >> } else { >> throw new IllegalArgumentException("Not a primitive: " >> + declaredClass); >> } >> >> } else if (declaredClass.isArray()) { // array >> int length = in.readInt(); >> instance = Array.newInstance(declaredClass.getComponentType(), >> length); >> for (int i = 0; i < length; i++) { >> Array.set(instance, i, readObject(in, conf)); >> } >> >> } else if (declaredClass == String.class) { // String >> instance = UTF8.readString(in); >> } else if (declaredClass.isEnum()) { // enum >> instance = Enum.valueOf((Class<? 
extends Enum>) declaredClass, >> UTF8.readString(in)); >> } else { // Writable >> Class instanceClass = null; >> String str = ""; >> try { >> str = UTF8.readString(in); >> instanceClass = conf.getClassByName(str); >> } catch (ClassNotFoundException e) { >> throw new RuntimeException( >> "readObject can't find class " + str, e); >> } >> >> Writable writable = WritableFactories.newInstance(instanceClass, >> conf); >> writable.readFields(in); >> instance = writable; >> >> if (instanceClass == NullInstance.class) { // null >> declaredClass = ((NullInstance) instance).declaredClass; >> instance = null; >> } >> } >> >> if (objectWritable != null) { // store values >> objectWritable.declaredClass = declaredClass; >> objectWritable.instance = instance; >> } >> >> return instance; >> >> } >> >> public void setConf(Configuration conf) { >> this.conf = conf; >> } >> >> public Configuration getConf() { >> return this.conf; >> } >> } >> >> >> -- >> *-- >> -- Felipe Oliveira Gutierrez >> -- [email protected] >> -- https://sites.google.com/site/lipe82/Home/diaadia* >> > > -- *-- -- Felipe Oliveira Gutierrez -- [email protected] -- https://sites.google.com/site/lipe82/Home/diaadia*
