Thanks for the comment Matthias. After all the discussion (thanks to all participants), I think this (single method that passes in a RecordContext object) is the best alternative. Just a side note: I think KAFKA-3907 [1] can also be integrated into the KIP by adding related method inside RecordContext interface.
[1] https://issues.apache.org/jira/browse/KAFKA-3907 Cheers, Jeyhun On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io> wrote: > Hi, > > I would like to push this discussion further. It seems we got nice > alternatives (thanks for the summary Jeyhun!). > > With respect to RichFunctions and allowing them to be stateful, I have > my doubt as expressed already. From my understanding, the idea was to > give access to record metadata information only. If you want to do a > stateful computation you should rather use #transform(). > > Furthermore, as pointed out, we would need to switch to a > supplier-pattern introducing many more overloads. > > For those reason, I advocate for a simple interface with a single method > that passes in a RecordContext object. > > > -Matthias > > > On 6/6/17 5:15 PM, Guozhang Wang wrote: > > Thanks for the comprehensive summary! > > > > Personally I'd prefer the option of passing RecordContext as an > additional > > parameter into he overloaded function. But I'm also open to other > arguments > > if there are sth. that I have overlooked. > > > > Guozhang > > > > > > On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com> > wrote: > > > >> Hi, > >> > >> Thanks for your comments Matthias and Guozhang. > >> > >> Below I mention the quick summary of the main alternatives we looked at > to > >> introduce the Rich functions (I will refer to it as Rich functions > until we > >> find better/another name). Initially the proposed alternatives was not > >> backwards-compatible, so I will not mention them. > >> The related discussions are spread in KIP-149 and in this KIP (KIP-159) > >> discussion threads. > >> > >> > >> > >> 1. The idea of rich functions came into the stage with KIP-149, in > >> discussion thread. As a result we extended KIP-149 to support Rich > >> functions as well. > >> > >> 2. To as part of the Rich functions, we provided init > (ProcessorContext) > >> method. Afterwards, Dammian suggested that we should not provide > >> ProcessorContext to users. As a result, we separated the two problems > into > >> two separate KIPs, as it seems they can be solved in parallel. > >> > >> - One approach we considered was : > >> > >> public interface ValueMapperWithKey<K, V, VR> { > >> VR apply(final K key, final V value); > >> } > >> > >> public interface RichValueMapper<K, V, VR> extends RichFunction{ > >> } > >> > >> public interface RichFunction { > >> void init(RecordContext recordContext); > >> void close(); > >> } > >> > >> public interface RecordContext { > >> String applicationId(); > >> TaskId taskId(); > >> StreamsMetrics metrics(); > >> String topic(); > >> int partition(); > >> long offset(); > >> long timestamp(); > >> Map<String, Object> appConfigs(); > >> Map<String, Object> appConfigsWithPrefix(String prefix); > >> } > >> > >> > >> public interface ProcessorContext extends RecordContext { > >> // all methods but the ones in RecordContext > >> } > >> > >> As a result: > >> * . All "withKey" and "withoutKey" interfaces can be converted to their > >> Rich counterparts (with empty init() and close() methods) > >> *. All related Processors will accept Rich interfaces in their > >> constructors. > >> *. So, we convert the related "withKey" or "withoutKey" interfaces to > Rich > >> interface while building the topology and initialize the related > processors > >> with Rich interfaces only. > >> *. We will not need to overloaded methods for rich functions as Rich > >> interfaces extend withKey interfaces. We will just check the object type > >> and act accordingly. > >> > >> > >> > >> > >> 3. There was some thoughts that the above approach does not support > lambdas > >> so we should support only one method, only init(RecordContext), as part > of > >> Rich interfaces. > >> This is still in discussion. Personally I think Rich interfaces are by > >> definition lambda-free and we should not care much about it. > >> > >> > >> 4. Thanks to Matthias's discussion, an alternative we considered was to > >> pass in the RecordContext as method parameter. This might even allow to > >> use Lambdas and we could keep the name RichFunction as we preserve the > >> nature of being a function. > >> "If you go with `init()` and `close()` we basically > >> allow users to have an in-memory state for a function. Thus, we cannot > >> share a single instance of RichValueMapper (etc) over multiple tasks and > >> we would need a supplier pattern similar to #transform(). And this would > >> "break the flow" of the API, as (Rich)ValueMapperSupplier would not > >> inherit from ValueMapper and thus we would need many new overload for > >> KStream/KTable classes". (Copy paste from Matthias's email) > >> > >> > >> Cheers, > >> Jeyhun > >> > >> > >> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io> > >> wrote: > >> > >>> Yes, we did consider this, and there is no consensus yet what the best > >>> alternative is. > >>> > >>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick > >>> summary of the current state of the discussion? > >>> > >>> > >>> -Matthias > >>> > >>> On 6/4/17 6:04 PM, Guozhang Wang wrote: > >>>> Thanks for the explanation Jeyhun and Matthias. > >>>> > >>>> I have just read through both KIP-149 and KIP-159 and am wondering if > >> you > >>>> guys have considered a slight different approach for rich function, > >> that > >>> is > >>>> to add the `RecordContext` into the apply functions as an additional > >>>> parameter. For example: > >>>> > >>>> --------------------------- > >>>> > >>>> interface RichValueMapper<V, VR> { > >>>> > >>>> VR apply(final V value, final RecordContext context); > >>>> > >>>> } > >>>> > >>>> ... > >>>> > >>>> // then in KStreams > >>>> > >>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> > >>> mapper); > >>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper <? super V, ? > >>>> extends VR> mapper); > >>>> > >>>> ------------------------------- > >>>> > >>>> The caveat is that it will introduces more overloads; but I think the > >>>> #.overloads are mainly introduced by 1) serde overrides and 2) > >>>> state-store-supplier overides, both of which can be reduced in the > near > >>>> future, and I felt this overloading is still worthwhile, as it has the > >>>> following benefits: > >>>> > >>>> 1) still allow lambda expressions. > >>>> 2) clearer code path (do not need to "convert" from non-rich functions > >> to > >>>> rich functions) > >>>> > >>>> > >>>> Maybe this approach has already been discussed and I may have > >> overlooked > >>> in > >>>> the email thread; anyways, lmk. > >>>> > >>>> > >>>> Guozhang > >>>> > >>>> > >>>> > >>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax < > >> matth...@confluent.io> > >>>> wrote: > >>>> > >>>>> I agree with Jeyhun. As already mention, the overall API improvement > >>>>> ideas are overlapping and/or contradicting each other. For this > >> reason, > >>>>> not all ideas can be accomplished and some Jira might just be closed > >> as > >>>>> "won't fix". > >>>>> > >>>>> For this reason, we try to do those KIP discussion with are large > >> scope > >>>>> to get an overall picture to converge to an overall consisted API. > >>>>> > >>>>> > >>>>> @Jeyhun: about the overloads. Yes, we might get more overload. It > >> might > >>>>> be sufficient though, to do a single xxxWithContext() overload that > >> will > >>>>> provide key+value+context. Otherwise, if might get too messy having > >>>>> ValueMapper, ValueMapperWithKey, ValueMapperWithContext, > >>>>> ValueMapperWithKeyWithContext. > >>>>> > >>>>> On the other hand, we also have the "builder pattern" idea as an API > >>>>> change and this might mitigate the overload problem. Not for simple > >>>>> function like map/flatMap etc but for joins and aggregations. > >>>>> > >>>>> > >>>>> On the other hand, as I mentioned in an older email, I am personally > >>>>> fine to break the pure functional interface, and add > >>>>> > >>>>> - interface WithRecordContext with method `open(RecordContext)` (or > >>>>> `init(...)`, or any better name) -- but not `close()`) > >>>>> > >>>>> - interface ValueMapperWithRecordContext extends ValueMapper, > >>>>> WithRecordContext > >>>>> > >>>>> This would allow us to avoid any overload. Of course, we don't get a > >>>>> "pure function" interface and also sacrifices Lambdas. > >>>>> > >>>>> > >>>>> > >>>>> I am personally a little bit undecided what the better option might > >> be. > >>>>> Curious to hear what other think about this trade off. > >>>>> > >>>>> > >>>>> -Matthias > >>>>> > >>>>> > >>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote: > >>>>>> Hi Guozhang, > >>>>>> > >>>>>> It subsumes partially. Initially the idea was to support > >> RichFunctions > >>>>> as a > >>>>>> separate interface. Throughout the discussion, however, we > considered > >>>>> maybe > >>>>>> overloading the related methods (with RecodContext param) is better > >>>>>> approach than providing a separate RichFunction interface. > >>>>>> > >>>>>> Cheers, > >>>>>> Jeyhun > >>>>>> > >>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> > >>> wrote: > >>>>>> > >>>>>>> Does this KIP subsume this ticket as well? > >>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125 > >>>>>>> > >>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov < > >> je.kari...@gmail.com > >>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Dear community, > >>>>>>>> > >>>>>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to > >>>>> initiate > >>>>>>>> KIP for rich functions (interfaces) [2]. > >>>>>>>> I would like to get your comments. > >>>>>>>> > >>>>>>>> > >>>>>>>> [1] > >>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj= > >>>>>>>> Re+DISCUSS+KIP+149+Enabling+key+access+in+ > >>>>> ValueTransformer+ValueMapper+ > >>>>>>>> and+ValueJoiner > >>>>>>>> [2] > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>>>> 159%3A+Introducing+Rich+functions+to+Streams > >>>>>>>> > >>>>>>>> > >>>>>>>> Cheers, > >>>>>>>> Jeyhun > >>>>>>>> -- > >>>>>>>> -Cheers > >>>>>>>> > >>>>>>>> Jeyhun > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> -- Guozhang > >>>>>>> > >>>>> > >>>>> > >>>> > >>>> > >>> > >>> -- > >> -Cheers > >> > >> Jeyhun > >> > > > > > > > > -- -Cheers Jeyhun