Hey guys,
I just upgraded my application to the most current master code of Tez and
ran into a problem with setting up my custom key comparator.
It implements org.apache.hadoop.conf.Configurable and expects a custom property
in the configuration passed to it.
So initially I tried:
JobConf jobConfForShuffleSort = new JobConf();
jobConfForShuffleSort.set("myCustomProperty", "value");
Builder edgeConfBuilder =
OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName, valueClassName,
myPartitionerClassName, jobConfForShuffleSort);
But the property does not come through to every instance of
'myPartitionerClassName'.
Basically, I see the comparator instantiated twice:
(1) Here the custom property is available:
java.lang.Exception
    at myPartitionerClassName.setConf(TezRecordComparator.java:42)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateOutputKeyComparator(ConfigUtils.java:125)
    at org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:158)
    at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.<init>(DefaultSorter.java:116)
    at org.apache.tez.runtime.library.output.OnFileSortedOutput.start(OnFileSortedOutput.java:109)
    at SimpleVertexProcessor.initializeInputOutputs(SimpleVertexProcessor.java:190)
(2) Here it is not:
java.lang.Exception
    at myPartitionerClassName.setConf(TezRecordComparator.java:42)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
    at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
    at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
    at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$RunShuffleCallable.call(Shuffle.java:344)
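For reference, traces like the two above can be produced by dumping a stack trace from inside setConf. The following is only a minimal sketch of that idea: TracingComparator is a hypothetical stand-in, and a plain java.util.Map replaces Hadoop's Configuration so that the snippet runs on its own. The real comparator implements org.apache.hadoop.conf.Configurable and receives a Configuration instead.

```java
import java.util.Map;

public class SetConfTrace {

    // Hypothetical stand-in for the real comparator (not Tez or Hadoop code).
    static class TracingComparator {
        private String customValue;

        // Mirrors Configurable.setConf, but takes a Map (assumption made
        // purely to keep the sketch free of Hadoop dependencies).
        void setConf(Map<String, String> conf) {
            customValue = conf.get("myCustomProperty");
            // Dump the call stack to stderr, labelled with the value that
            // actually arrived (null if the property is missing).
            new Exception("myCustomProperty=" + customValue).printStackTrace();
        }

        String getCustomValue() {
            return customValue;
        }
    }

    public static void main(String[] args) {
        TracingComparator withProperty = new TracingComparator();
        withProperty.setConf(Map.of("myCustomProperty", "value"));

        TracingComparator withoutProperty = new TracingComparator();
        withoutProperty.setConf(Map.of());
    }
}
```

Putting the property value into the exception message makes it easy to see from the log alone which instantiation received the property and which did not.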
I found the following workaround:
Configuration payloadConf =
TezUtils.createConfFromUserPayload(edgeProperty.getEdgeDestination().getUserPayload());
payloadConf.set("myCustomProperty", "value");
edgeProperty.getEdgeDestination().setUserPayload(TezUtils.createUserPayloadFromConf(payloadConf));
I think it boils down to the property being passed to the source side of the
edge but not to its destination?
However, is there a smarter way to make that property available to all
instantiations of the comparator?
I tried using
edgeConfBuilder.setAdditionalConfiguration(...)
edgeConfBuilder.configureOutput().setAdditionalConfiguration(...)
but that seems to filter out custom properties.
Also, do you plan to use a non-configuration-based payload mechanism for the
edges, like you did for the input, output, and processor?
Any enlightenment appreciated!
Johannes