The Serializer interface itself does not extend Configured / Configurable. It's specific Serializers which do extend these - and since Tez does not have its own serialization mechanism, I think we need to support this.

> With OnFileSortedOutput i think this is covered by the comparatorConf

This is something that will break in the future. At the moment, internally we just end up merging the configurations, but eventually - only the configuration specified for a component, will be made available to the component. I wouldn't rely on this behaviour for the OnFileSortedOutput case.

On Thu, Aug 14, 2014 at 5:12 AM, Johannes Zillmann <[email protected]> wrote:

> On 14 Aug 2014, at 12:01, Siddharth Seth <[email protected]> wrote:
>
> > The raw serializer interfaces itself does not expose methods to help with Configuration. Is the serializer that you are using making use of the Configured / Configurable interface ?
>
> Yep its implementing Configurable.
>
> > If we add this, that's yet another configuration - potentially 3 maps being setup to configure the Input - and maybe more in the future; although I don't think there's too many configurable pieces left which you haven't run into. The single configuration case may just be the best bet in terms of usability of the API - except it ends up opening up a way for giant 1000+ key configurations to make it though, because a user wouldn't really need to think about which specific keys are required. OTOH, that's possible today as well with the conf accepted on the partitioner / combiner / comparator. File another jira please. *sigh*.
>
> Its https://issues.apache.org/jira/browse/TEZ-1423.
> For me i would’t worry so much about the 1000+ keys since a) as you said its still possible and b) changing to Map from Configuration always helped a lot here since a conf object is usually full of everything.
> Having 3 maps is probably easier for a user since he do not have to know about what (partitioner/comparator/serializer) gets executed where (input/output of the edge). Once you know this having a input and an output configuration could be preferable.
> So… not sure ;)
>
> Johannes
>
> >
> > On Thu, Aug 14, 2014 at 2:47 AM, Johannes Zillmann <[email protected]> wrote:
> > Hey Sid,
> >
> > started using OnFileUnorderedPartitionedKVOutput. Now i’m having the same problem with my configured keySerializationClass.
> > With OnFileSortedOutput i think this is covered by the comparatorConf map i’m passing the custom properties for comparator and serialiser with.
> >
> > Any ideas ?
> > Btw would it be an idea to have input and output custom properties instead of partitionConf, comparatorConf and maybe serializerConf !?
> >
> > Johannes
> >
> >
> > On 11 Aug 2014, at 11:55, Johannes Zillmann <[email protected]> wrote:
> >
> > > Awesome, that works, Thanks Sid!
> > >
> > > Johannes
> > >
> > > On 06 Aug 2014, at 19:21, Siddharth Seth <[email protected]> wrote:
> > >
> > >> Johannes,
> > >> You need to be using the confMap available on the setComparator API to make it visible for the comparator.
> > >>
> > >>
> > >> On Wed, Aug 6, 2014 at 5:54 AM, Johannes Zillmann <[email protected]> wrote:
> > >> Hey Sid,
> > >>
> > >> that was fast. Unluckily that doesn’t solve the problem.
> > >> Passing in the custom property via partitionConfMap makes it available at the edgeInput, but not at the edgeOutput.
> > >> Job fails at:
> > >>     at myPartitionerClassName.setConf(TezRecordComparator.java:39)
> > >>     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> > >>     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> > >>     at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
> > >>     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
> > >>     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
> > >>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupMerger(Shuffle.java:413)
> > >>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupIgnoreErrors(Shuffle.java:428)
> > >>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.access$1900(Shuffle.java:75)
> > >>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$ShuffleRunnerFutureCallback.onFailure(Shuffle.java:474)
> > >>     at com.google.common.util.concurrent.Futures$6.run(Futures.java:977)
> > >>
> > >> Johannes
> > >>
> > >>
> > >> On 06 Aug 2014, at 09:08, Siddharth Seth <[email protected]> wrote:
> > >>
> > >>> TEZ-1379 went in. You should be able to use this properly now.
> > >>>
> > >>>
> > >>> On Tue, Aug 5, 2014 at 11:27 PM, Johannes Zillmann <[email protected]> wrote:
> > >>> Hey Sid,
> > >>> On 05 Aug 2014, at 21:05, Siddharth Seth <[email protected]> wrote:
> > >>>
> > >>>> The last configuration parameter to "OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName, valueClassName, myPartitionerClassName, jobConfForShuffleSort);" is the configuration for the partitioner itself. That's only used in the Output - and hence is not available in the consuming Input.
> > >>>>
> > >>>> It looks like we're missing the option to set a Configuration for the comparator.
> > >>>> There's a couple of other changes required in the EdgeConfigurers - I'll create a jira and post a patch later today.
> > >>> Cool, thanks!
> > >>>
> > >>>>
> > >>>> One of the big reasons to separate out the Configurations is to limit the size of the payload generated. Using a generic conf (which usually ends up inheriting from JobConf etc) ends up setting a large number of keys (1000+ in cases), off which very few are actually used. setFromConfiguration(...) actually strips out unused keys. The partitionerConf parameter is meant to be a very specific Configuration only for the Partitioner (should only contain the limited set of keys required to run the partitioner). Similarly for the Comparator conf - once it is added. Tez has no way of knowing what a valid set of keys for the partitioner, comparator and combiner are - since these are all user specified classes.
> > >>>
> > >>> ++++1 yeah, basically i like moving away from configuration!
> > >>> Just this time it hit me a bit ;)
> > >>>
> > >>>>
> > >>>> Till I can get a patch going for this, your usage model to get this working is likely the only one which will work.
> > >>>
> > >>> Ok will do!
> > >>> Johannes
> > >>>
> > >>>>
> > >>>>
> > >>>> On Tue, Aug 5, 2014 at 8:23 AM, Johannes Zillmann <[email protected]> wrote:
> > >>>> Hey guys,
> > >>>>
> > >>>> i just upgraded my application to the most current master code of Tez.
> > >>>> Run into a problem with setting up my custom key comparator.
> > >>>> It implements org.apache.hadoop.conf.Configurable and expects a custom property in the passed in configuration.
> > >>>>
> > >>>> So initially i tried:
> > >>>>     JobConf jobConfForShuffleSort = new JobConf();
> > >>>>     jobConfForShuffleSort.set("myCustomProperty", "value");
> > >>>>     Builder edgeConfBuilder = OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName, valueClassName, myPartitionerClassName, jobConfForShuffleSort);
> > >>>>
> > >>>> But the property does not come through to the instance of ‘myPartitionerClassName’.
> > >>>> Basically i see the comparator instantiated 2 times:
> > >>>>
> > >>>> (1) Here the custom property is available:
> > >>>> java.lang.Exception
> > >>>>     at myPartitionerClassName.setConf(TezRecordComparator.java:42)
> > >>>>     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> > >>>>     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> > >>>>     at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateOutputKeyComparator(ConfigUtils.java:125)
> > >>>>     at org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:158)
> > >>>>     at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.<init>(DefaultSorter.java:116)
> > >>>>     at org.apache.tez.runtime.library.output.OnFileSortedOutput.start(OnFileSortedOutput.java:109)
> > >>>>     at SimpleVertexProcessor.initializeInputOutputs(SimpleVertexProcessor.java:190)
> > >>>>
> > >>>> (2) Here it is not:
> > >>>> java.lang.Exception
> > >>>>     at myPartitionerClassName.setConf(TezRecordComparator.java:42)
> > >>>>     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> > >>>>     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> > >>>>     at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
> > >>>>     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
> > >>>>     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
> > >>>>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$RunShuffleCallable.call(Shuffle.java:344)
> > >>>>
> > >>>>
> > >>>> Found following workaround:
> > >>>>     Configuration payloadConf = TezUtils.createConfFromUserPayload(edgeProperty.getEdgeDestination().getUserPayload());
> > >>>>     payloadConf.set("myCustomProperty", "value");
> > >>>>     edgeProperty.getEdgeDestination().setUserPayload(TezUtils.createUserPayloadFromConf(payloadConf));
> > >>>>
> > >>>> I think it boils down to that the property is passed to the edge input but not to its destination !?
> > >>>> However, is there some smarter way making that property available to all instantiations of the comparator ?
> > >>>> I tried using
> > >>>>     edgeConfBuilder.setAdditionalConfiguration(...)
> > >>>>     edgeConfBuilder.configureOutput().setAdditionalConfiguration(…)
> > >>>> but that seems to filter out custom properties.
> > >>>>
> > >>>> Also do you plan to use a non-configuration based payload mechanism for the edge stuff like you did for the input, output, processor ?
> > >>>>
> > >>>> Any enlightenment appreciated!
> > >>>> Johannes
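
The point made in the thread about keeping each component's configuration small (only the keys the partitioner or comparator actually needs, rather than a 1000+ key JobConf) can be sketched in plain Java. This is only a rough illustration and not Tez code: the class name ComponentConfSketch and its narrow method are hypothetical, and how the resulting map is handed to the edge configuration is deliberately left out, since the thread does not spell out that API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Tez or Hadoop.
final class ComponentConfSketch {

    // Copies only the listed keys out of a large key/value set, so that a
    // component (e.g. a comparator) receives just the keys it needs.
    // Keys that are absent from the full set are silently skipped.
    static Map<String, String> narrow(Map<String, String> full, List<String> neededKeys) {
        Map<String, String> narrowed = new TreeMap<>();
        for (String key : neededKeys) {
            String value = full.get(key);
            if (value != null) {
                narrowed.put(key, value);
            }
        }
        return narrowed;
    }

    public static void main(String[] args) {
        // Stand-in for a job-level configuration that carries many unrelated keys.
        Map<String, String> jobLevel = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            jobLevel.put("unrelated.key." + i, "x");
        }
        jobLevel.put("myCustomProperty", "value");

        // Only the single key the comparator reads ends up in its map.
        Map<String, String> comparatorConf =
                narrow(jobLevel, Arrays.asList("myCustomProperty"));
        System.out.println(comparatorConf.size() + " key(s): " + comparatorConf);
    }
}
```

The design choice mirrored here is the one argued for above: the caller, who knows which keys a user-supplied class reads, picks them explicitly, because the framework cannot know what a valid key set for a user-specified class is.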
