Brandon Vargo created CRUNCH-486:
------------------------------------

             Summary: Join with custom Writable PType registered using Writables.registerComparable NPEs during shuffle
                 Key: CRUNCH-486
                 URL: https://issues.apache.org/jira/browse/CRUNCH-486
             Project: Crunch
          Issue Type: Bug
          Components: Core
    Affects Versions: 0.11.0
            Reporter: Brandon Vargo
            Assignee: Josh Wills
            Priority: Minor


When joining two PTables on a key that is a custom Writable PType, the shuffle 
will fail with the following NullPointerException under Hadoop2 if the custom 
type has been registered using Writables.registerComparable. This happens 
regardless of whether a specific integer code is provided or the default 
hashCode()-based value is used.

{noformat}
org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: Error while doing final merge
        at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:160)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: java.lang.NullPointerException
        at java.lang.Class.isAssignableFrom(Native Method)
        at org.apache.crunch.types.writable.TupleWritable$Comparator.compareField(TupleWritable.java:317)
        at org.apache.crunch.types.writable.TupleWritable$Comparator.compare(TupleWritable.java:284)
        at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:578)
        at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:128)
        at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:55)
        at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:669)
        at org.apache.hadoop.mapred.Merger.merge(Merger.java:193)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:804)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.close(MergeManagerImpl.java:369)
        at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:158)
        ... 6 more
{noformat}
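
For illustration, the failing job has roughly the following shape. This is only 
a sketch: the key class, paths, and parsing here are made up and are not the 
actual job that failed, but the registration and the join are done the same way.

{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Join;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.WritableComparable;

public class JoinRepro {

  // Hypothetical custom key type, registered the same way as in the failing job.
  public static class CustomKey implements WritableComparable<CustomKey> {
    static {
      // With or without an explicit integer code, the shuffle fails as described.
      Writables.registerComparable(CustomKey.class);
    }

    private String id = "";

    public void write(DataOutput out) throws IOException { out.writeUTF(id); }
    public void readFields(DataInput in) throws IOException { id = in.readUTF(); }
    public int compareTo(CustomKey other) { return id.compareTo(other.id); }
    @Override public int hashCode() { return id.hashCode(); }
    @Override public boolean equals(Object other) {
      return other instanceof CustomKey && ((CustomKey) other).id.equals(id);
    }
  }

  // Keys each input line by the text before the first tab (illustrative only).
  private static final MapFn<String, CustomKey> KEY_FN = new MapFn<String, CustomKey>() {
    @Override
    public CustomKey map(String line) {
      CustomKey key = new CustomKey();
      key.id = line.split("\t", 2)[0];
      return key;
    }
  };

  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(JoinRepro.class);
    PTable<CustomKey, String> left =
        pipeline.readTextFile("/tmp/left").by(KEY_FN, Writables.writables(CustomKey.class));
    PTable<CustomKey, String> right =
        pipeline.readTextFile("/tmp/right").by(KEY_FN, Writables.writables(CustomKey.class));
    // The join shuffles on CustomKey wrapped in a TupleWritable; the reduce-side
    // merge then dies with the NPE in TupleWritable$Comparator shown above.
    PTable<CustomKey, Pair<String, String>> joined = Join.join(left, right);
    pipeline.writeTextFile(joined, "/tmp/joined");
    pipeline.done();
  }
}
{code}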

It appears that the Writables.WRITABLE_CODES entries are not deserialized from 
the configuration during the shuffle phase of a join until 
TupleWritable.setConf() is called. However, because TupleWritable.Comparator is 
registered as a raw comparator for TupleWritable, the shuffle uses the 
comparator directly, without ever instantiating or configuring a TupleWritable. 
As a result, the type codes for the custom types are not available when the 
comparator runs.
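
For context, the generic mechanism at play looks like the following. This is a 
sketch of the usual Hadoop raw-comparator pattern with a hypothetical key type, 
not Crunch's actual TupleWritable code, but it shows why nothing in this path 
ever calls setConf() before Hadoop 2.5.

{code:java}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparator;

public class RawComparatorRegistration {

  // Hypothetical key type standing in for Crunch's TupleWritable.
  public static class MyKey extends LongWritable {
  }

  // Raw comparator for MyKey; note that it is constructed without a Configuration.
  public static class MyKeyComparator extends WritableComparator {
    public MyKeyComparator() {
      super(MyKey.class);
    }
  }

  public static void main(String[] args) {
    // Registration: the instance handed to define() is the one the framework uses.
    WritableComparator.define(MyKey.class, new MyKeyComparator());

    // Lookup: essentially what the shuffle does. Before Hadoop 2.5 (HADOOP-10686)
    // the registered instance comes back as-is, so setConf() is never called and
    // any configuration-driven state (such as the registered type codes) is
    // missing when the comparator runs.
    WritableComparator comparator = WritableComparator.get(MyKey.class);
    System.out.println(comparator.getClass().getName());
  }
}
{code}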

HADOOP-10686 made WritableComparator implement Configurable, but that change 
was not released until Hadoop 2.5. If I build Crunch against Hadoop 2.5 and 
copy TupleWritable's setConf() method to TupleWritable.Comparator, then the 
shuffle works as expected. However, since Crunch currently targets Hadoop 2.2, 
this approach does not work for the current version of Crunch.
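
Roughly, the change looks like the following. This is only a sketch built 
against Hadoop 2.5, where WritableComparator already implements Configurable; 
the compare()/compareField() logic is unchanged and omitted, and 
loadWritableCodes() is a hypothetical stand-in for the body copied from 
TupleWritable.setConf().

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;

// Sketch only: in the real change this is TupleWritable.Comparator itself
// gaining the setConf() body that TupleWritable already has.
public class ConfigurableTupleComparatorSketch extends WritableComparator {

  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    if (conf != null) {
      loadWritableCodes(conf);
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  private void loadWritableCodes(Configuration conf) {
    // Hypothetical placeholder for the logic copied from TupleWritable.setConf(),
    // which rebuilds Writables.WRITABLE_CODES from the registrations serialized
    // into the job configuration.
  }

  // compare()/compareField() stay exactly as they are in TupleWritable.Comparator
  // and are omitted from this sketch.
}
{code}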

As a workaround, it appears that if the 
{{mapreduce.job.output.key.comparator.class}} property is set in the 
configuration, then the comparator instance is created in 
JobConf.getOutputKeyComparator() using ReflectionUtils instead of being pulled 
from the WritableComparator registry. ReflectionUtils passes the configuration 
to anything that implements Configurable, so setting 
{{mapreduce.job.output.key.comparator.class}} to TupleWritable.Comparator and 
making that comparator implement Configurable might work for Hadoop versions 
older than 2.5. I have yet to try this, though, and I have not looked into 
Hadoop1 to see whether this would also work there.
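
If that approach works, the wiring would look something like this. This is an 
untested sketch: the driver class name is made up, it assumes 
TupleWritable.Comparator is accessible and has been made Configurable as 
described above, and setting the property on the pipeline's Configuration would 
presumably apply it to every job in the pipeline.

{code:java}
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.TupleWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;

public class ComparatorWorkaround {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Force JobConf.getOutputKeyComparator() to build the comparator through
    // ReflectionUtils, which calls setConf() on Configurable instances, instead
    // of pulling the unconfigured instance out of the WritableComparator registry.
    conf.setClass("mapreduce.job.output.key.comparator.class",
        TupleWritable.Comparator.class, RawComparator.class);

    Pipeline pipeline = new MRPipeline(ComparatorWorkaround.class, conf);
    // ... build and run the join as usual ...
    pipeline.done();
  }
}
{code}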

If the shuffle is able to register the type codes via either method above, then 
there is one small secondary issue that I hit: Writables.registerComparable 
checks whether the type code is already present in the map, and if the code is 
already in use it throws an exception, even when the class being registered is 
the same as the existing class. With the type codes being initialized during 
the shuffle phase, any later call to registerComparable for the same type code 
and class will fail. I currently have my registerComparable call in a static 
initialization block for my PType, so it is called whenever my writable type is 
first used under Crunch; in this case, that happens when the reduce phase 
starts. Checking inside registerComparable whether the class being registered 
equals the existing class before throwing, similar to the check in Guava's 
AbstractBiMap, prevents this exception.
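
Concretely, something along these lines inside registerComparable would avoid 
the exception. This is a standalone sketch with its own map; the real 
bookkeeping lives in Writables.WRITABLE_CODES and its exact types and exception 
may differ.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.WritableComparable;

public class RegisterComparableSketch {

  // Stand-in for Writables.WRITABLE_CODES.
  private static final Map<Integer, Class<? extends WritableComparable>> CODES =
      new HashMap<Integer, Class<? extends WritableComparable>>();

  public static synchronized void registerComparable(
      Class<? extends WritableComparable> clazz, int code) {
    Class<? extends WritableComparable> existing = CODES.get(code);
    if (existing != null) {
      if (existing.equals(clazz)) {
        // Re-registering the same class under the same code is a no-op, in the
        // spirit of the equality check in Guava's AbstractBiMap, rather than an error.
        return;
      }
      throw new IllegalStateException("Code " + code + " is already registered for "
          + existing.getName() + "; cannot register " + clazz.getName());
    }
    CODES.put(code, clazz);
  }
}
{code}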

The above happened with 0.11.0-hadoop2 on Hadoop 2.5.0 (CDH 5.2). The 
modifications I mention above were made on top of {{d4f23c4}} and were also 
tested on CDH 5.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)