Thanks for the KIP. I'm not convinced by the `RichFunction` approach. Do we really want to give every DSL method access to the `ProcessorContext`? It has a bunch of methods that seem inappropriate for some of the DSL methods, e.g., `register`, `getStateStore`, `forward`, `schedule`, etc. It is far too broad. I think it would be better to have a narrower interface like the `RecordContext`, remembering that it is easier to add methods/interfaces later than to remove them.
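
To make the "narrower interface" idea a bit more concrete, here is a rough sketch. Everything in it is an assumption for illustration only: the method set on `RecordContext` (topic, partition, offset, timestamp), the `ValueMapperWithContext` interface, and `SimpleRecordContext` are hypothetical names, not the KIP's or Kafka Streams' actual API. The point is only that user code sees read-only metadata and has no handle on `register`, `getStateStore`, `forward`, or `schedule`.

```java
// Hypothetical sketch: none of these types are the real Kafka Streams API.
class NarrowContextSketch {
    public static void main(String[] args) {
        RecordContext ctx = new SimpleRecordContext("orders", 3, 128L, 1494714360000L);

        // The user function can read record metadata, but there is nothing
        // here it could call to register stores, forward, or schedule.
        ValueMapperWithContext<String, Integer, String> mapper =
            (readOnlyKey, value, context) ->
                readOnlyKey + "=" + value + "@" + context.topic()
                    + "-" + context.partition() + ":" + context.offset();

        System.out.println(mapper.apply("k1", 10, ctx)); // prints k1=10@orders-3:128
    }
}

/** Read-only view of record metadata (assumed method set). */
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

/** Hypothetical mapper that receives the key and the narrow context. */
@FunctionalInterface
interface ValueMapperWithContext<K, V, VR> {
    VR apply(K readOnlyKey, V value, RecordContext context);
}

/** Trivial immutable implementation, used only to drive the example. */
final class SimpleRecordContext implements RecordContext {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    SimpleRecordContext(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
}
```

If more metadata turns out to be needed later, a method can be added to the narrow interface; taking one away from a broad interface would break users.
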

On Sat, 13 May 2017 at 22:26 Matthias J. Sax <matth...@confluent.io> wrote:

> Jeyhun,
>
> I am not an expert on Lambdas. Can you elaborate a little bit? I cannot
> follow the explanation in the KIP to see what the problem is.
>
> For updating the KIP title I don't know -- guess it's up to you. Maybe a
> committer can comment on this?
>
> Further comments:
>
> - The KIP gets a little hard to read -- can you maybe reformat the wiki
> page a little bit? I think using `CodeBlock` would help.
>
> - What about KStream-KTable joins? You don't have overloads added for
> them. Why? (Even if I still hope that we don't need to add any new
> overloads)
>
> - Why do we need `AbstractRichFunction`?
>
> - What about interfaces Initializer, ForeachAction, Merger, Predicate,
> Reducer? I don't want to say we should/need to add to all, but we should
> discuss all of them and add where it does make sense (e.g.,
> RichForeachAction does make sense IMHO)
>
> Btw: I like the hierarchy `ValueXX` -- `ValueXXWithKey` -- `RichValueXX`
> in general -- but why can't we do all this with interfaces only?
>
> -Matthias
>
> On 5/11/17 7:00 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for your comments. I think we cannot extend the two interfaces if we
> > want to keep lambdas. I updated the KIP [1]. Maybe I should change the
> > title, because now we are not limiting the KIP to only ValueMapper,
> > ValueTransformer and ValueJoiner.
> > Please feel free to comment.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >
> > Cheers,
> > Jeyhun
> >
> > On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> If `ValueMapperWithKey` extends `ValueMapper` we don't need the new
> >> overload.
> >>
> >> And yes, we need to do one check -- but this happens when building the
> >> topology.
> >> At runtime (I mean after KafkaStreams#start()) we don't need any
> >> check as we will always use `ValueMapperWithKey`.
> >>
> >> -Matthias
> >>
> >> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> >>> Hi,
> >>>
> >>> Thanks for feedback.
> >>> Then we need to overload method
> >>>   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> >>> with
> >>>   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super V, ? extends VR> mapper);
> >>>
> >>> and at runtime (inside the processor) we still have to check whether it is
> >>> a ValueMapper or a ValueMapperWithKey before wrapping it into the rich
> >>> function.
> >>>
> >>> Please correct me if I am wrong.
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
> >>> michal.borowie...@openbet.com> wrote:
> >>>
> >>>> +1 :)
> >>>>
> >>>> On 08/05/17 23:52, Matthias J. Sax wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I was reading the updated KIP and I am wondering if we should do the
> >>>>> design a little differently.
> >>>>>
> >>>>> Instead of distinguishing between a RichFunction and non-RichFunction at
> >>>>> runtime level, we would use RichFunctions all the time. Thus, on the DSL
> >>>>> entry level, if a user provides a non-RichFunction, we wrap it by a
> >>>>> RichFunction that is fully implemented by Streams.
> >>>>> This RichFunction
> >>>>> would just forward the call omitting the key:
> >>>>>
> >>>>> Just to sketch the idea (incomplete code snippet):
> >>>>>
> >>>>>> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
> >>>>>>     private ValueMapper<V, VR> userProvidedMapper; // set by constructor
> >>>>>>
> >>>>>>     public VR apply(final K key, final V value) {
> >>>>>>         // forward the call, dropping the key
> >>>>>>         return userProvidedMapper.apply(value);
> >>>>>>     }
> >>>>>> }
> >>>>>
> >>>>> From a performance point of view, I am not sure if the
> >>>>> "if(isRichFunction)" including casts etc would have more overhead than
> >>>>> this approach (we would do more nested method calls for non-RichFunctions,
> >>>>> which should be more common than RichFunctions).
> >>>>>
> >>>>> This approach should not affect lambdas (or do I miss something?) and
> >>>>> might be cleaner, as we could have one more top level interface
> >>>>> `RichFunction` with methods `init()` and `close()` and also interfaces
> >>>>> for `RichValueMapper` etc. (thus, no abstract classes required).
> >>>>>
> >>>>> Any thoughts?
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for comments. I extended PR and KIP to include rich functions. I
> >>>>>> will still have to evaluate the cost of deep copying of keys.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Jeyhun
> >>>>>>
> >>>>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <
> >>>>>> mathieu.fenn...@replicon.com> wrote:
> >>>>>>
> >>>>>>> Hey Matthias,
> >>>>>>>
> >>>>>>> My opinion would be that documenting the immutability of the key is the
> >>>>>>> best approach available. Better than requiring the key to be serializable
> >>>>>>> (as with Jeyhun's last pass at the PR), no performance risk.
> >>>>>>>
> >>>>>>> It'd be different if Java had immutable type constraints of some kind.
> >>>>>>> :-)
> >>>>>>>
> >>>>>>> Mathieu
> >>>>>>>
> >>>>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Agreed about RichFunction. If we follow this path, it should cover
> >>>>>>>> all(?) DSL interfaces.
> >>>>>>>>
> >>>>>>>> About guarding the key -- I am still not sure what to do about it...
> >>>>>>>> Maybe it might be enough to document it (and name the key parameter like
> >>>>>>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
> >>>>>>>> guard against any modification, but I have no good idea how to do this.
> >>>>>>>> Not sure what others think about the risk of corrupted partitioning
> >>>>>>>> (which would be a user error and we could say, well, bad luck, you got a
> >>>>>>>> bug in your code, that's not our fault), vs deep copy with a potential
> >>>>>>>> performance hit (that we can't quantify atm without any performance
> >>>>>>>> test).
> >>>>>>>>
> >>>>>>>> We do have a performance system test. Maybe it's worth it for you to apply
> >>>>>>>> the deep copy strategy and run the test. It's a very basic performance
> >>>>>>>> test only, but might give some insight. If you want to do this, look
> >>>>>>>> into folder "tests" for general test setup, and into
> >>>>>>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> I think extending KIP to include RichFunctions totally makes sense.
> >>>>>>>>> So, we don't want to guard the keys because it is costly.
> >>>>>>>>> If we introduce RichFunctions I think it should not be limited to only
> >>>>>>>>> the 3 interfaces proposed in KIP but cover a wide range of interfaces.
> >>>>>>>>> Please correct me if I am wrong.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jeyhun
> >>>>>>>>>
> >>>>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> One follow up. There was this email on the user list:
> >>>>>>>>>>
> >>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
> >>>>>>>>>>
> >>>>>>>>>> It might make sense to include the Initializer, Adder, and Subtractor
> >>>>>>>>>> interfaces, too.
> >>>>>>>>>>
> >>>>>>>>>> And we should double check if there are other interfaces we might miss
> >>>>>>>>>> atm.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> >>>>>>>>>>> Thanks for updating the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> Deep copying the key will work for sure, but I am actually a little bit
> >>>>>>>>>>> worried about performance impact... We might want to do some test to
> >>>>>>>>>>> quantify this impact.
> >>>>>>>>>>>
> >>>>>>>>>>> Btw: this reminds me about the idea of a `RichFunction` interface that
> >>>>>>>>>>> would allow users to access record metadata (like timestamp, offset,
> >>>>>>>>>>> partition etc) within DSL. This would be a similar concept. Thus, I am
> >>>>>>>>>>> wondering if it would make sense to enlarge the scope of this KIP by
> >>>>>>>>>>> that? WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> >>>>>>>>>>>> Hi Mathieu,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for feedback. I followed a similar approach and updated PR and KIP
> >>>>>>>>>>>> accordingly. I tried to guard the key in Processors by sending a copy of
> >>>>>>>>>>>> the actual key.
> >>>>>>>>>>>> Because I am doing a deep copy of an object, I think memory can be
> >>>>>>>>>>>> a bottleneck in some use-cases.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <
> >>>>>>>>>>>> mathieu.fenn...@replicon.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather than
> >>>>>>>>>>>>> interfaces, which is also a backwards incompatible change. An alternative
> >>>>>>>>>>>>> approach that would be backwards compatible would be to define new
> >>>>>>>>>>>>> interfaces, and provide overloads where those interfaces are used.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Unfortunately, marking the key parameter as "final" doesn't change much
> >>>>>>>>>>>>> about guarding against key change. It only prevents the parameter variable
> >>>>>>>>>>>>> from being reassigned. If the key type is a mutable object (e.g. byte[]),
> >>>>>>>>>>>>> it can still be mutated (e.g. key[0] = 0). But I'm not really sure there's
> >>>>>>>>>>>>> much that can be done about that.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Mathieu
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for comments.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The concerns make sense. Although we can guard for immutable keys in
> >>>>>>>>>>>>>> the current implementation (with few changes), I didn't consider backward
> >>>>>>>>>>>>>> compatibility.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In this case 2 solutions come to my mind.
> >>>>>>>>>>>>>> In both cases, the user accesses the key as type Object, as passing an
> >>>>>>>>>>>>>> extra type parameter will break backwards-compatibility. So the user has
> >>>>>>>>>>>>>> to cast to the actual key type.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments (key and
> >>>>>>>>>>>>>> value) and force key to be *final*. By doing this, I think we can address
> >>>>>>>>>>>>>> both backward-compatibility and guarding against key change.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Secondly, we can create class KeyAccess like:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> public class KeyAccess {
> >>>>>>>>>>>>>>     Object key;
> >>>>>>>>>>>>>>     public void beforeApply(final Object key) {
> >>>>>>>>>>>>>>         this.key = key;
> >>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>     public Object getKey() {
> >>>>>>>>>>>>>>         return key;
> >>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
> >>>>>>>>>>>>>> *KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*)
> >>>>>>>>>>>>>> before calling *mapper.apply(value)* we can set the *key* by
> >>>>>>>>>>>>>> *mapper.beforeApply(key)*. As a result, the user can use *getKey()* to
> >>>>>>>>>>>>>> access the key inside the *apply(value)* method.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jeyhun,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> thanks a lot for the KIP!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think there are two issues we need to address:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (1) The KIP does not consider backward compatibility.
> >>>>>>>>>>>>>>> Users did complain
> >>>>>>>>>>>>>>> about this in past releases already, and as the user base grows, we
> >>>>>>>>>>>>>>> should not break backward compatibility in future releases anymore.
> >>>>>>>>>>>>>>> Thus, we should think of a better way to allow key access.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Mathieu's comment goes in the same direction:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
> >>>>>>>>>>>>>>>>> fixed from this change is unfortunate. :-)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
> >>>>>>>>>>>>>>> modifying the key. This might corrupt partitioning if users do alter the
> >>>>>>>>>>>>>>> key (accidentally -- or users are just not aware that they are not
> >>>>>>>>>>>>>>> allowed to modify the provided key object) and thus break the
> >>>>>>>>>>>>>>> application. (This was the original motivation to not provide the key in
> >>>>>>>>>>>>>>> the first place -- it guards against modification.)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> >>>>>>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I just want to add my voice that I, too, have wished for access to the
> >>>>>>>>>>>>>>>> record key during a mapValues or similar operation.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
> >>>>>>>>>>>>>>>> fixed from this change is unfortunate. :-) But at least it would all be a
> >>>>>>>>>>>>>>>> pretty clear and easy change.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Mathieu
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>> Dear community,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
> >>>>>>>>>>>>>>>>> KAFKA-3745 [4]. The related PR can be found at [5].
> >>>>>>>>>>>>>>>>> I would like to get your comments.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
> >>>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
> >>>>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
> >>>>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -Cheers
> >>>>>>>>>
> >>>>>>>>> Jeyhun
> >>>
> >>> --
> >>> -Cheers
> >>>
> >>> Jeyhun
> >
> > --
> > -Cheers
> >
> > Jeyhun
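
On the key-guarding point raised in the quoted thread above (marking the key parameter `final` does not stop a mutable key such as `byte[]` from being changed), a small sketch may help. The class and method names (`KeyMutationSketch`, `mapValue`, `guardedCopy`) are hypothetical and only for illustration; `java.util.Arrays.copyOf` is the one real API used. It shows the mutation getting through `final`, and a defensive copy as one possible (and potentially costly) guard.

```java
import java.util.Arrays;

// Hypothetical sketch; not code from the KIP or from Kafka Streams.
class KeyMutationSketch {

    // `final` only forbids reassigning the parameter variable itself;
    // the array it points to can still be modified.
    static String mapValue(final byte[] readOnlyKey, final String value) {
        readOnlyKey[0] = 0; // compiles fine despite `final`
        return value.toUpperCase();
    }

    // One possible guard: hand user code a copy of the key.
    // For a byte[] a shallow array copy is already a full copy.
    static byte[] guardedCopy(final byte[] key) {
        return Arrays.copyOf(key, key.length);
    }

    public static void main(String[] args) {
        byte[] unguarded = {42, 7};
        mapValue(unguarded, "v");
        System.out.println(unguarded[0]); // prints 0: the caller's key was changed

        byte[] guarded = {42, 7};
        mapValue(guardedCopy(guarded), "v");
        System.out.println(guarded[0]); // prints 42: only the copy was changed
    }
}
```

The copy protects the original key but costs an allocation per record, which is the performance trade-off discussed in the thread.
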