[
https://issues.apache.org/jira/browse/CRUNCH-486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14268502#comment-14268502
]
Josh Wills commented on CRUNCH-486:
-----------------------------------
Wow, that was an awesome JIRA report -- thanks! Fixing the small secondary
issue should be no problem; I'll try the
mapreduce.job.output.key.comparator.class trick and see if it allows us to
hack around this, if only for Hadoop versions 2.0 through 2.4.
> Join with custom Writable PType registered using Writables.registerComparable NPEs during shuffle
> -------------------------------------------------------------------------------------------------
>
> Key: CRUNCH-486
> URL: https://issues.apache.org/jira/browse/CRUNCH-486
> Project: Crunch
> Issue Type: Bug
> Components: Core
> Affects Versions: 0.11.0
> Reporter: Brandon Vargo
> Assignee: Josh Wills
> Priority: Minor
>
> When joining two PTables on a key that is a custom writable PType, the
> shuffler will fail with the following NullPointerException under Hadoop2 if
> the custom type has been registered using Writables.registerComparable. This
> happens regardless of whether a specific integer code is provided or the
> default hashCode()-based value is used.
> {noformat}
> org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: Error while doing final merge
> at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:160)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
> at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
> Caused by: java.lang.NullPointerException
> at java.lang.Class.isAssignableFrom(Native Method)
> at org.apache.crunch.types.writable.TupleWritable$Comparator.compareField(TupleWritable.java:317)
> at org.apache.crunch.types.writable.TupleWritable$Comparator.compare(TupleWritable.java:284)
> at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:578)
> at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:128)
> at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:55)
> at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:669)
> at org.apache.hadoop.mapred.Merger.merge(Merger.java:193)
> at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:804)
> at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.close(MergeManagerImpl.java:369)
> at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:158)
> ... 6 more
> {noformat}
> It appears that the Writables.WRITABLE_CODES entries are not deserialized
> from the configuration during the shuffle phase of a join until
> TupleWritable.setConf() is called. However, because TupleWritable.Comparator
> is registered as a raw comparator for TupleWritable, the shuffler uses the
> comparator without instantiating or configuring a TupleWritable instance. As
> a result, the type codes for the custom types are not available when the
> comparator starts to run.
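The sequencing problem can be modeled in a few lines of plain Java. Every class and method below is a simplified stand-in, not Crunch's or Hadoop's real code: a statically shared code table stays empty until setConf() runs, and Class.isAssignableFrom throws the NullPointerException from the stack trace when handed the resulting null.

```java
import java.util.HashMap;
import java.util.Map;

public class ShuffleModel {
  // Stand-in for Writables.WRITABLE_CODES: empty until setConf() runs.
  public static final Map<Integer, Class<?>> WRITABLE_CODES = new HashMap<>();

  // Stand-in for TupleWritable.setConf(), which loads the registered type
  // codes out of the job configuration.
  public static void setConf(Map<Integer, Class<?>> codesFromConf) {
    WRITABLE_CODES.putAll(codesFromConf);
  }

  // Stand-in for the lookup inside TupleWritable.Comparator.compareField().
  public static boolean isComparableField(int typeCode) {
    // clazz is null when the raw comparator runs before any TupleWritable
    // instance has been configured; isAssignableFrom(null) throws NPE.
    Class<?> clazz = WRITABLE_CODES.get(typeCode);
    return Comparable.class.isAssignableFrom(clazz);
  }
}
```

In the real shuffle, the raw comparator is fetched from WritableComparator's static registry, so nothing ever calls the setConf() equivalent before the first compare.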
> HADOOP-10686 made WritableComparator implement Configurable, but this was not
> released until Hadoop 2.5. If I build Crunch against Hadoop 2.5 and copy
> TupleWritable's setConf() function to TupleWritable.Comparator, then the
> shuffle works as expected. However, since Crunch currently targets Hadoop
> 2.2, this does not work for the current version of Crunch.
> As a workaround, it appears that if the
> {{mapreduce.job.output.key.comparator.class}} property is set in the
> configuration, then the instance is created in
> JobConf.getOutputKeyComparator() using ReflectionUtils instead of using the
> WritableComparator registration. ReflectionUtils will pass the configuration
> to anything that implements Configurable, so setting
> {{mapreduce.job.output.key.comparator.class}} to TupleWritable.Comparator and
> implementing Configurable might work for Hadoop versions older than 2.5. I
> have yet to try this, though, and I have not looked into Hadoop1 to see if
> this would also work there.
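A minimal model of why that reflective path helps; Configurable and newInstance below are simplified stand-ins for org.apache.hadoop.conf.Configurable and ReflectionUtils.newInstance, not the real Hadoop classes:

```java
public class ReflectionModel {
  // Stand-in for org.apache.hadoop.conf.Configurable.
  public interface Configurable {
    void setConf(String conf);
  }

  // A comparator that, like the proposed TupleWritable.Comparator change,
  // accepts the configuration (which carries the serialized type codes).
  public static class TupleComparator implements Configurable {
    public String conf;
    public void setConf(String conf) { this.conf = conf; }
  }

  // Stand-in for ReflectionUtils.newInstance(Class, Configuration): the
  // configuration is handed to anything implementing Configurable, which
  // the static WritableComparator registry path never does.
  public static <T> T newInstance(Class<T> clazz, String conf) throws Exception {
    T instance = clazz.getDeclaredConstructor().newInstance();
    if (instance instanceof Configurable) {
      ((Configurable) instance).setConf(conf);
    }
    return instance;
  }
}
```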
> If the shuffle is able to register the type codes via either method above,
> then there is one small secondary issue that I hit:
> Writables.registerComparable checks if the type code is already present in
> the map; if the type code is already in use, then it throws an exception,
> even if the class being registered is the same as the existing class. With
> the type codes being initialized during the shuffle phase, any later call to
> registerComparable for the same type code and class will fail. I currently
> have my registerComparable call in a static initialization block for my
> PType, so it is called whenever my writable type is first used under Crunch;
> in this case, it happens when the reduce phase starts. Having
> registerComparable check whether the class being registered equals the
> existing class before throwing, similar to the check in Guava's
> AbstractBiMap, prevents this exception from being thrown.
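The suggested check can be sketched as follows; the class and map names are illustrative stand-ins, not Crunch's actual internals:

```java
import java.util.HashMap;
import java.util.Map;

public class WritableRegistry {
  private static final Map<Integer, Class<?>> WRITABLE_CODES = new HashMap<>();

  public static void registerComparable(Class<?> clazz, int code) {
    Class<?> existing = WRITABLE_CODES.get(code);
    if (existing != null) {
      if (existing.equals(clazz)) {
        return; // same class re-registered under the same code: a no-op
      }
      throw new IllegalStateException(
          "Code " + code + " is already registered for " + existing.getName());
    }
    WRITABLE_CODES.put(code, clazz);
  }
}
```

With this shape, the static-initializer registration that runs again when the reducer first touches the PType simply falls through, while a genuine code collision still fails loudly.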
> The above was happening using 0.11.0-hadoop2 on Hadoop 2.5.0 (CDH 5.2). The
> modifications I mention above were made on top of {{d4f23c4}} and also tested
> on CDH 5.2.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)