I'm not sure what your accum.extend(m) does, but in 18, the value records
are reused (rather than in a previous version where a new copy was made). So
if you are storing a reference to your values, note that they all are going
to point to the same thing unless you make a copy of it.

Try:
AggregateRecord m = new AggregateRecord(values.next());

instead of:
AggregateRecord m = values.next();

You may have to write a copy constructor for this. Consequently, in your
readFields, you need to make sure you initialize all the values again
because it could be holding the state from the previous next() call.

Let me know if that helps.

On Mon, Mar 2, 2009 at 2:38 PM, Malcolm Matalka <
mmata...@millennialmedia.com> wrote:

> Sure.
>
> Note:
> I am using my own class for keys and values in this.  The key is called
> StringArrayWritable and it implements WritableComparable.  The value is
> called AggregateRecord and it implements Writable.  I have done some
> debugging and here is what I have found:
> While running in local mode I get 2 of each entry after reducing.  I put
> some debugging in my StringArrayWritable readFeilds method and I see it
> is being called twice.  In the output I see it is called, then reduced,
> then a bit later called again and reduced again.  I output the stack
> trace in both situations and I get:
>
> SAW READ: 3d0b28bd91d2b4e76bbcc0b3ae6a91a5|
> java.lang.Throwable
>        at foo.io.StringArrayWritable.readFields(Unknown Source)
>        at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializ
> er.deserialize(WritableSerialization.java:67)
>        at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializ
> er.deserialize(WritableSerialization.java:40)
>        at
> org.apache.hadoop.mapred.Task$ValuesIterator.readNextKey(Task.java:724)
>        at
> org.apache.hadoop.mapred.Task$ValuesIterator.<init>(Task.java:660)
>        at
> org.apache.hadoop.mapred.Task$CombineValuesIterator.<init>(Task.java:751
> )
>        at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.combineAndSpill(MapTask
> .java:900)
>        at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.ja
> va:785)
>        at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:698)
>        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:228)
>        at
> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:157)
> [ Some other records ]
> SAW READ: 3d0b28bd91d2b4e76bbcc0b3ae6a91a5|
> java.lang.Throwable
>        at foo.io.StringArrayWritable.readFields(Unknown Source)
>        at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializ
> er.deserialize(WritableSerialization.java:67)
>        at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializ
> er.deserialize(WritableSerialization.java:40)
>        at
> org.apache.hadoop.mapred.Task$ValuesIterator.readNextKey(Task.java:724)
>        at
> org.apache.hadoop.mapred.Task$ValuesIterator.<init>(Task.java:660)
>        at
> org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.<init>(ReduceTa
> sk.java:221)
>        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:312)
>        at
> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:201)
>
> [ Read of another record ]
> REDUCER: 3d0b28bd91d2b4e76bbcc0b3ae6a91a5|
> SAW WRITE: 3d0b28bd91d2b4e76bbcc0b3ae6a91a5|
> [ Some other records ]
>
>
> The obvious difference is the call to sortAndSpill and combineAndSpill.
>
> Possibly locations I think could be wrong:
> Hadoop code - I am using an older version, 18.1, perhaps this has been
> fixed
> My key/value Writable implementation
> My reduce/combiner implementation.
>
> Here is my combiner code.  mapRequestData does nothing particularly
> interesting other than convert some data in the aggregate:
>        public void reduce(StringArrayWritable key,
> Iterator<AggregateRecord> values,
>                           OutputCollector<StringArrayWritable,
> AggregateRecord> output, Reporter reporter) throws IOException {
>            /*
>             * Add som checking here to make sure all the keys
> *ACTUALLY* match
>             */
>            AggregateRecord accum = new AggregateRecord();
>            while (values.hasNext()) {
>                AggregateRecord m = values.next();
>                accum.extend(m);
>            }
>            AggregateRecord ar = mapRequestData(accum);
>            if(ar != null) {
>                System.out.println("REDUCER: " + key + " " +
> key.hashCode() + " " + ar);
>                output.collect(new StringArrayWritable(key.content),
> ar);
>            }
>        }
>
>
> Thank you, let me know if you need anything else.
>
>
> -----Original Message-----
> From: Ryan Shih [mailto:ryan.s...@gmail.com]
> Sent: Monday, March 02, 2009 17:02
> To: core-user@hadoop.apache.org
> Subject: Re: Potential race condition (Hadoop 18.3)
>
> Koji - That makes a lot of sense. The two tasks are probably stepping
> over
> each other. I'll give it a try and let you know how it goes.
>
> Malcolm - if you turned off speculative execution and are still getting
> the
> problem, it doesn't sound the same. Do you want to do a cut&paste of
> your
> reduce code and I'll see if I can spot anything suspicious?
>
> On Mon, Mar 2, 2009 at 1:15 PM, Malcolm Matalka <
> mmata...@millennialmedia.com> wrote:
>
> > I have a situation which may be related.  I am running hadoop 0.18.1.
> I
> > am on a cluster with 5 machines and testing on very small input of 10
> > lines.  Mapper produces either 1 or 0 output per line of input yet
> > somehow I get 18 lines of output from the reducer.  For example I have
> > one input where the key is:
> > fd349fc441ff5e726577aeb94cceb1e4
> >
> > However, I added a print to the reducer to print keys right before
> > calling output.collect and I have 3 instances of this key being
> printed.
> >
> > I have turned speculative execution off and still get this.
> >
> > Does this sound related?  A known bug?  Something I'm missing?  Fixed
> in
> > 19.1?
> >
> > - Malcolm
> >
> >
> > -----Original Message-----
> > From: Koji Noguchi [mailto:knogu...@yahoo-inc.com]
> > Sent: Monday, March 02, 2009 15:59
> > To: core-user@hadoop.apache.org
> > Subject: RE: Potential race condition (Hadoop 18.3)
> >
> > Ryan,
> >
> > If you're using getOutputPath, try replacing it with
> getWorkOutputPath.
> >
> >
> http://hadoop.apache.org/core/docs/r0.18.3/api/org/apache/hadoop/mapred/
> >
> FileOutputFormat.html#getWorkOutputPath(org.apache.hadoop.mapred.JobConf
> <http://hadoop.apache.org/core/docs/r0.18.3/api/org/apache/hadoop/mapred
> /%0AFileOutputFormat.html#getWorkOutputPath%28org.apache.hadoop.mapred.J
> obConf>
> > )
> >
> > Koji
> >
> > -----Original Message-----
> > From: Ryan Shih [mailto:ryan.s...@gmail.com]
> > Sent: Monday, March 02, 2009 11:01 AM
> > To: core-user@hadoop.apache.org
> > Subject: Potential race condition (Hadoop 18.3)
> >
> > Hi - I'm not sure yet, but I think I might be hitting a race condition
> > in
> > Hadoop 18.3. What seems to happen is that in the reduce phase, some of
> > my
> > tasks perform speculative execution but when the initial task
> completes
> > successfully, it sends a kill to the new task started. After all is
> said
> > and
> > done, perhaps one in every five or ten which kill their second task
> ends
> > up
> > with zero or truncated output.  When I code it to turn off speculative
> > execution, the problem goes away. Are there known race conditions that
> I
> > should be aware of around this area?
> >
> > Thanks in advance,
> > Ryan
> >
>

Reply via email to