I agree with Guozhang that just exposing metadata at the source level
might not provide much value. Furthermore, for timestamps we already
have a well-defined contract and we should exploit it:
timestamps can always be provided in a meaningful way.

Also, for simple operations like KStream-filter/map the contract is
simple and we can just use it. Same for KTable-filter/map (for new values).

For aggregations, joins, and oldValue, we could just drop some information
and return `null`/-1 if the result record has no semantically
meaningful metadata.

For example, for aggregations, we could preserve the partition (as all
agg-input-records have the same partition). For single-input-topic
aggregations (which I guess is the most prominent case), we can also carry
over the topic name (often this would be an internal repartitioning topic
name). Offsets don't have any semantic interpretation IMHO and we could
return -1.

For joins, we could keep the partition information. Topic and offset are
both unknown/invalid for the output record IMHO.

For the oldValue case, we can keep the partition and, for the single input
topic case, the topic name. Timestamp might be -1 for now, but once we add
timestamps to KTable (which we plan to do anyway), we can also return a
valid timestamp. Offset would be -1 again (if we stored the offset in KTable
too, we could provide the offset as well -- but I don't see much
value in doing this compared to the storage overhead this implies).
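
To make the rules above a bit more concrete, here is a rough sketch in
plain Java. It does not use any Streams classes: `Meta` and the `for...`
helpers are made-up names for illustration only, not part of KIP-159 or
the Streams API. Which timestamp an aggregation or join result carries is
also just an assumption here (the one of the record that triggered the
update).

```java
final class MetadataContractSketch {

    /** Hypothetical metadata holder; null / -1 mean "no meaningful value". */
    static final class Meta {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        Meta(final String topic, final int partition, final long offset, final long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Meta(topic=" + topic + ", partition=" + partition
                + ", offset=" + offset + ", timestamp=" + timestamp + ")";
        }
    }

    /** filter/map on KStream (and KTable new values): everything is carried over. */
    static Meta forOneToOne(final Meta in) {
        return in;
    }

    /** Aggregation: keep partition, keep topic only for a single input topic, drop offset. */
    static Meta forAggregation(final Meta in, final boolean singleInputTopic) {
        return new Meta(singleInputTopic ? in.topic : null, in.partition, -1L, in.timestamp);
    }

    /** Join: keep partition; topic and offset are unknown for the output record. */
    static Meta forJoin(final Meta in) {
        return new Meta(null, in.partition, -1L, in.timestamp);
    }

    /** oldValue: like aggregation, but no timestamp until KTable stores one. */
    static Meta forOldValue(final Meta in, final boolean singleInputTopic) {
        return new Meta(singleInputTopic ? in.topic : null, in.partition, -1L, -1L);
    }

    public static void main(final String[] args) {
        final Meta in = new Meta("my-repartition-topic", 3, 42L, 1000L);
        System.out.println("one-to-one:  " + forOneToOne(in));
        System.out.println("aggregation: " + forAggregation(in, true));
        System.out.println("join:        " + forJoin(in));
        System.out.println("oldValue:    " + forOldValue(in, true));
    }
}
```

The point is only that each operator type gets a fixed, documented rule;
the concrete rules are of course up for discussion.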


WDYT?


-Matthias

On 11/29/17 4:14 AM, Jan Filipiak wrote:
> Hi,
> 
> thank you for the summary and thanks for acknowledging that I do have a
> point here.
> 
> I don't like the second idea at all. Hence I started off this discussion.
> 
> I am just disappointed. Back then, when we had the discussion about how
> to refactor the store overloads and IQ handling, I knew the path we were
> taking was wrong. Having problems implementing these kinds of features
> (which are really simple) is just a symptom of a messed-up IQ
> implementation. I really wish I could have convinced you guys back then.
> To be honest, with IQ we can continue here, as we materialize but would
> not send oldValue, but with joins you're out of luck with the current
> setup.
> 
> I of course recommend not introducing any optimizations here. I'd
> recommend going towards what I already recommended back then. So I
> wouldn't say we need to optimize anything later; we need to build the
> topology better in the first place.
> 
> 
> 
> 
> On 28.11.2017 21:00, Guozhang Wang wrote:
>> Jan,
>>
>> Thanks for your input, I can understand now that the oldValue is also
>> exposed in the user-customized `filter` function and hence what record
>> context we should expose is a problem. And I think it does bring up a
>> good point to consider for KIP-159. The discussion may be a bit
>> confusing to readers though, and hence I'd like to summarize the status
>> quo along with a proposal:
>>
>> In today's Streams DSL, when a KTable is created either from a source
>> topic, or from a stateful operator, we will materialize the KTable with
>> a backing state store; on the other hand, KTables created from a
>> non-stateful operator like filter will not be backed by a state store by
>> default unless users indicate so (e.g. using the overloaded function
>> with the queryable name or store supplier).
>>
>> For example:
>>
>> KTable table1 = builder.table("topic");            // a state store created for table1
>> KTable table2 = table1.filter(..);                 // no state store created for table2
>> KTable table3 = table1.filter(.., "storeName");    // a state store created for table3
>> KTable table4 = table1.groupBy(..).aggregate(..);  // a state store created for table4
>>
>> Because of that, the filter() operator above on table1 will always be
>> exposed with oldValue and newValue; Damian's point is that we may
>> optimize the first case such that table1 will only be materialized if
>> users ask for it (e.g. using the overloaded function with a store
>> supplier), in which case we do not need to pass newValue / oldValue
>> pairs (I think this is what Jan suggests as well, i.e. do filtering
>> before materializing, so that we can have a smaller backing state store
>> as well). But this optimization does not eliminate the possibility that
>> we may still need to do the filter if users do specify "yes I do want
>> the source KTable itself to be materialized, please". So the concern
>> about how to expose the record context in such cases still persists.
>>
>>
>> With that, regarding KIP-159 itself, here are my thoughts:
>>
>> 1) if we restrict the scope of exposing record context only to source
>> KTables / KStreams, I feel the KIP itself does not bring much value
>> given its required API change, because only the SourceKStream can safely
>> maintain its record context, and for SourceKTable, if it is
>> materialized, then even non-stateful operators like Join may still have
>> a concern about exposing the record context.
>>
>> 2) an alternative idea is that we provide the semantics on how record
>> context would be inherited across the operators for KTable / KStream and
>> expose it in all operators (similarly in PAPI we would expose a much
>> simpler contract), and make it a public contract that the Streams
>> library will guarantee moving forward even if we optimize our topology
>> builder; it may not align perfectly with the linear algebraic semantics
>> but is practically applicable for most cases; if users' semantics do not
>> fit in the provided contract, then they may need to get this themselves
>> (embed such information in the value payload, for example).
>>
>> If people do not like the second idea, I'd suggest we hold off on
>> pursuing the first direction since to me its beneficial scope is too
>> limited compared to its cost.
>>
>>
>> Guozhang
>>
>>
>>
>> On Fri, Nov 24, 2017 at 1:39 AM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> Clearly we show the oldValue to the user. We have to, because we filter
>>> after the store.
>>> https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java#L96
>>>
>>>
>>> I cannot help you following this. It is really obvious and I am running
>>> out of tools for explaining.
>>>
>>> Thanks for understanding my point to put filter before. Not only would
>>> it make the store smaller. It would make this feature reasonably
>>> possible and the framework easier. Interestingly it would also help to
>>> move IQ into more reasonable directions. And it might help understand
>>> that we do not need any intermediate representation of the topology.
>>>
>>> KIP-182 I have no clue what everyone has with their "bytestores" so
>>> broken. But putting another store after doesn't help when the store
>>> before
>>> is the problem.
>>>
>>>
>>>
>>>
>>> On 24.11.2017 05:08, Matthias J. Sax wrote:
>>>
>>>>   From a DSL point of view, users only see the new value on a
>>>> KTable#filter anyway. So why should it be an issue that we use
>>>> <newValue,oldValue> pair under the hood?
>>>>
>>>> User sees newValue and gets corresponding RecordContext. I can't see
>>>> any
>>>> issue here?
>>>>
>>>> I cannot follow here:
>>>>
>>>>> Even when we have a stateful operation last, we move it to the very
>>>>> first processor (KTableSource)
>>>>> and therefore can't present a proper RecordContext.
>>>>
>>>> With regard to `builder.table().filter()`:
>>>>
>>>> I see your point that it would be good to be able to apply the filter()
>>>> first to reduce the state store size of the table. But how is this
>>>> related to KIP-159?
>>>>
>>>> Btw: with KIP-182, I am wondering if this would not be possible, by
>>>> putting a custom dummy store into the table and materializing the filter
>>>> result afterwards? It's not a nice way to do it, but it seems to be
>>>> possible.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 11/23/17 4:56 AM, Jan Filipiak wrote:
>>>>
>>>>> The comment is valid. It falls exactly into this topic, it has exactly
>>>>> to do with this!
>>>>> Even when we have a stateful operation last, we move it to the very
>>>>> first processor (KTableSource)
>>>>> and therefore can't present a proper RecordContext.
>>>>>
>>>>> Regarding the other Jiras you are referring to: they harm the project
>>>>> more than they do good!
>>>>> There is no need for this kind of optimizer and meta representation
>>>>> and what not. I hope they never get implemented.
>>>>>
>>>>> Best Jan
>>>>>
>>>>>
>>>>> On 22.11.2017 14:44, Damian Guy wrote:
>>>>>
>>>>>> Jan, I think your comment with respect to filtering is valid, though
>>>>>> not for this KIP. We have separate JIRAs for topology optimization,
>>>>>> which this falls into.
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
>>>>>>
>>>>>> On Wed, 22 Nov 2017 at 02:25 Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Jan,
>>>>>>>
>>>>>>> Not sure I understand your argument that "we still going to present
>>>>>>> change.oldValue to the filter even though the record context() is for
>>>>>>> change.newValue". Are you referring to `KTableFilter#process()`? If
>>>>>>> yes, could you point me to which LOC you are concerned about?
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak <
>>>>>>> jan.filip...@trivago.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> a remark of mine that got missed during migration:
>>>>>>>>
>>>>>>>> There is this problem that even though we have
>>>>>>>> source.table.filter.join, the statefulness happens at the table
>>>>>>>> step, not at the join step. In a filter we are still going to
>>>>>>>> present change.oldValue to the filter even though the record
>>>>>>>> context() is for change.newValue. I would go as far as applying
>>>>>>>> the filter before the table processor. Not just to get KIP-159,
>>>>>>>> but because I think it's a side effect of a non-ideal topology
>>>>>>>> layout. If I can filter 99% of my records, my state could be way
>>>>>>>> smaller. Also widely escalates the context of the KIP.
>>>>>>>>
>>>>>>>> I can only see upsides of executing the filter first.
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 20.11.2017 22:22, Matthias J. Sax wrote:
>>>>>>>>
>>>>>>>>> I am moving this back to the DISCUSS thread... Last 10 emails were
>>>>>>>>> sent to VOTE thread.
>>>>>>>>>
>>>>>>>>> Copying Guozhang's last summary below. Thanks for this summary. Very
>>>>>>>>> comprehensive!
>>>>>>>>>
>>>>>>>>> It seems we all agree that the current implementation of the context
>>>>>>>>> at PAPI level is ok, but we should not leak it into DSL.
>>>>>>>>>
>>>>>>>>> Thus, we can go with (2) or (3), where (3) is an extension to (2)
>>>>>>>>> carrying the context to more operators than just sources. It also
>>>>>>>>> seems that we all agree that many-to-one operations void the context.
>>>>>>>>>
>>>>>>>>> I still think that just going with plain (2) is too restrictive --
>>>>>>>>> but I am also fine if we don't go with the full proposal of (3).
>>>>>>>>>
>>>>>>>>> Also note that the two operators filter() and filterNot() don't
>>>>>>>>> modify the record and thus for both, it would be absolutely valid to
>>>>>>>>> keep the context.
>>>>>>>>>
>>>>>>>>> I personally would keep the context for at least all one-to-one
>>>>>>>>> operators. One-to-many is debatable and I am fine to not carry the
>>>>>>>>> context further: at least the offset information is questionable for
>>>>>>>>> this case -- note though, that semantically, the timestamp is
>>>>>>>>> inherited via one-to-many, and I also think this applies to "topic"
>>>>>>>>> and "partition". Thus, I think it's still valuable information we can
>>>>>>>>> carry downstream.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>> Jan: which approach are you referring to as "the approach that is
>>>>>>>>>> on the table would be perfect"?
>>>>>>>>>>
>>>>>>>>>> Note that in today's PAPI layer we are already effectively exposing
>>>>>>>>>> the record context, which has the issues that we have been
>>>>>>>>>> discussing right now, and its semantics is always referring to the
>>>>>>>>>> "processing record" at hand. More specifically, we can think of
>>>>>>>>>> processing a record a bit differently:
>>>>>>>>>>
>>>>>>>>>> 1) the record traverses the topology from source to sink; it may be
>>>>>>>>>> transformed into a new object or even generate multiple new objects
>>>>>>>>>> (think: branch) along the traversal. And the record context is
>>>>>>>>>> referring to this processing record. Here the "lifetime" of the
>>>>>>>>>> record lasts for the entire topology traversal and any new records
>>>>>>>>>> of this traversal are treated as different transformed values of
>>>>>>>>>> this record (this applies to join and aggregations as well).
>>>>>>>>>>
>>>>>>>>>> 2) the record being processed is wiped out in the first operator
>>>>>>>>>> after the source, and NEW records are forwarded to downstream
>>>>>>>>>> operators. I.e. each record only lives between two adjacent
>>>>>>>>>> operators; once it has reached the new operator its lifetime has
>>>>>>>>>> ended and new records are generated.
>>>>>>>>>>
>>>>>>>>>> I think in the past we have talked about Streams under both
>>>>>>>>>> contexts, and we do not have a clear agreement. I agree that 2) is
>>>>>>>>>> logically more understandable for users as it does not leak any
>>>>>>>>>> internal implementation details (e.g. for stream-table joins, the
>>>>>>>>>> table record's traversal ends at the join operator as it is only
>>>>>>>>>> materialized, while the stream record's traversal goes through the
>>>>>>>>>> join operator further down until the sinks). However if we are
>>>>>>>>>> going to follow interpretation 2) above then even for non-stateful
>>>>>>>>>> operators we would not inherit record context. What we're
>>>>>>>>>> discussing now seems to imply a third semantics:
>>>>>>>>>>
>>>>>>>>>> 3) a record would traverse "through" one-to-one (non-stateful)
>>>>>>>>>> operators, will "replicate" at one-to-many (non-stateful) operators
>>>>>>>>>> (think: "mapValues") and will "end" at many-to-one (stateful)
>>>>>>>>>> operators where NEW records will be generated and forwarded to the
>>>>>>>>>> downstream operators.
>>>>>>>>>>
>>>>>>>>>> Just wanted to lay the ground for discussions so we are all on the
>>>>>>>>>> same page before chatting more.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for correcting. It is a leftover from the past designs
>>>>>>>>>> when punctuate() was not deprecated.
>>>>>>>>>> I corrected.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Jeyhun
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax
>>>>>>>>>> <matth...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I just re-read the KIP.
>>>>>>>>>>>
>>>>>>>>>>> One minor comment: we don't need to introduce any deprecated
>>>>>>>>>>> methods.
>>>>>>>>>>> Thus, RichValueTransformer#punctuate can be removed completely
>>>>>>>>>>> instead
>>>>>>>>>>> of introducing it as deprecated.
>>>>>>>>>>>
>>>>>>>>>>> Otherwise looks good to me.
>>>>>>>>>>>
>>>>>>>>>>> Thanks for being so patient!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>
>>>>>>>>>>>> I think I'm convinced to not do KAFKA-3907 in this KIP. We should
>>>>>>>>>>>> think carefully if we should add this functionality to the DSL
>>>>>>>>>>>> layer moving forward, since from what we discovered working on it
>>>>>>>>>>>> the conclusion is that it would require revamping the public APIs
>>>>>>>>>>>> quite a lot, and it's not clear if it is a better trade-off than
>>>>>>>>>>>> asking users to call process() instead.
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy
>>>>>>>>>>>> <damian....@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jeyhun, thanks, looks good.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Do we need to remove the line that says:
>>>>>>>>>>>>>
>>>>>>>>>>>>>        - on-demand commit() feature
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <
>>>>>>>>>>>>> je.kari...@gmail.com>
>>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I removed the 'commit()' feature, as we discussed. It simplified
>>>>>>>>>>>>>> the overall design of KIP a lot.
>>>>>>>>>>>>>> If it is ok, I would like to start a VOTE thread.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <
>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks. I understand what you are saying, but I don't agree that
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> but also we need a commit() method
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would just not provide `commit()` at DSL level and close the
>>>>>>>>>>>>>>> corresponding Jira as "not a problem" or similar.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your comments. I agree that this is not the best way
>>>>>>>>>>>>>>>> to do it. A bit of history behind this design.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Prior to doing this, I tried to provide ProcessorContext itself
>>>>>>>>>>>>>>>> as an argument in Rich interfaces. However, we don't want to give
>>>>>>>>>>>>>>>> users that flexibility and “power”. Moreover, ProcessorContext
>>>>>>>>>>>>>>>> contains processor level information and not Record level info.
>>>>>>>>>>>>>>>> The only thing we need in ProcessorContext is the commit() method.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So, as far as I understood, we need record context (offset,
>>>>>>>>>>>>>>>> timestamp, etc.) but also we need a commit() method (we don't
>>>>>>>>>>>>>>>> want to provide ProcessorContext as a parameter so users can use
>>>>>>>>>>>>>>>> ProcessorContext.commit()).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As a result, I thought to “propagate” the commit() call from
>>>>>>>>>>>>>>>> RecordContext to ProcessorContext().
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If there is a misunderstanding in the motivation/discussion of
>>>>>>>>>>>>>>>> the KIP/included jiras please let me know.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <
>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> I am personally still not convinced that we should add `commit()`
>>>>>>>>>>>>>>>>> at all.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Guozhang: you created the original Jira. Can you elaborate a
>>>>>>>>>>>>>>>>> little bit? Isn't requesting commits a low level API that should
>>>>>>>>>>>>>>>>> not be exposed in the DSL? Just want to understand the motivation
>>>>>>>>>>>>>>>>> better. Why would anybody that uses the DSL ever want to request
>>>>>>>>>>>>>>>>> a commit? To me, requesting commits is useful if you manipulated
>>>>>>>>>>>>>>>>> state explicitly, ie, via Processor API.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also, for the solution: it seems rather unnatural to me that we
>>>>>>>>>>>>>>>>> add `commit()` to `RecordContext` -- from my understanding,
>>>>>>>>>>>>>>>>> `RecordContext` is a helper object that provides access to record
>>>>>>>>>>>>>>>>> metadata. Requesting a commit is something quite different.
>>>>>>>>>>>>>>>>> Additionally, a commit does not commit a specific record but a
>>>>>>>>>>>>>>>>> `RecordContext` is for a specific record.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To me, this does not seem to be a sound API design if we follow
>>>>>>>>>>>>>>>>> this path.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have some comments, to make sure that there is no
>>>>>>>>>>>>>>>>>> misunderstanding.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to
>>>>>>>>>>>>>>>>>> enforce user to consolidate this call as
>>>>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal
>>>>>>>>>>>>>>>>>> implementation of `ProcessorContext.commit()` in
>>>>>>>>>>>>>>>>>> `ProcessorContextImpl` is also changed to this call.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - I think we should not deprecate `ProcessorContext.commit()`.
>>>>>>>>>>>>>>>>>> The main reason we introduce `commit()` in `RecordContext` is
>>>>>>>>>>>>>>>>>> that `RecordContext` is the one which is provided in Rich
>>>>>>>>>>>>>>>>>> interfaces. So if a user wants to commit, then there should be
>>>>>>>>>>>>>>>>>> some method inside `RecordContext` to do so. Internally,
>>>>>>>>>>>>>>>>>> `RecordContext.commit()` calls `ProcessorContext.commit()` (see
>>>>>>>>>>>>>>>>>> the last code snippet in KIP-159):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public void process(final K1 key, final V1 value) {
>>>>>>>>>>>>>>>>>>         // recordContext initialization is added in this KIP
>>>>>>>>>>>>>>>>>>         recordContext = new RecordContext() {
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public void commit() {
>>>>>>>>>>>>>>>>>>                 context().commit();
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public long offset() {
>>>>>>>>>>>>>>>>>>                 return context().recordContext().offset();
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public long timestamp() {
>>>>>>>>>>>>>>>>>>                 return context().recordContext().timestamp();
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public String topic() {
>>>>>>>>>>>>>>>>>>                 return context().recordContext().topic();
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public int partition() {
>>>>>>>>>>>>>>>>>>                 return context().recordContext().partition();
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>         };
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` in this case
>>>>>>>>>>>>>>>>>> IMO.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement the commit
>>>>>>>>>>>>>>>>>> call itself.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Actually, I don't think that we need `commit()` in
>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`. The main intuition is to "transfer"
>>>>>>>>>>>>>>>>>> the `ProcessorContext.commit()` call to Rich interfaces, to
>>>>>>>>>>>>>>>>>> support user-specific committing.
>>>>>>>>>>>>>>>>>> To do so, we introduce a `commit()` method in `RecordContext()`
>>>>>>>>>>>>>>>>>> only to call ProcessorContext.commit() inside. (see the above
>>>>>>>>>>>>>>>>>> code snippet)
>>>>>>>>>>>>>>>>>> So, in Rich interfaces, we are not dealing with
>>>>>>>>>>>>>>>>>> `ProcessorRecordContext` at all, and we leave all its methods as
>>>>>>>>>>>>>>>>>> they are.
>>>>>>>>>>>>>>>>>> In this KIP, we made `RecordContext` the parent class of
>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, just because they share quite a number
>>>>>>>>>>>>>>>>>> of methods and it is logical to enable inheritance between those
>>>>>>>>>>>>>>>>>> two.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a
>>>>>>>>>>>>>>>>>> commit() method, is valid only within RecordContext interface
>>>>>>>>>>>>>>>>>> (at least for now), we throw an exception in
>>>>>>>>>>>>>>>>>> ProcessorRecordContext.commit()." and the code snippet below
>>>>>>>>>>>>>>>>>> would need to be updated as well.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - I think the above explanation covers this as well.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I want to gain some speed on this KIP, as it has gone through
>>>>>>>>>>>>>>>>>> many changes based on user/developer needs, both documentation-
>>>>>>>>>>>>>>>>>> and implementation-wise.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang <
>>>>>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> Thanks for the information Jeyhun. I had also forgotten about
>>>>>>>>>>>>>>>>>>> KAFKA-3907 with this KIP..
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what we agreed
>>>>>>>>>>>>>>>>>>> before, to add the commit() call to `RecordContext`. A few minor
>>>>>>>>>>>>>>>>>>> tweaks on its implementation:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to
>>>>>>>>>>>>>>>>>>> enforce user to consolidate this call as
>>>>>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal
>>>>>>>>>>>>>>>>>>> implementation of `ProcessorContext.commit()` in
>>>>>>>>>>>>>>>>>>> `ProcessorContextImpl` is also changed to this call.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement the commit
>>>>>>>>>>>>>>>>>>> call itself.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a
>>>>>>>>>>>>>>>>>>> commit() method, is valid only within RecordContext interface
>>>>>>>>>>>>>>>>>>> (at least for now), we throw an exception in
>>>>>>>>>>>>>>>>>>> ProcessorRecordContext.commit()." and the code snippet below
>>>>>>>>>>>>>>>>>>> would need to be updated as well.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> Fair point. This is a long discussion and I totally forgot that
>>>>>>>>>>>>>>>>>>>> we discussed this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Seems I changed my opinion about including KAFKA-3907...
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Happy to hear what others think.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It is probably my bad, the discussion was a bit long in this
>>>>>>>>>>>>>>>>>>>>> thread. I proposed the related issue in the related KIP discuss
>>>>>>>>>>>>>>>>>>>>> thread [1] and got an approval [2,3].
>>>>>>>>>>>>>>>>>>>>> Maybe I misunderstood.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax <
>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> Interesting.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I thought that https://issues.apache.org/jira/browse/KAFKA-4125
>>>>>>>>>>>>>>>>>>>>>> is the main motivation for this KIP :)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I also think, that we should not expose the full
>>>>>>>>>>>>>>>>>>>>>> ProcessorContext at DSL level.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thus, overall I am not even sure if we should fix KAFKA-3907 at
>>>>>>>>>>>>>>>>>>>>>> all. Manual commits are something DSL users should not worry
>>>>>>>>>>>>>>>>>>>>>> about -- and if one really needs this, an advanced user can
>>>>>>>>>>>>>>>>>>>>>> still insert a dummy `transform` to request a commit from there.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The main intuition is to solve [1], which is part of this KIP.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I agree with you that this might not seem semantically correct
>>>>>>>>>>>>>>>>>>>>>>> as we are not committing record state.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Alternatively, we can remove commit() from RecordContext and add
>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext (which has commit() method) as an extra
>>>>>>>>>>>>>>>>>>>>>>> argument to Rich methods:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> instead of
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>>>>>>>>         VR apply(final V value,
>>>>>>>>>>>>>>>>>>>>>>>                  final K key,
>>>>>>>>>>>>>>>>>>>>>>>                  final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> we can adopt
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>>>>>>>>         VR apply(final V value,
>>>>>>>>>>>>>>>>>>>>>>>                  final K key,
>>>>>>>>>>>>>>>>>>>>>>>                  final RecordContext recordContext,
>>>>>>>>>>>>>>>>>>>>>>>                  final ProcessorContext processorContext);
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> However, in this case, a user can get confused as ProcessorContext
>>>>>>>>>>>>>>>>>>>>>>> and RecordContext share some methods with the same name.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Regarding #6 above, I'm still not clear why we would need
>>>>>>>>>>>>>>>>>>>>>>>> `commit()` in both ProcessorContext and RecordContext, could you
>>>>>>>>>>>>>>>>>>>>>>>> elaborate a bit more?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> To me `commit()` is really a processor context not a record
>>>>>>>>>>>>>>>>>>>>>>>> context logically: when you call that function, it means we
>>>>>>>>>>>>>>>>>>>>>>>> would commit the state of the whole task up to this processed
>>>>>>>>>>>>>>>>>>>>>>>> record, not only that single record itself.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 0. RichInitializer definition seems missing.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> - Fixed.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 1. I'd suggest moving the key parameter in the RichValueXX and
>>>>>>>>>>>>>>>>>>>>>>>>> RichReducer after the value parameters, as well as in the
>>>>>>>>>>>>>>>>>>>>>>>>> templates; e.g.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>>>>>>>>>>>>>>>>>         VR apply(final V1 value1, final V2 value2, final K key,
>>>>>>>>>>>>>>>>>>>>>>>>>                  final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> - Fixed.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 2. Some of the listed functions are not necessary since their
>>>>>>>>>>>>>>>>>>>>>>>>> pairing APIs are being deprecated in 1.0 already:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
>>>>>>>>>>>>>>>>>>>>>>>>>                                    final Serde<KR> keySerde,
>>>>>>>>>>>>>>>>>>>>>>>>>                                    final Serde<V> valSerde);
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>>>>>>>>>>>>>>>>>>>>>>                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
>>>>>>>>>>>>>>>>>>>>>>>>>                                  final Serde<K> keySerde,
>>>>>>>>>>>>>>>>>>>>>>>>>                                  final Serde<V> valSerde);
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> -Fixed
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 3. For a few functions where we are adding three APIs for a
>>
> 
> 
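
For readers comparing the two RichValueMapper shapes quoted above, here
is a minimal Java sketch of the difference. It is only an illustration
under stated assumptions: RecordContext and ProcessorContext below are
hypothetical local stand-ins, not the Kafka Streams interfaces, and
SimpleRecordContext, CountingProcessorContext,
RichValueMapperWithProcessorContext and tagWithOrigin are names invented
for this example. The sketch shows that in the second shape commit()
hangs off the task-level context rather than the per-record metadata,
which is the distinction raised in the quoted discussion.

```java
// Hypothetical stand-ins for illustration only; these are NOT the Kafka Streams types.
public class RichMapperSketch {

    /** Read-only metadata of the record being processed (assumed shape). */
    interface RecordContext {
        String topic();
        int partition();
        long offset();
        long timestamp();
    }

    /** Task-level context (assumed shape): commit() concerns the whole task, not one record. */
    interface ProcessorContext {
        void commit();
    }

    /** First shape from the thread: the mapper sees record metadata only. */
    interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    /** Second shape from the thread: ProcessorContext is passed as an extra argument. */
    interface RichValueMapperWithProcessorContext<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext, ProcessorContext processorContext);
    }

    /** Simple immutable RecordContext used by the example. */
    static final class SimpleRecordContext implements RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;
        private final long timestamp;

        SimpleRecordContext(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
        @Override public long timestamp() { return timestamp; }
    }

    /** Counts commit requests so the example can observe them. */
    static final class CountingProcessorContext implements ProcessorContext {
        int commitRequests = 0;

        @Override public void commit() { commitRequests++; }
    }

    /** Mapper body shared by both shapes: tags the value with its key, topic and partition. */
    static String tagWithOrigin(String value, String key, RecordContext rc) {
        return key + "=" + value + "@" + rc.topic() + "-" + rc.partition();
    }

    public static void main(String[] args) {
        RecordContext rc = new SimpleRecordContext("input", 0, 40L, 1000L);
        CountingProcessorContext pc = new CountingProcessorContext();

        // First shape: no way to request a commit from inside the mapper.
        RichValueMapper<String, String, String> recordOnly = RichMapperSketch::tagWithOrigin;

        // Second shape: the mapper may request a task-level commit.
        RichValueMapperWithProcessorContext<String, String, String> withCommit =
            (value, key, r, p) -> {
                if (r.offset() % 10 == 0) {
                    p.commit(); // requests a commit for the task, not for this record alone
                }
                return tagWithOrigin(value, key, r);
            };

        System.out.println(recordOnly.apply("v", "k", rc));
        System.out.println(withCommit.apply("v", "k", rc, pc));
        System.out.println("commit requests: " + pc.commitRequests);
    }
}
```

Both shapes produce the same mapped value here; the second one can
additionally trigger a commit request, which is why it would hand
task-level control to code written at the DSL level.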
