The last parameter to
"OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName,
valueClassName, myPartitionerClassName, jobConfForShuffleSort);" is the
configuration for the partitioner itself. It is only used in the Output,
and hence is not available in the consuming Input.

It looks like we're missing the option to set a Configuration for the
comparator. There are a couple of other changes required in the
EdgeConfigurers as well. I'll create a jira and post a patch later today.

One of the big reasons to separate out the Configurations is to limit the
size of the payload generated. Using a generic conf (which usually ends up
inheriting from JobConf etc) ends up setting a large number of keys (1000+
in some cases), of which very few are actually used.
setFromConfiguration(...) actually strips out unused keys. The
partitionerConf parameter is meant to be a very specific Configuration only
for the Partitioner (it should only contain the limited set of keys
required to run the partitioner). The same will apply to the Comparator
conf, once it is added. Tez has no way of knowing what a valid set of keys
for the partitioner, comparator and combiner is, since these are all user
specified classes.
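
As a rough illustration of the idea (plain Java, not the Tez code: the
class and method names below are made up for the example, and a Map stands
in for a Hadoop Configuration), building a partitioner-specific conf
amounts to copying over only the keys the partitioner actually reads:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: SlimConfSketch and keepOnly are illustrative names,
// not part of Tez or Hadoop.
final class SlimConfSketch {

    /** Returns a copy of fullConf containing only the keys listed in neededKeys. */
    static Map<String, String> keepOnly(Map<String, String> fullConf, Set<String> neededKeys) {
        Map<String, String> slim = new TreeMap<>();
        for (String key : neededKeys) {
            String value = fullConf.get(key);
            if (value != null) {
                // Keys that are absent from the full conf are simply skipped.
                slim.put(key, value);
            }
        }
        return slim;
    }

    public static void main(String[] args) {
        // Stand-in for a JobConf-like object carrying many unrelated keys.
        Map<String, String> jobConfLike = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            jobConfLike.put("some.framework.key." + i, "v" + i);
        }
        jobConfLike.put("myCustomProperty", "value");

        // Only the user knows which keys the partitioner needs.
        Map<String, String> partitionerConf =
                keepOnly(jobConfLike, Collections.singleton("myCustomProperty"));

        System.out.println(jobConfLike.size() + " keys in, "
                + partitionerConf.size() + " key(s) in the payload");
    }
}
```

The point of the sketch is only that the allow-list has to come from the
user, since the framework cannot guess which keys a user class reads.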

Until I can get a patch in for this, the workaround you found is likely the
only approach that will work.


On Tue, Aug 5, 2014 at 8:23 AM, Johannes Zillmann <[email protected]>
wrote:

> Hey guys,
>
> i just upgraded my application to the most current master code of Tez.
> Run into a problem with setting up my custom key comparator.
> It implements org.apache.hadoop.conf.Configurable and expects a custom
> property in the passed in configuration.
>
> So initially i tried:
>         JobConf jobConfForShuffleSort = new JobConf();
>         jobConfForShuffleSort.set("myCustomProperty", "value");
>         Builder edgeConfBuilder =
> OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName, valueClassName,
> myPartitionerClassName, jobConfForShuffleSort);
>
> But the property does not come through to the instance of
> ‘myPartitionerClassName’.
> Basically i see the comparator instantiated 2 times:
>
> (1) Here the custom property is available:
>  java.lang.Exception
>         at myPartitionerClassName.setConf(TezRecordComparator.java:42)
>         at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>         at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>         at
> org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateOutputKeyComparator(ConfigUtils.java:125)
>         at
> org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:158)
>         at
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.<init>(DefaultSorter.java:116)
>         at
> org.apache.tez.runtime.library.output.OnFileSortedOutput.start(OnFileSortedOutput.java:109)
>         at
> SimpleVertexProcessor.initializeInputOutputs(SimpleVertexProcessor.java:190)
>
> (2) Here it is not:
>   java.lang.Exception
>         at myPartitionerClassName.setConf(TezRecordComparator.java:42)
>         at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>         at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>         at
> org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
>         at
> org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
>         at
> org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
>         at
> org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$RunShuffleCallable.call(Shuffle.java:344)
>
>
> Found following workaround:
>         Configuration payloadConf =
> TezUtils.createConfFromUserPayload(edgeProperty.getEdgeDestination().getUserPayload());
>         payloadConf.set("myCustomProperty", "value");
>
> edgeProperty.getEdgeDestination().setUserPayload(TezUtils.createUserPayloadFromConf(payloadConf));
>
> I think it boils down to that the property is passed to the edge input but
> not to its destination !?
> However, is there some smarter way making that property available to all
> instantiations of the comparator ?
> I tried using
>         edgeConfBuilder.setAdditionalConfiguration(...)
>         edgeConfBuilder.configureOutput().setAdditionalConfiguration(…)
> but that seems to filter out custom properties.
>
> Also do you plan to use a non-configuration based payload mechanism for
> the edge stuff like you did for the input, output, processor ?
>
> Any enlightenment appreciated!
> Johannes
>
>
>
