Hey Sid,

started using OnFileUnorderedPartitionedKVOutput. Now i’m having the same problem with my configured keySerializationClass. With OnFileSortedOutput i think this is covered by the comparatorConf map, which i use to pass the custom properties for both the comparator and the serialiser.
Any ideas ?
Btw would it be an idea to have input and output custom properties instead of partitionConf, comparatorConf and maybe serializerConf !?

Johannes

On 11 Aug 2014, at 11:55, Johannes Zillmann <[email protected]> wrote:

> Awesome, that works, Thanks Sid!
>
> Johannes
>
> On 06 Aug 2014, at 19:21, Siddharth Seth <[email protected]> wrote:
>
>> Johannes,
>> You need to be using the confMap available on the setComparator API to make
>> it visible for the comparator.
>>
>>
>> On Wed, Aug 6, 2014 at 5:54 AM, Johannes Zillmann <[email protected]>
>> wrote:
>> Hey Sid,
>>
>> that was fast. Unluckily that doesn’t solve the problem.
>> Passing in the custom property via partitionConfMap makes it available at
>> the edgeInput, but not at the edgeOutput.
>> Job fails at:
>> at myPartitionerClassName.setConf(TezRecordComparator.java:39)
>> at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>> at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>> at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
>> at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
>> at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
>> at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupMerger(Shuffle.java:413)
>> at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupIgnoreErrors(Shuffle.java:428)
>> at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.access$1900(Shuffle.java:75)
>> at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$ShuffleRunnerFutureCallback.onFailure(Shuffle.java:474)
>> at com.google.common.util.concurrent.Futures$6.run(Futures.java:977)
>>
>> Johannes
>>
>>
>> On 06 Aug 2014, at 09:08, Siddharth Seth <[email protected]> wrote:
>>
>>> TEZ-1379 went in. You should be able to use this properly now.
>>>
>>>
>>> On Tue, Aug 5, 2014 at 11:27 PM, Johannes Zillmann
>>> <[email protected]> wrote:
>>> Hey Sid,
>>> On 05 Aug 2014, at 21:05, Siddharth Seth <[email protected]> wrote:
>>>
>>>> The last configuration parameter to "
>>>> OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName,
>>>> valueClassName, myPartitionerClassName, jobConfForShuffleSort);" is the
>>>> configuration for the partitioner itself. That's only used in the Output -
>>>> and hence is not available in the consuming Input.
>>>>
>>>> It looks like we're missing the option to set a Configuration for the
>>>> comparator. There's a couple of other changes required in the
>>>> EdgeConfigurers - I'll create a jira and post a patch later today.
>>> Cool, thanks!
>>>
>>>>
>>>> One of the big reasons to separate out the Configurations is to limit the
>>>> size of the payload generated. Using a generic conf (which usually ends up
>>>> inheriting from JobConf etc) ends up setting a large number of keys (1000+
>>>> in cases), of which very few are actually used. setFromConfiguration(...)
>>>> actually strips out unused keys. The partitionerConf parameter is meant to
>>>> be a very specific Configuration only for the Partitioner (should only
>>>> contain the limited set of keys required to run the partitioner).
>>>> Similarly for the Comparator conf - once it is added. Tez has no way of
>>>> knowing what a valid set of keys for the partitioner, comparator and
>>>> combiner are - since these are all user specified classes.
>>>
>>> ++++1 yeah, basically i like moving away from configuration!
>>> Just this time it hit me a bit ;)
>>>
>>>>
>>>> Till I can get a patch going for this, your usage model to get this
>>>> working is likely the only one which will work.
>>>
>>> Ok will do!
>>> Johannes
>>>
>>>>
>>>>
>>>> On Tue, Aug 5, 2014 at 8:23 AM, Johannes Zillmann
>>>> <[email protected]> wrote:
>>>> Hey guys,
>>>>
>>>> i just upgraded my application to the most current master code of Tez.
>>>> Ran into a problem with setting up my custom key comparator.
>>>> It implements org.apache.hadoop.conf.Configurable and expects a custom
>>>> property in the passed in configuration.
>>>>
>>>> So initially i tried:
>>>> JobConf jobConfForShuffleSort = new JobConf();
>>>> jobConfForShuffleSort.set("myCustomProperty", "value");
>>>> Builder edgeConfBuilder =
>>>> OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName,
>>>> valueClassName, myPartitionerClassName, jobConfForShuffleSort);
>>>>
>>>> But the property does not come through to the instance of
>>>> ‘myPartitionerClassName’.
>>>> Basically i see the comparator instantiated 2 times:
>>>>
>>>> (1) Here the custom property is available:
>>>> java.lang.Exception
>>>> at myPartitionerClassName.setConf(TezRecordComparator.java:42)
>>>> at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>>>> at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>>>> at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateOutputKeyComparator(ConfigUtils.java:125)
>>>> at org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:158)
>>>> at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.<init>(DefaultSorter.java:116)
>>>> at org.apache.tez.runtime.library.output.OnFileSortedOutput.start(OnFileSortedOutput.java:109)
>>>> at SimpleVertexProcessor.initializeInputOutputs(SimpleVertexProcessor.java:190)
>>>>
>>>> (2) Here it is not:
>>>> java.lang.Exception
>>>> at myPartitionerClassName.setConf(TezRecordComparator.java:42)
>>>> at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>>>> at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>>>> at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
>>>> at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
>>>> at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
>>>> at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$RunShuffleCallable.call(Shuffle.java:344)
>>>>
>>>>
>>>> Found following workaround:
>>>> Configuration payloadConf =
>>>> TezUtils.createConfFromUserPayload(edgeProperty.getEdgeDestination().getUserPayload());
>>>> payloadConf.set("myCustomProperty", "value");
>>>>
>>>> edgeProperty.getEdgeDestination().setUserPayload(TezUtils.createUserPayloadFromConf(payloadConf));
>>>>
>>>> I think it boils down to that the property is passed to the edge input but
>>>> not to its destination !?
>>>> However, is there some smarter way of making that property available to all
>>>> instantiations of the comparator ?
>>>> I tried using
>>>> edgeConfBuilder.setAdditionalConfiguration(...)
>>>> edgeConfBuilder.configureOutput().setAdditionalConfiguration(…)
>>>> but that seems to filter out custom properties.
>>>>
>>>> Also do you plan to use a non-configuration based payload mechanism for
>>>> the edge stuff like you did for the input, output, processor ?
>>>>
>>>> Any enlightenment appreciated!
>>>> Johannes
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
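
For readers following the thread, here is a minimal sketch of the idea behind the separate per-component confs: a comparator that needs one custom property, and a small, specific map carrying only that property instead of a full job-wide configuration. This is plain Java and deliberately uses no Tez or Hadoop classes. The names ConfiguredKeyComparator, NarrowConfDemo and narrowConf are hypothetical, a Map<String, String> stands in for a Configuration, and the values "ascending"/"descending" for myCustomProperty are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Configurable key comparator; not Tez or Hadoop API.
final class ConfiguredKeyComparator implements Comparator<String> {
    // Property name taken from the thread; its meaning here is an assumption.
    static final String CUSTOM_PROPERTY = "myCustomProperty";

    private final boolean descending;

    ConfiguredKeyComparator(Map<String, String> conf) {
        String value = conf.get(CUSTOM_PROPERTY);
        if (value == null) {
            // Mirrors the failure mode in the thread: the comparator is created
            // on a side of the edge where the property was never passed along.
            throw new IllegalStateException("Missing required property: " + CUSTOM_PROPERTY);
        }
        this.descending = "descending".equals(value);
    }

    @Override
    public int compare(String a, String b) {
        return descending ? b.compareTo(a) : a.compareTo(b);
    }
}

final class NarrowConfDemo {
    // Copies only the keys starting with the given prefix, so the payload
    // carries the few keys the component needs rather than the whole job conf.
    static Map<String, String> narrowConf(Map<String, String> full, String prefix) {
        Map<String, String> narrow = new HashMap<>();
        for (Map.Entry<String, String> entry : full.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                narrow.put(entry.getKey(), entry.getValue());
            }
        }
        return narrow;
    }

    public static void main(String[] args) {
        // A job-wide conf would normally hold many unrelated keys.
        Map<String, String> jobConf = new HashMap<>();
        jobConf.put("myCustomProperty", "descending");
        jobConf.put("some.unrelated.key", "123");

        // The same narrow map has to reach every place the comparator is built.
        Map<String, String> comparatorConf = narrowConf(jobConf, "myCustomProperty");

        List<String> keys = new ArrayList<>(Arrays.asList("b", "a", "c"));
        keys.sort(new ConfiguredKeyComparator(comparatorConf));
        System.out.println(keys + " sorted using " + comparatorConf.size() + " conf key(s)");
    }
}
```

The point of the sketch is only that the comparator works wherever it receives the narrow map and fails wherever it does not, which matches the two stack traces in the original message.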
