hi devaraj,

so, i looked into the counter issue further, with some success.
for one, i can now reproduce it with a test.

i am waiting for the password for my JIRA account so i can get started 
there - somehow i didn't get the password after registration, so i sent a 
mail to owen.
i am not familiar with the procedures on JIRA / ASF... so if you see 
mistakes, please guide me. thanks. 

if you are interested, try out the attached testcase. i suspect it is a 
timing issue within the LocalJobRunner (in my case; i don't know about 
distributed runs yet) and i will try to provide a patch.
can i submit this testcase through JIRA now? do i need to change anything 
in it?

kind regards,

ud







"Devaraj Das" <[EMAIL PROTECTED]> 
04/16/2008 01:18 PM
Please respond to
core-user@hadoop.apache.org


To
<core-user@hadoop.apache.org>
cc

Subject
RE: Counters giving double values






Pls file a jira for the counter updates part. It will be excellent if you
can also attach a testcase that can reproduce the problem (maybe a stripped
down version of your app or something). 

> -----Original Message-----
> From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] 
> Sent: Wednesday, April 16, 2008 4:35 PM
> To: core-user@hadoop.apache.org
> Subject: RE: Counters giving double values
> 
> hadoop 0.16.2
> (and as i remember, i had the same issue with 0.16.0)
> 
> Yes, the final data output at the end IS CORRECT. 
> only the counter values are wrong.
> 
> i didn't try to run it in a distributed environment yet. only local.
> 
> 
> 
> 
> 
> 
> "Devaraj Das" <[EMAIL PROTECTED]>
> 04/16/2008 12:56 PM
> Please respond to
> core-user@hadoop.apache.org
> 
> 
> To
> <core-user@hadoop.apache.org>
> cc
> 
> Subject
> RE: Counters giving double values
> 
> 
> 
> 
> 
> 
> Also, in those cases where you see wrong counter values, did you validate
> the final (reduce) output for correctness? (I am just trying to see whether
> the problem is with the Counter updates.) 
> 
> > -----Original Message-----
> > From: Devaraj Das [mailto:[EMAIL PROTECTED] 
> > Sent: Wednesday, April 16, 2008 4:23 PM
> > To: core-user@hadoop.apache.org
> > Subject: RE: Counters giving double values
> > 
> > Thanks for the detailed answer. Which hadoop version are you 
> > on? If you are confident that it is not a problem with your 
> > app, pls raise a jira.
> > 
> > 
> >   _____ 
> > 
> > From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED]
> > Sent: Wednesday, April 16, 2008 3:25 PM
> > To: core-user@hadoop.apache.org
> > Subject: RE: Counters giving double values
> > 
> > 
> > 
> > Thanks so far. 
> > 
> > key and values are custom implementations. 
> > 
> > key implements WritableComparable 
> > value extends VersionedWritable 
> > 
> > btw. The only problem i encounter is that the Counter values are wrong. 
> > If i check the records in the MapFile (re-read it) which is written as 
> > the output of the mapred job, the number of records is correct and is 
> > half of the reported counter value. 
> > the same applies to the results of the operations which are carried out 
> > in the reduce(). everything is correct, except the counter values. 
> > 
> > the whole thing happens only sometimes. 
> > 
> > 
> > Key serializing / deserializing, i guess you want to see this part of 
> > the code: 
> > 
> >         public int language; 
> >         public String term; 
> > 
> >         public void readFields(DataInput in) throws IOException { 
> >                 language = in.readInt(); 
> >                 term = Text.readString(in); 
> >         } 
> > 
> >         public void write(DataOutput out) throws IOException { 
> >                 out.writeInt(language); 
> >                 Text.writeString(out, term); 
> >         } 
> > 
> > 
> > 
> > 
> > 
> 
> 
> 
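
One quick way to rule out the key serialization quoted above is a round-trip check: write a key, read it back, and compare the fields. The following is only a sketch under assumptions. `TermKey` is a hypothetical stand-in for the real key class, and it uses plain `java.io` with `writeUTF`/`readUTF` in place of Hadoop's `Text.writeString`/`Text.readString`, so that it runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TermKeyRoundTrip {

    /** Hypothetical stand-in for the custom key; not the real class from the thread. */
    static class TermKey {
        int language;
        String term;

        void write(DataOutput out) throws IOException {
            out.writeInt(language);
            out.writeUTF(term); // assumption: stands in for Text.writeString(out, term)
        }

        void readFields(DataInput in) throws IOException {
            language = in.readInt();
            term = in.readUTF(); // assumption: stands in for Text.readString(in)
        }
    }

    /** Serializes the key into a byte array and deserializes it into a fresh instance. */
    static TermKey roundTrip(TermKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            key.write(out);
            out.flush();

            TermKey copy = new TermKey();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            // In-memory streams should not fail; rethrow unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TermKey key = new TermKey();
        key.language = 7;
        key.term = "hadoop";

        TermKey copy = roundTrip(key);
        System.out.println(copy.language + " " + copy.term);
    }
}
```

If both fields survive the round trip, the read and write order at least matches, which would fit the observation in the thread that the job output itself is correct and only the counters are off.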


Index: /tmp/hadoop-core-trunk/trunk/src/test/org/apache/hadoop/mapred/TestCounterValues.java
===================================================================
--- /tmp/hadoop-core-trunk/trunk/src/test/org/apache/hadoop/mapred/TestCounterValues.java	(revision 0)
+++ /tmp/hadoop-core-trunk/trunk/src/test/org/apache/hadoop/mapred/TestCounterValues.java	(revision 0)
@@ -0,0 +1,176 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+
+public class TestCounterValues extends TestCase {
+       enum MyCounter {
+               SOME_COUNTER
+       };
+
+       public static final Log LOG = LogFactory
+                       .getLog(org.apache.hadoop.mapred.TestCounterValues.class);
+
+       public static int numtries = 0;
+
+       public static int iterationsToReproduceBug = 100;
+
+       public static Random random = new Random();
+
+       public static String inputPath = "in";
+
+       public static String outputPath = "out";
+
+       private static Configuration conf = new Configuration();
+
+       private static FileSystem fs;
+
+       public static JobClient jclient;
+
+       public static JobConf jconf = new JobConf();
+
+       protected void setUp() throws Exception {
+               super.setUp();
+
+               fs = FileSystem.get(conf);
+               // Create the input directory that writeInputFile() writes into.
+               fs.mkdirs(new Path(inputPath));
+
+               jconf = getJobConf(inputPath, new Path(outputPath));
+               jclient = new JobClient(jconf);
+       }
+
+       protected void tearDown() throws Exception {
+               jclient.close();
+               fs.delete(new Path(inputPath), true);
+               fs.delete(new Path(outputPath), true);
+       }
+
+       public void writeInputFile(int numWords) {
+               LOG.info("creating input file containing " + numWords + " words");
+               try {
+
+                       FSDataOutputStream out = fs.create(new Path(inputPath + "/"
+                                       + "in.txt"));
+
+                       for (int i = 0; i < numWords; i++) {
+                               out.writeBytes(Integer.toString(i) + "\n");
+                       }
+                       out.close();
+               } catch (Exception e) {
+                       System.err.println("Error: " + e.getMessage());
+               }
+       }
+
+       public JobConf getJobConf(String in, Path out) {
+               JobConf conf = new JobConf();
+               conf.setJobName("wordcount");
+
+               conf.setOutputKeyClass(Text.class);
+               conf.setOutputValueClass(IntWritable.class);
+               conf.setMapperClass(MapClass.class);
+               conf.setReducerClass(Reduce.class);
+
+               FileInputFormat.setInputPaths(conf, in.toString());
+               FileOutputFormat.setOutputPath(conf, out);
+               return conf;
+       }
+
+       public void testWordCount() throws IOException {
+
+               for (int z = 0; z < iterationsToReproduceBug; z++) {
+                       LOG.debug("");
+                       int wordNum = random.nextInt(1000000);
+                       LOG.info("=================== iteration " + z + ": " + wordNum
+                                       + " records");
+
+                       fs.delete(new Path(outputPath), true);
+                       writeInputFile(wordNum);
+                       RunningJob rj = jclient.submitJob(jconf);
+                       while (!rj.isComplete()) {
+                               try {
+                                       Thread.sleep(2000);
+                               } catch (InterruptedException e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
+                               // System.out.print(".");
+                       }
+
+                       Counters c = rj.getCounters();
+                       if (rj.isSuccessful()) {
+                               assertEquals(wordNum, c.getCounter(MyCounter.SOME_COUNTER));
+                       } else {
+                               LOG.debug("job not successful");
+                       }
+                       long mapout = 0;
+                       long redin = 0;
+                       for (Group group : c) {
+                               if (group.getDisplayName().equals("Map-Reduce Framework")) {
+                                       for (Counter counter : group) {
+                                               if (counter.getDisplayName().equals(
+                                                               "Map output records")) {
+                                                       mapout = counter.getCounter();
+                                               }
+                                               if (counter.getDisplayName().equals(
+                                                               "Reduce input records")) {
+                                                       redin = counter.getCounter();
+                                               }
+                                       }
+                               }
+                       }
+
+                       assertEquals(redin, mapout);
+
+                       LOG.info("=================== iteration " + z + " END");
+
+               }
+       }
+
+       public static class Reduce extends MapReduceBase implements
+                       Reducer<Text, IntWritable, Text, IntWritable> {
+
+               public void reduce(Text key, Iterator<IntWritable> values,
+                               OutputCollector<Text, IntWritable> output, Reporter reporter)
+                               throws IOException {
+                       int sum = 0;
+                       while (values.hasNext()) {
+                               reporter.incrCounter(MyCounter.SOME_COUNTER, 1);
+                               sum += values.next().get();
+                       }
+                       output.collect(key, new IntWritable(sum));
+               }
+       }
+
+       public static class MapClass extends MapReduceBase implements
+                       Mapper<LongWritable, Text, Text, IntWritable> {
+
+               private final static IntWritable one = new IntWritable(1);
+
+               private Text word = new Text();
+
+               public void map(LongWritable key, Text value,
+                               OutputCollector<Text, IntWritable> output, Reporter reporter)
+                               throws IOException {
+                       String line = value.toString();
+                       word.set(line);
+                       output.collect(word, one);
+               }
+       }
+
+}
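
The nested loop in testWordCount that picks framework counters out by display name could be factored into a small lookup helper. The sketch below is not part of the patch: plain maps stand in for Hadoop's `Counters` and `Group` (an assumption, so that it runs standalone), `CounterLookup` and `find` are hypothetical names, and the numbers in `main` are made up purely to mimic the doubling symptom.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CounterLookup {

    /**
     * Returns the value of the named counter in the named group,
     * or 0 if either is missing (the test's initial value for mapout and redin).
     */
    static long find(Map<String, Map<String, Long>> groups, String group, String counter) {
        Map<String, Long> counters = groups.get(group);
        if (counters == null) {
            return 0L;
        }
        Long value = counters.get(counter);
        return value == null ? 0L : value;
    }

    public static void main(String[] args) {
        // Made-up values that only illustrate what a doubled counter would look like.
        Map<String, Long> framework = new LinkedHashMap<>();
        framework.put("Map output records", 1000L);
        framework.put("Reduce input records", 2000L);

        Map<String, Map<String, Long>> groups = new LinkedHashMap<>();
        groups.put("Map-Reduce Framework", framework);

        long mapout = find(groups, "Map-Reduce Framework", "Map output records");
        long redin = find(groups, "Map-Reduce Framework", "Reduce input records");

        System.out.println("map output = " + mapout
                + ", reduce input = " + redin
                + ", doubled = " + (redin == 2 * mapout));
    }
}
```

Returning 0 for a missing counter keeps the behaviour of the original loop, where mapout and redin stay at 0 if no matching display name is found.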
