[ https://issues.apache.org/jira/browse/CRUNCH-486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14270086#comment-14270086 ]

Brandon Vargo commented on CRUNCH-486:
--------------------------------------

This is about what I had for testing, except that I wasn't checking for the 
writable type family. This fixes the issue for joins, but I think there will 
still be an issue with grouping by a key that is a TupleWritable containing a 
custom writable outside of a join on Hadoop versions before 2.5.0, since the 
comparator is not being set in that case. It does fix everything when running 
under Hadoop 2.5.0, though, since the comparator will get configured now that 
it implements Configurable.

I don't know the Crunch codebase well enough, though, to know whether there is 
a place in the MapReduce implementation where the configuration property could 
be set for all groupBy operations that use TupleWritable as a key. That might 
be a better place to set it, if it exists. Perhaps WritableGroupedTableType's 
configureShuffle method, if the key is an instance of TupleWritable? I don't 
know how to limit it to just the MapReduce pipeline there, but perhaps it 
doesn't matter if a MapReduce property is set in the configuration when 
running under a non-MapReduce pipeline. The only reason I was using the 
DefaultJoinStrategy is that it was the easiest place I found to inject the 
parameter for only one job in the pipeline while testing. Also, it looks like 
there are two other comparators for TupleWritables with different logic: one 
under lib.sort and another in lib.join.JoinUtils, which appear to be used for 
secondary sorts. I don't know whether setting this property more broadly would 
break those classes.
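
For concreteness, this is roughly what I was picturing. Completely untested, 
and {{keyClass}} and the exact hook are assumptions on my part:

{noformat}
// Sketch only: somewhere like WritableGroupedTableType#configureShuffle,
// where the shuffle configuration is assembled. With the property set,
// JobConf.getOutputKeyComparator() builds the comparator through
// ReflectionUtils, which calls setConf() on anything Configurable.
if (TupleWritable.class.isAssignableFrom(keyClass)) {  // keyClass: assumed
  conf.setClass("mapreduce.job.output.key.comparator.class",
      TupleWritable.Comparator.class, RawComparator.class);
}
{noformat}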

Also, the constant "mapreduce.job.output.key.comparator.class" is available as 
{{MRJobConfig.KEY_COMPARATOR}}. Under Hadoop1, it looks like the property name 
was "mapred.output.key.comparator.class" instead. 
{{optionsBuilder.conf(MRJobConfig.KEY_COMPARATOR, 
TupleWritable.Comparator.class.getName());}} compiles under Hadoop1, but that 
is as much testing as I did.
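
If the differing property names ever matter, one alternative (again untested) 
would be to go through the old-API {{JobConf}}, which hides the 
version-specific key:

{noformat}
// Sketch only: JobConf.setOutputKeyComparatorClass() exists under both
// Hadoop1 and Hadoop2 and writes whichever property name that version
// uses, so the string constant never has to be hard-coded.
JobConf jobConf = new JobConf(conf);
jobConf.setOutputKeyComparatorClass(TupleWritable.Comparator.class);
{noformat}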

I tried to write a test for the patch, but I couldn't find a good way to do 
it; all of the tests appear to run in the same JVM, so the global writable 
codes map would be polluted by the other phases running in that JVM, not to 
mention the other tests. The test would end up testing itself more than the 
patch.

So your patch fixes the join issue that I am seeing and looks good to me, 
unless you know of a better place to insert the configuration option so that 
all groupBy operations work on pre-2.5.0 Hadoop2.

Thanks!

> Join with custom Writable PType registered using Writables.registerComparable 
> NPEs during shuffle
> -------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-486
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-486
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.11.0
>            Reporter: Brandon Vargo
>            Assignee: Josh Wills
>            Priority: Minor
>         Attachments: CRUNCH-486.patch
>
>
> When joining two PTables on a key that is a custom writable PType, the 
> shuffler will fail with the following NullPointerException under Hadoop2 if 
> the custom type has been registered using Writables.registerComparable. This 
> happens regardless of whether a specific integer code is provided or the 
> default hashCode()-based value is used.
> {noformat}
> org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: Error while doing final merge
>       at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:160)
>       at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
>       at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
>       at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
> Caused by: java.lang.NullPointerException
>       at java.lang.Class.isAssignableFrom(Native Method)
>       at org.apache.crunch.types.writable.TupleWritable$Comparator.compareField(TupleWritable.java:317)
>       at org.apache.crunch.types.writable.TupleWritable$Comparator.compare(TupleWritable.java:284)
>       at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:578)
>       at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:128)
>       at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:55)
>       at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:669)
>       at org.apache.hadoop.mapred.Merger.merge(Merger.java:193)
>       at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(MergeManagerImpl.java:804)
>       at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.close(MergeManagerImpl.java:369)
>       at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:158)
>       ... 6 more
> {noformat}
> It appears that the Writables.WRITABLE_CODES entries are not deserialized 
> from the configuration during the shuffle phase of a join until 
> TupleWritable.setConf() is called. However, because TupleWritable.Comparator 
> is registered as a raw comparator for TupleWritable, the shuffler uses the 
> comparator without instantiating or configuring a TupleWritable instance. As 
> a result, the type codes for the custom types are not available when the 
> comparator starts to run.
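> For reference, the raw-comparator registration presumably amounts to 
> something like the following, which is why no configuration hook ever runs:
> {noformat}
> // Sketch of the static registration: the shuffle reuses this instance
> // directly, so neither TupleWritable.setConf() nor any other
> // configuration hook runs before compare() is called.
> static {
>   WritableComparator.define(TupleWritable.class, new TupleWritable.Comparator());
> }
> {noformat}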
> HADOOP-10686 made WritableComparator implement Configurable, but this was not 
> released until Hadoop 2.5. If I build Crunch against Hadoop 2.5 and copy 
> TupleWritable's setConf() function to TupleWritable.Comparator, then the 
> shuffle works as expected. However, since Crunch currently targets Hadoop 
> 2.2, this does not work for the current version of Crunch.
>
> As a workaround, it appears that if the 
> {{mapreduce.job.output.key.comparator.class}} property is set in the 
> configuration, then the instance is created in 
> JobConf.getOutputKeyComparator() using ReflectionUtils instead of using the 
> WritableComparator registration. ReflectionUtils will pass the configuration 
> to anything that implements Configurable, so setting 
> {{mapreduce.job.output.key.comparator.class}} to TupleWritable.Comparator and 
> implementing Configurable might work for Hadoop versions older than 2.5. I 
> have yet to try this, though, and I have not looked into Hadoop1 to see if 
> this would also work there.
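> In code, the idea is roughly the following; the helper that re-reads the 
> codes is a stand-in for the logic in TupleWritable.setConf(), not the actual 
> implementation:
> {noformat}
> // Sketch only: making the raw comparator Configurable so that
> // ReflectionUtils.newInstance(), used by JobConf.getOutputKeyComparator()
> // when the property is set, passes it the job configuration.
> public static class Comparator extends WritableComparator implements Configurable {
>   private Configuration conf;
>
>   @Override
>   public void setConf(Configuration conf) {
>     this.conf = conf;
>     if (conf != null) {
>       // Hypothetical helper standing in for the code in
>       // TupleWritable.setConf() that rebuilds WRITABLE_CODES.
>       reloadWritableComparableCodes(conf);
>     }
>   }
>
>   @Override
>   public Configuration getConf() {
>     return conf;
>   }
> }
> {noformat}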
> If the shuffle is able to register the type codes via either method above, 
> then there is one small secondary issue that I hit: 
> Writables.registerComparable checks if the type code is already present in 
> the map; if the type code is already in use, then it throws an exception, 
> even if the class being registered is the same as the existing class. With 
> the type codes being initialized during the shuffle phase, any later call to 
> registerComparable for the same type code and class will fail. I currently 
> have my registerComparable call in a static initialization block for my 
> PType, so it is called whenever my writable type is first used under Crunch; 
> in this case, it happens when the reduce phase starts. Having 
> registerComparable check whether the class being registered equals the 
> existing class before throwing an error, similar to the check in Guava's 
> AbstractBiMap, prevents this exception from being thrown.
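> A sketch of that check, with WRITABLE_CODES as the registry map inside 
> Writables:
> {noformat}
> // Sketch only: tolerate re-registration of the same class under the same
> // code, and reject only genuine conflicts, mirroring the behavior of
> // Guava's AbstractBiMap.
> Class<?> existing = WRITABLE_CODES.get(code);
> if (existing != null && !existing.equals(clazz)) {
>   throw new IllegalStateException(
>       "Code " + code + " already registered for " + existing.getName());
> }
> WRITABLE_CODES.put(code, clazz);
> {noformat}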
> The above was happening using 0.11.0-hadoop2 on Hadoop 2.5.0 (CDH 5.2). The 
> modifications I mention above were made on top of {{d4f23c4}} and also tested 
> on CDH 5.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
