I agree with Guozhang that just exposing metadata at the source level might not provide much value. Furthermore, for timestamps we already have a well-defined contract and we should exploit it: timestamps can always be provided in a meaningful way.
Also, for simple operations like KStream-filter/map the contract is simple and we can just use it. Same for KTable-filter/map (for new values). For aggregations, joins, and oldValue, I would just drop some information and return `null`/-1 if the result record has no semantically meaningful metadata. For example, for aggregations, we could preserve the partition (as all agg-input-records have the same partition). For single-input-topic aggregation (which I guess is the most prominent case), we can also carry over the topic name (often this would be an internal repartitioning topic name). Offsets don't have any semantic interpretation IMHO, so we could return -1. For joins, we could keep the partition information. Topic and offset are both unknown/invalid for the output record IMHO. For the oldValue case, we can keep the partition, and for the single-input-topic case also the topic name. Timestamp might be -1 for now, but after we add timestamps to KTable (which we plan to do anyway), we can also return a valid timestamp. Offset would be -1 again (if we stored offsets in KTable too, we could provide all offsets as well -- but I don't see much value in doing this compared to the storage overhead it implies). WDYT? -Matthias On 11/29/17 4:14 AM, Jan Filipiak wrote: > Hi, > > thank you for the summary and thanks for acknowledging that I do have a > point here. > > I don't like the second idea at all. Hence I started off this discussion. > > I am just disappointed; back then, when we had the discussion about how > to refactor the store overloads > and IQ handling, I knew the path we were taking was wrong. Having problems > implementing these kinda > features (which are really simple) is just a symptom of a messed-up IQ > implementation. I wish really badly > I could have convinced you guys back then. To be honest, with IQ we can > continue here > as we materialize but would not send oldValue; but with join you're out > of luck with the current setup. 
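To make Matthias's proposal above concrete, here is a hypothetical sketch of what such a contract could return for an aggregation result. `AggregationResultContext` and its accessors are illustrative names for this discussion, not actual Kafka Streams API:

```java
// Hypothetical sketch of the proposed metadata contract for aggregation
// results (illustrative names, not actual Kafka Streams API). Fields that
// carry no semantically meaningful value for the output record fall back
// to null/-1, as proposed above.
final class AggregationResultContext {
    private final String topic;    // single-input-topic case only (often an
                                   // internal repartitioning topic), else null
    private final int partition;   // preserved: all input records share it
    private final long timestamp;  // timestamps can always be meaningful

    AggregationResultContext(String topic, int partition, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
    }

    String topic()   { return topic; }
    int partition()  { return partition; }
    long timestamp() { return timestamp; }
    long offset()    { return -1L; }  // offsets have no semantic meaning here
}
```

A join result would look similar under this contract, except that `topic()` would also return null, since the topic is unknown/invalid for the output record.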
> > I of course recommend to do not introduce any optimizations here. Id > recommend to go towards what > I recommended already back then. So i would't say we need to optimize > anything later we need to build > the topology better in the first place. > > > > > On 28.11.2017 21:00, Guozhang Wang wrote: >> Jan, >> >> Thanks for your input, I can understand now that the oldValue is also >> exposed in user customized `filter` function and hence want record >> context >> we should expose is a problem. And I think it does brings a good point to >> consider for KIP-159. The discussions maybe a bit confusing to reader >> though, and hence I'd like to summarize the status quo and with a >> proposal: >> >> In today's Streams DSL, when a KTable is created either from a source >> topic, or from an stateful operator, we will materialize the KTable >> with a >> backing state store; on the other hand, KTables created from a >> non-stateful >> operator like filter, will not be backed by a state store by default >> unless >> users indicate so (e.g. using the overloaded function with the queryable >> name or store supplier). >> >> For example: >> >> KTable table1 = builder.table("topic"); // a >> state store created for table1 >> KTable table2 = table1.filter(..); >> // no state store created for table2 >> KTable table3 = table1.filter(.., "storeName"); // a >> state >> store created for table3 >> KTable table4 = table1.groupBy(..).aggregate(..); // a state >> store created for table4 >> >> Because of that, the filter() operator above on table1 will always be >> exposed with oldValue and newValue; Damian's point is that, we may >> optimize >> the first case such that table1 will only be materialized if users >> asked so >> (e.g. using the overloaded function with a store supplier), and in which >> case, we do not need to pass newValue / oldValue pairs (I think this is >> what Jan suggests as well, i.e. 
do filtering before materializing, so >> that >> we can have a smaller backed state store as well). But this optimization >> does not eliminate the possibilities that we may still need to do >> filter if >> users does specify "yes I do want to the source KTable itself to be >> materialized, please". So the concern about how to expose the record >> context in such cases still persists. >> >> >> With that, regarding to KIP-159 itself, here are my thoughts: >> >> 1) if we restrict the scope of exposing record context only to source >> KTables / KStreams I felt the KIP itself does not bring much value given >> its required API change because only the SourceKStream can safely >> maintain >> its records context, and for SourceKTable if it is materialized, then >> even >> non-stateful operators like Join may still have a concern about exposing >> the record context. >> >> 2) an alternative idea is we provide the semantics on how record context >> would be inherited across the operators for KTable / KStream and >> expose it >> in all operators (similarly in PAPI we would expose a much simpler >> contract), and make it as a public contract that Streams library will >> guarantee moving forward even we optimize our topology builder; it may >> not >> align perfectly with the linear algebraic semantics but practically >> applicable for most cases; if users semantics do not fit in the provided >> contract, then they may need to get this themselves (embed such >> information >> in the value payload, for example). >> >> If people do not like the second idea, I'd suggest we hold on pursuing >> the >> first direction since to me its beneficial scope is too limited >> compared to >> its cost. >> >> >> Guozhang >> >> >> >> On Fri, Nov 24, 2017 at 1:39 AM, Jan Filipiak <jan.filip...@trivago.com> >> wrote: >> >>> Cleary we show the oldValue to the user. We have to, because we filter >>> after the store. 
>>> https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java#L96 >>> >>> >>> I cannot help you following this. It is really obvious and I am running >>> out of tools for explaining. >>> >>> Thanks for understanding my point to put the filter before. Not only >>> would it >>> make the store smaller. It would make this feature reasonably >>> possible and >>> the framework easier. Interestingly it would also help to move IQ >>> into more >>> reasonable directions. And it might help understand that we do not >>> need any >>> intermediate representation of the topology. >>> >>> KIP-182: I have no clue what everyone has with their "bytestores" so >>> broken. But putting another store after doesn't help when the store >>> before >>> is the problem. >>> >>> >>> >>> >>> On 24.11.2017 05:08, Matthias J. Sax wrote: >>> >>>> From a DSL point of view, users only see the new value on a >>>> KTable#filter anyway. So why should it be an issue that we use a >>>> <newValue,oldValue> pair under the hood? >>>> >>>> User sees newValue and gets the corresponding RecordContext. I can't see >>>> any >>>> issue here? >>>> >>>> I cannot follow here: >>>> >>>> Even when we have a statefull operation last. We move it to the very >>>>>> first processor (KtableSource) >>>>>> and therefore cant present a proper RecordContext. >>>>>> >>>> With regard to `builder.table().filter()`: >>>> >>>> I see your point that it would be good to be able to apply the filter() >>>> first to reduce the state store size of the table. But how is this >>>> related to KIP-159? >>>> >>>> Btw: with KIP-182, I am wondering if this would not be possible, by >>>> putting a custom dummy store into the table and materializing the filter >>>> result afterwards? It's not a nice way to do it, but it seems to be possible. >>>> >>>> >>>> -Matthias >>>> >>>> On 11/23/17 4:56 AM, Jan Filipiak wrote: >>>> >>>>> The comment is valid. 
It falls exactly into this topic, it has exactly >>>>> todo with this! >>>>> Even when we have a statefull operation last. We move it to the very >>>>> first processor (KtableSource) >>>>> and therefore cant present a proper RecordContext. >>>>> >>>>> Regarding the other Jiras you are referring to. They harm the project >>>>> more than they do good! >>>>> There is no need for this kind of optimizer and meta representation >>>>> and >>>>> what not. I hope they >>>>> never get implemented. >>>>> >>>>> Best Jan >>>>> >>>>> >>>>> On 22.11.2017 14:44, Damian Guy wrote: >>>>> >>>>>> Jan, i think you comment with respect to filtering is valid, though >>>>>> not for >>>>>> this KIP. We have separate JIRAs for topology optimization of >>>>>> which this >>>>>> falls into. >>>>>> >>>>>> Thanks, >>>>>> Damian >>>>>> >>>>>> On Wed, 22 Nov 2017 at 02:25 Guozhang Wang <wangg...@gmail.com> >>>>>> wrote: >>>>>> >>>>>> Jan, >>>>>>> Not sure I understand your argument that "we still going to present >>>>>>> change.oldValue to the filter even though the record context() is >>>>>>> for >>>>>>> change.newValue". Are you referring to `KTableFilter#process()`? >>>>>>> If yes >>>>>>> could you point to me which LOC are you concerning about? >>>>>>> >>>>>>> >>>>>>> Guozhang >>>>>>> >>>>>>> >>>>>>> On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak < >>>>>>> jan.filip...@trivago.com> >>>>>>> wrote: >>>>>>> >>>>>>> a remark of mine that got missed during migration: >>>>>>>> There is this problem that even though we have >>>>>>>> source.table.filter.join >>>>>>>> the state-fullness happens at the table step not a the join >>>>>>>> step. In a >>>>>>>> filter >>>>>>>> we still going to present change.oldValue to the filter even though >>>>>>>> the >>>>>>>> record context() is for change.newValue. I would go as far as >>>>>>>> applying >>>>>>>> the filter before the table processor. 
Not to just get KIP-159, but >>>>>>>> >>>>>>> because >>>>>>> >>>>>>>> I think its a side effect of a non ideal topology layout. If i can >>>>>>>> filter >>>>>>>> 99% of my >>>>>>>> records. my state could be way smaller. Also widely escalates the >>>>>>>> context >>>>>>>> of the KIP >>>>>>>> >>>>>>>> I can only see upsides of executing the filter first. >>>>>>>> >>>>>>>> Best Jan >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On 20.11.2017 22:22, Matthias J. Sax wrote: >>>>>>>> >>>>>>>> I am moving this back to the DISCUSS thread... Last 10 emails were >>>>>>>>> sent >>>>>>>>> to VOTE thread. >>>>>>>>> >>>>>>>>> Copying Guozhang's last summary below. Thanks for this summary. >>>>>>>>> Very >>>>>>>>> comprehensive! >>>>>>>>> >>>>>>>>> It seems, we all agree, that the current implementation of the >>>>>>>>> context >>>>>>>>> at PAPI level is ok, but we should not leak it into DSL. >>>>>>>>> >>>>>>>>> Thus, we can go with (2) or (3), were (3) is an extension to (2) >>>>>>>>> carrying the context to more operators than just sources. It also >>>>>>>>> seems, >>>>>>>>> that we all agree, that many-to-one operations void the context. >>>>>>>>> >>>>>>>>> I still think, that just going with plain (2) is too >>>>>>>>> restrictive -- >>>>>>>>> but >>>>>>>>> I am also fine if we don't go with the full proposal of (3). >>>>>>>>> >>>>>>>>> Also note, that the two operators filter() and filterNot() don't >>>>>>>>> modify >>>>>>>>> the record and thus for both, it would be absolutely valid to keep >>>>>>>>> the >>>>>>>>> context. >>>>>>>>> >>>>>>>>> I personally would keep the context for at least all one-to-one >>>>>>>>> operators. One-to-many is debatable and I am fine to not carry the >>>>>>>>> context further: at least the offset information is >>>>>>>>> questionable for >>>>>>>>> this case -- note thought, that semantically, the timestamp is >>>>>>>>> inherited >>>>>>>>> via one-to-many, and I also think this applies to "topic" and >>>>>>>>> "partition". 
Thus, I think it's still valuable information we can >>>>>>>>> carry >>>>>>>>> downstreams. >>>>>>>>> >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> Jan: which approach are you referring to as "the approach that is >>>>>>>>> on the >>>>>>>>> >>>>>>>>>> table would be perfect"? >>>>>>>>>> >>>>>>>>>> Note that in today's PAPI layer we are already effectively >>>>>>>>>> exposing the >>>>>>>>>> record context which has the issues that we have been discussing >>>>>>>>>> right >>>>>>>>>> now, >>>>>>>>>> and its semantics is always referring to the "processing >>>>>>>>>> record" at >>>>>>>>>> >>>>>>>>> hand. >>>>>>>> More specifically, we can think of processing a record a bit >>>>>>>>>> different: >>>>>>>>>> >>>>>>>>>> 1) the record traversed the topology from source to sink, it >>>>>>>>>> may be >>>>>>>>>> transformed into new object or even generate multiple new objects >>>>>>>>>> >>>>>>>>> (think: >>>>>>>> branch) along the traversal. And the record context is referring to >>>>>>>>> this >>>>>>>> processing record. Here the "lifetime" of the record lasts for the >>>>>>>>> entire >>>>>>>> topology traversal and any new records of this traversal is >>>>>>>>>> treated as >>>>>>>>>> different transformed values of this record (this applies to join >>>>>>>>>> and >>>>>>>>>> aggregations as well). >>>>>>>>>> >>>>>>>>>> 2) the record being processed is wiped out in the first operator >>>>>>>>>> after >>>>>>>>>> the >>>>>>>>>> source, and NEW records are forwarded to downstream operators. >>>>>>>>>> I.e. >>>>>>>>>> >>>>>>>>> each >>>>>>>> record only lives between two adjacent operators, once it >>>>>>>> reached the >>>>>>>>> new >>>>>>>> operator it's lifetime has ended and new records are generated. >>>>>>>>>> I think in the past we have talked about Streams under both >>>>>>>>>> context, >>>>>>>>>> >>>>>>>>> and >>>>>>>> we >>>>>>>>>> do not have a clear agreement. 
I agree that 2) is logically more >>>>>>>>>> understandable for users as it does not leak any internal >>>>>>>>>> >>>>>>>>> implementation >>>>>>>> details (e.g. for stream-table joins, table record's traversal >>>>>>>>>> ends at >>>>>>>>>> the >>>>>>>>>> join operator as it is only be materialized, while stream >>>>>>>>>> record's >>>>>>>>>> traversal goes through the join operator to further down until >>>>>>>>>> sinks). >>>>>>>>>> However if we are going to interpret following 2) above then even >>>>>>>>>> for >>>>>>>>>> non-stateful operators we would not inherit record context. What >>>>>>>>>> we're >>>>>>>>>> discussing now, seems to infer a third semantics: >>>>>>>>>> >>>>>>>>>> 3) a record would traverse "through" one-to-one (non-stateful) >>>>>>>>>> >>>>>>>>> operators, >>>>>>>> will "replicate" at one-to-many (non-stateful) operators (think: >>>>>>>>>> "mapValues" >>>>>>>>>> ) and will "end" at many-to-one (stateful) operators >>>>>>>>>> where NEW >>>>>>>>>> >>>>>>>>> records >>>>>>>> will be generated and forwarded to the downstream operators. >>>>>>>>>> Just wanted to lay the ground for discussions so we are all on >>>>>>>>>> the >>>>>>>>>> same >>>>>>>>>> page before chatting more. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Guozhang >>>>>>>>>> >>>>>>>>>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote: >>>>>>>>> Hi Matthias, >>>>>>>>>> Thanks a lot for correcting. It is a leftover from the past >>>>>>>>>> designs >>>>>>>>>> >>>>>>>>> when >>>>>>>> punctuate() was not deprecated. >>>>>>>>>> I corrected. >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Jeyhun >>>>>>>>>> >>>>>>>>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax >>>>>>>>>> <matth...@confluent.io> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> I just re-read the KIP. >>>>>>>>>> >>>>>>>>>>> One minor comment: we don't need to introduce any deprecated >>>>>>>>>>> methods. >>>>>>>>>>> Thus, RichValueTransformer#punctuate can be removed completely >>>>>>>>>>> instead >>>>>>>>>>> of introducing it as deprecated. 
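Guozhang's option 3) above amounts to a simple per-operator rule for whether the upstream record context survives. A sketch of the proposed semantics only (names are illustrative, this is not implemented behavior):

```java
// Sketch of option 3): the record context traverses one-to-one operators,
// replicates at one-to-many operators, and ends at many-to-one (stateful)
// operators, where NEW records get a fresh context.
enum OperatorKind { ONE_TO_ONE, ONE_TO_MANY, MANY_TO_ONE }

final class ContextInheritance {
    static boolean inheritsUpstreamContext(OperatorKind kind) {
        switch (kind) {
            case ONE_TO_ONE:   // e.g. filter(), filterNot(): record unmodified
            case ONE_TO_MANY:  // e.g. branch-like ops: context is replicated
                return true;
            case MANY_TO_ONE:  // e.g. aggregations, joins: context is voided
            default:
                return false;
        }
    }
}
```

This is where the thread still disagrees: Matthias would keep at least timestamp, topic, and partition across one-to-many operators, while the offset remains questionable there.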
>>>>>>>>>>> >>>>>>>>>>> Otherwise looks good to me. >>>>>>>>>>> >>>>>>>>>>> Thanks for being so patient! >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -Matthias >>>>>>>>>>> >>>>>>>>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote: >>>>>>>>>>> >>>>>>>>>>> Jeyhun, >>>>>>>>>>>> I think I'm convinced to not do KAFKA-3907 in this KIP. We >>>>>>>>>>>> should >>>>>>>>>>>> >>>>>>>>>>> think >>>>>>>> carefully if we should add this functionality to the DSL layer >>>>>>>>>>>> moving >>>>>>>>>>>> forward since from what we discovered working on it the >>>>>>>>>>>> conclusion is >>>>>>>>>>>> >>>>>>>>>>>> that >>>>>>>>>>> it would require revamping the public APIs quite a lot, and it's >>>>>>>>>>>> not >>>>>>>>>>>> >>>>>>>>>>>> clear >>>>>>>>>>> if it is a good trade-off than asking users to call process() >>>>>>>>>>> instead. >>>>>>>> Guozhang >>>>>>>>>>>> >>>>>>>>>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy >>>>>>>>>>>> <damian....@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi Jeyhun, thanks, looks good. >>>>>>>>>>>> >>>>>>>>>>>>> Do we need to remove the line that says: >>>>>>>>>>>>> >>>>>>>>>>>>> - on-demand commit() feature >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Damian >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov < >>>>>>>>>>>>> je.kari...@gmail.com> >>>>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>>> I removed the 'commit()' feature, as we discussed. It >>>>>>>>>>>>> simplified >>>>>>>>>>>>> the >>>>>>>> overall design of KIP a lot. >>>>>>>>>>>>>> If it is ok, I would like to start a VOTE thread. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax < >>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks. 
I understand what you are saying, but I don't >>>>>>>>>>>>>> agree that >>>>>>>>>>>>>> >>>>>>>>>>>>>>> but also we need a commit() method >>>>>>>>>>>>>>> I would just not provide `commit()` at DSL level and >>>>>>>>>>>>>>> close the >>>>>>>>>>>>>>> corresponding Jira as "not a problem" or similar. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>> Thanks for your comments. I agree that this is not the best >>>>>>>>>>>>>>>> way >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> to >>>>>>>> do. >>>>>>>>>>>>>> A >>>>>>>>>>>>>> >>>>>>>>>>>>>> bit of history behind this design. >>>>>>>>>>>>>>>> Prior doing this, I tried to provide ProcessorContext >>>>>>>>>>>>>>>> itself >>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> an >>>>>>>> argument >>>>>>>>>>>>>>> in Rich interfaces. However, we dont want to give users that >>>>>>>>>>>>>>>> flexibility >>>>>>>>>>>>>>> and “power”. Moreover, ProcessorContext contains processor >>>>>>>>>>>>>>> level >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> information and not Record level info. The only thing we >>>>>>>>>>>>>>>> need ij >>>>>>>>>>>>>>>> ProcessorContext is commit() method. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> So, as far as I understood, we need recor context (offset, >>>>>>>>>>>>>>>> timestamp >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>> etc) but also we need a commit() method ( we dont want to >>>>>>>>>>>>>>> provide >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ProcessorContext as a parameter so users can use >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ProcessorContext.commit() >>>>>>>>>>>>>>> ). >>>>>>>>>>>>>>>> As a result, I thought to “propagate” commit() call from >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> RecordContext >>>>>>>>>>>>>> to >>>>>>>>>>>>>> >>>>>>>>>>>>>> ProcessorContext() . 
>>>>>>>>>>>>>>>> If there is a misunderstanding in motvation/discussion of >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> KIP/included >>>>>>>>>>>>>> jiras please let me know. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax < >>>>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> I am personally still not convinced, that we should add >>>>>>>>>>>>>>> `commit()` >>>>>>>> at >>>>>>>>>>>>>>> all. >>>>>>>>>>>>>>> @Guozhang: you created the original Jira. Can you >>>>>>>>>>>>>>> elaborate a >>>>>>>>>>>>>>>>> little >>>>>>>>>>>>>>>>> bit? Isn't requesting commits a low level API that should >>>>>>>>>>>>>>>>> not be >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> exposed >>>>>>>>>>>>>>> in the DSL? Just want to understand the motivation >>>>>>>>>>>>>>> better. Why >>>>>>>>>>>>>>> >>>>>>>>>>>>>> would >>>>>>>> anybody that uses the DSL ever want to request a commit? To >>>>>>>>>>>>>>>>> me, >>>>>>>>>>>>>>>>> requesting commits is useful if you manipulated state >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> explicitly, >>>>>>>> ie, >>>>>>>>>>>>>>> via Processor API. >>>>>>>>>>>>>>> Also, for the solution: it seem rather unnatural to me, >>>>>>>>>>>>>>>>> that we >>>>>>>>>>>>>>>>> add >>>>>>>>>>>>>>>>> `commit()` to `RecordContext` -- from my understanding, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> `RecordContext` >>>>>>>>>>>>>>> is an helper object that provide access to record meta data. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Requesting >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> a commit is something quite different. Additionally, a >>>>>>>>>>>>>>> commit >>>>>>>>>>>>>>> does >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> commit a specific record but a `RecrodContext` is for a >>>>>>>>>>>>>> specific >>>>>>>>>>>>>> >>>>>>>>>>>>>>> record. 
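Matthias's objection above can be stated as a type-level separation: record metadata is read-only and per-record, while commit() is a task-level action. A rough sketch with illustrative names (not the actual KIP-159 interfaces):

```java
// Sketch: keep the per-record view strictly read-only (the accessors mirror
// those discussed in KIP-159) and leave commit() on a separate task-level
// context. Illustrative names only, not actual Kafka Streams API.
interface ReadOnlyRecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

interface TaskLevelContext {
    // Commits the state of the whole task up to the current record,
    // not a single record -- hence it does not belong on the record view.
    void commit();
}
```

Under this split, `commit()` on the record view would be impossible by construction, rather than forbidden by convention.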
>>>>>>>>>>>>>>> To me, this does not seem to be a sound API design if we >>>>>>>>>>>>>>> follow >>>>>>>>>>>>>>> >>>>>>>>>>>>>> this >>>>>>>> path. >>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>> Thanks for your suggestions. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I have some comments, to make sure that there is no >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> misunderstanding. >>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in >>>>>>>>>>>>>>>>>> ProcessorContext, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> to >>>>>>>> enforce >>>>>>>>>>>>>>>> user to consolidate this call as >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal >>>>>>>>>>>>>>>>>>> implementation >>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` is >>>>>>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> changed >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> this call. >>>>>>>>>>>>>>>>> - I think we should not deprecate >>>>>>>>>>>>>>>>>> `ProcessorContext.commit()`. >>>>>>>>>>>>>>>>>> The >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> main >>>>>>>>>>>>>>>> intuition that we introduce `commit()` in >>>>>>>>>>>>>>>> `RecordContext` is >>>>>>>>>>>>>>> that, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> `RecordContext` is the one which is provided in Rich >>>>>>>>>>>>>>>>> interfaces. >>>>>>>> So >>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>> wants to commit, then there should be some method inside >>>>>>>>>>>>>>>>>> `RecordContext` >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> do so. 
Internally, `RecordContext.commit()` calls >>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` (see the last code >>>>>>>>>>>>>>>>>> snippet in >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> KIP-159): >>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>> public void process(final K1 key, final V1 value) { >>>>>>>>>>>>>>>>>> recordContext = new RecordContext() >>>>>>>>>>>>>>>>>> { // >>>>>>>>>>>>>>>>>> recordContext initialization is added in this KIP >>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>> public void commit() { >>>>>>>>>>>>>>>>>> context().commit(); >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>> public long offset() { >>>>>>>>>>>>>>>>>> return context().recordContext().offs >>>>>>>>>>>>>>>>>> et(); >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>> public long timestamp() { >>>>>>>>>>>>>>>>>> return >>>>>>>>>>>>>>>>>> context().recordContext().timestamp(); >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>> public String topic() { >>>>>>>>>>>>>>>>>> return context().recordContext().topi >>>>>>>>>>>>>>>>>> c(); >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>> public int partition() { >>>>>>>>>>>>>>>>>> return >>>>>>>>>>>>>>>>>> context().recordContext().partition(); >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> }; >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` in >>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> case >>>>>>>> IMO. >>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, >>>>>>>>>>>>>>>> so >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> that it can implement the commit call itself. >>>>>>>>>>>>>>>>>> - Actually, I don't think that we need `commit()` in >>>>>>>>>>>>>>>>>> `ProcessorRecordContext`. 
The main intuition is to >>>>>>>>>>>>>>>>>> "transfer" >>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` call to Rich interfaces, to >>>>>>>>>>>>>>>>>> support >>>>>>>>>>>>>>>>>> user-specific committing. >>>>>>>>>>>>>>>>>> To do so, we introduce `commit()` method in >>>>>>>>>>>>>>>>>> `RecordContext()` >>>>>>>>>>>>>>>>>> just >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> only >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> call ProcessorContext.commit() inside. (see the above code >>>>>>>>>>>>>>>>>> snippet) >>>>>>>>>>>>>>>>>> So, in Rich interfaces, we are not dealing with >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> `ProcessorRecordContext` >>>>>>>>>>>>>>>> at all, and we leave all its methods as it is. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> In this KIP, we made `RecordContext` to be the parent >>>>>>>>>>>>>>>>>> class of >>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, just because of they share >>>>>>>>>>>>>>>>>> quite >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> amount >>>>>>>> of >>>>>>>>>>>>>>>> methods and it is logical to enable inheritance between >>>>>>>>>>>>>>>> those >>>>>>>>>>>>>> two. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a >>>>>>>>>>>>>>>>>> commit() >>>>>>>>>>>>>>>> method, >>>>>>>>>>>>>>> is valid only within RecordContext interface (at least for >>>>>>>>>>>>>>>>> now), >>>>>>>> we >>>>>>>>>>>>>>>>> throw >>>>>>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and the >>>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>>> snippet >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> below would need to be updated as well. >>>>>>>>>>>>>>>> - I think above explanation covers this as well. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I want to gain some speed to this KIP, as it has gone >>>>>>>>>>>>>>>>>> though >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> many >>>>>>>> changes >>>>>>>>>>>>>>>> based on user/developer needs, both in >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> documentation-/implementation-wise. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang < >>>>>>>>>>>>>>>>>> wangg...@gmail.com> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> Thanks for the information Jeyhun. I had also forgot about >>>>>>>>>>>>>>>>>> KAFKA-3907 >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>> this KIP.. >>>>>>>>>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what we >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> agreed >>>>>>>> before, >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> add the commit() call to `RecordContext`. A few minor >>>>>>>>>>>>>>>>>> tweaks on >>>>>>>>>>>>>>>>>> its >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> implementation: >>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in >>>>>>>>>>>>>>>>>>> ProcessorContext, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> to >>>>>>>> enforce >>>>>>>>>>>>>>>>> user to consolidate this call as >>>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal >>>>>>>>>>>>>>>>>>> implementation >>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` is >>>>>>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> changed >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> this call. >>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class, >>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so >>>>>>>>>>>>>>>>>> that it can implement the commit call itself. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, >>>>>>>>>>>>>>>>>>> call to a >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> commit() >>>>>>>>>>>>>>>>> method, >>>>>>>>>>>>>>> is valid only within RecordContext interface (at least for >>>>>>>>>>>>>>>>>> now), >>>>>>>> we >>>>>>>>>>>>>>>>> throw >>>>>>>>>>>>>>> an exception in ProcessorRecordContext.commit()." 
and the >>>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>>> snippet >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> below would need to be updated as well. >>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax < >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> Fair point. This is a long discussion and I totally forgot >>>>>>>>>>>>>>>>>> that >>>>>>>> we >>>>>>>>>>>>>>>>>> discussed this. >>>>>>>>>>>>>>> Seems I changed my opinion about including KAFKA-3907... >>>>>>>>>>>>>>>>>>>> Happy to hear what others think. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>>> It is probably my bad, the discussion was a bit >>>>>>>>>>>>>>>>>>>>> long in >>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> thread. I >>>>>>>>>>>>>>>>>>> proposed the related issue in the related KIP discuss >>>>>>>>>>>>>>>> thread >>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> got >>>>>>>>>>>>>>>> an >>>>>>>>>>>>>>>>>>>> approval [2,3]. >>>>>>>>>>>>>>>>>>>>> Maybe I misunderstood. 
>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kaf >>>>>>>>>>>>>>>>>>>>> ka/uyzND19Asmg1GKKXT1?subj= >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> [2] >>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kaf >>>>>>>>>>>>>>>>>>>>> ka/uyzND1kpct22GKKXT1?subj= >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> [3] >>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj= >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax < >>>>>>>>>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> Interesting. >>>>>>>>>>>>>>>>>>>>>> I thought that https://issues.apache.org/ >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> jira/browse/KAFKA-4125 >>>>>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> main motivation for this KIP :) >>>>>>>>>>>>>>>>>>>>> I also think, that we should not expose the full >>>>>>>>>>>>>>>>>>>>>> ProcessorContext >>>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> DSL >>>>>>>>>>>>>>>>> level. >>>>>>>>>>>>>>>>>>>>> Thus, overall I am not even sure if we should fix >>>>>>>>>>>>>>>>>>>>> KAFKA-3907 >>>>>>>> at >>>>>>>>>>>>>>>>>>>>>> all. >>>>>>>>>>>>>>>>>>>> Manual commits are something DSL users should not worry >>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>> one really needs this, an advanced user can still insert a >>>>>>>>>>>>>>>>>>>>> dummy >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> `transform` to request a commit from there. 
>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On 10/18/17 5:39 AM, Jeyhun Karimov wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>>>>> The main intuition is to solve [1], which is part of >>>>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>> KIP. >>>>>>>>>>>>>>>>>>>>>>> I agree with you that this might not seem >>>>>>>>>>>>>>>>>>>>>>> semantically >>>>>>>>>>>>>>>>>>>>>>> correct >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>> not committing record state. >>>>>>>>>>>>>>>>>>>>>> Alternatively, we can remove commit() from >>>>>>>>>>>>>>>>>>>>>> RecordContext >>>>>>>>>>>>>>>>>>>>>> and >>>>>>>> add >>>>>>>>>>>>>>>>>>>>> ProcessorContext (which has commit() method) as an >>>>>>>>>>>>>>>>>>>>> extra >>>>>>>>>>>>>>> argument >>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> Rich >>>>>>>>>>>>>>>>> methods: >>>>>>>>>>>>>>>>>>>>>> instead of >>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> { >>>>>>>>>>>>>>>>>>>>>>> VR apply(final V value, >>>>>>>>>>>>>>>>>>>>>>> final K key, >>>>>>>>>>>>>>>>>>>>>>> final RecordContext recordContext); >>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> we can adopt >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> { >>>>>>>>>>>>>>>>>>>>>>> VR apply(final V value, >>>>>>>>>>>>>>>>>>>>>>> final K key, >>>>>>>>>>>>>>>>>>>>>>> final RecordContext recordContext, >>>>>>>>>>>>>>>>>>>>>>> final ProcessorContext >>>>>>>>>>>>>>>>>>>>>>> processorContext); >>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> However, in this case, a user can get confused as >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> ProcessorContext >>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>> RecordContext share some methods with the same name. 
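Stripped of the quote markers, the two signatures contrasted above are the following. The stub interfaces stand in for the real KIP-159 `RecordContext`/`ProcessorContext` types so the sketch is self-contained:

```java
// Minimal stubs standing in for the KIP-159 types, for illustration only.
interface RecordContext { String topic(); int partition(); long offset(); long timestamp(); }
interface ProcessorContext { void commit(); }

// The form proposed in the KIP: only the read-only record context is passed.
interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

// The alternative floated above: additionally passes the ProcessorContext.
// The concern raised is that users may get confused, since the two context
// types share some methods with the same name.
interface RichValueMapperWithProcessorContext<V, VR, K> {
    VR apply(final V value, final K key,
             final RecordContext recordContext,
             final ProcessorContext processorContext);
}
```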
>>> Cheers,
>>> Jeyhun
>>>
>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>
>>> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Regarding #6 above, I'm still not clear why we would need `commit()`
>>>> in both ProcessorContext and RecordContext, could you elaborate a bit
>>>> more?
>>>>
>>>> To me `commit()` is really a processor context, not a record context,
>>>> logically: when you call that function, it means we would commit the
>>>> state of the whole task up to this processed record, not only that
>>>> single record itself.
>>>>
>>>> Guozhang
>>>>
>>>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the feedback.
>>>>>
>>>>> 0. RichInitializer definition seems missing.
>>>>>
>>>>> - Fixed.
>>>>> 1. I'd suggest moving the key parameter in the RichValueXX and
>>>>> RichReducer after the value parameters, as well as in the templates;
>>>>> e.g.
>>>>>
>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>     VR apply(final V1 value1, final V2 value2,
>>>>>              final K key,
>>>>>              final RecordContext recordContext);
>>>>> }
>>>>>
>>>>> - Fixed.
>>>>>
>>>>> 2. Some of the listed functions are not necessary since their
>>>>> pairing APIs are being deprecated in 1.0 already:
>>>>>
>>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
>>>>>                                    final Serde<KR> keySerde,
>>>>>                                    final Serde<V> valSerde);
>>>>>
>>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>>                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
>>>>>                                  final Serde<K> keySerde,
>>>>>                                  final Serde<V> valSerde);
>>>>>
>>>>> - Fixed.
>>>>>
>>>>> 3. For a few functions where we are adding three APIs for a
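[Editor's note: a minimal sketch of the reordered `RichValueJoiner` from point 1 in use, with values first and the key after them. `RecordContext` is stubbed to a single method here purely so the example compiles standalone.]

```java
public class RichJoinerExample {
    // Single-method stand-in for the proposed RecordContext (illustrative only).
    interface RecordContext { long timestamp(); }

    // The reordered signature from point 1: values first, then the key.
    interface RichValueJoiner<V1, V2, VR, K> {
        VR apply(final V1 value1, final V2 value2,
                 final K key, final RecordContext recordContext);
    }

    public static void main(String[] args) {
        // A joiner that combines both sides under the shared key.
        final RichValueJoiner<String, Integer, String, String> joiner =
            (left, right, key, ctx) -> key + ":" + left + "+" + right;

        System.out.println(joiner.apply("a", 1, "k", () -> 0L));  // k:a+1
    }
}
```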