From a DSL point of view, users only see the new value on a
KTable#filter anyway. So why should it be an issue that we use
<newValue,oldValue> pair under the hood?

User sees newValue and gets corresponding RecordContext. I can't see any
issue here?

I cannot follow here:

>> Even when we have a statefull operation last. We move it to the very
>> first processor (KtableSource)
>> and therefore cant present a proper RecordContext.



With regard to `builder.table().filter()`:

I see you point that it would be good to be able to apply the filter()
first to reduce the stat store size of the table. But how is this
related to KIP-159?

Btw: with KIP-182, I am wondering if this would not be possible, by
putting a custom dummy store into the table and materialize the filter
result afterwards? It's not a nice way to do, but seems to be possible.


-Matthias

On 11/23/17 4:56 AM, Jan Filipiak wrote:
> The comment is valid. It falls exactly into this topic, it has exactly
> todo with this!
> Even when we have a statefull operation last. We move it to the very
> first processor (KtableSource)
> and therefore cant present a proper RecordContext.
> 
> Regarding the other Jiras you are referring to. They harm the project
> more than they do good!
> There is no need for this kind of optimizer and meta representation and
> what not. I hope they
> never get implemented.
> 
> Best Jan
> 
> 
> On 22.11.2017 14:44, Damian Guy wrote:
>> Jan, i think you comment with respect to filtering is valid, though
>> not for
>> this KIP. We have separate JIRAs for topology optimization of which this
>> falls into.
>>
>> Thanks,
>> Damian
>>
>> On Wed, 22 Nov 2017 at 02:25 Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Jan,
>>>
>>> Not sure I understand your argument that "we still going to present
>>> change.oldValue to the filter even though the record context() is for
>>> change.newValue". Are you referring to `KTableFilter#process()`? If yes
>>> could you point to me which LOC are you concerning about?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>> a remark of mine that got missed during migration:
>>>>
>>>> There is this problem that even though we have source.table.filter.join
>>>> the state-fullness happens at the table step not a the join step. In a
>>>> filter
>>>> we still going to present change.oldValue to the filter even though the
>>>> record context() is for change.newValue. I would go as far as applying
>>>> the filter before the table processor. Not to just get KIP-159, but
>>> because
>>>> I think its a side effect of a non ideal topology layout. If i can
>>>> filter
>>>> 99% of my
>>>> records. my state could be way smaller. Also widely escalates the
>>>> context
>>>> of the KIP
>>>>
>>>> I can only see upsides of executing the filter first.
>>>>
>>>> Best Jan
>>>>
>>>>
>>>>
>>>> On 20.11.2017 22:22, Matthias J. Sax wrote:
>>>>
>>>>> I am moving this back to the DISCUSS thread... Last 10 emails were
>>>>> sent
>>>>> to VOTE thread.
>>>>>
>>>>> Copying Guozhang's last summary below. Thanks for this summary. Very
>>>>> comprehensive!
>>>>>
>>>>> It seems, we all agree, that the current implementation of the context
>>>>> at PAPI level is ok, but we should not leak it into DSL.
>>>>>
>>>>> Thus, we can go with (2) or (3), were (3) is an extension to (2)
>>>>> carrying the context to more operators than just sources. It also
>>>>> seems,
>>>>> that we all agree, that many-to-one operations void the context.
>>>>>
>>>>> I still think, that just going with plain (2) is too restrictive --
>>>>> but
>>>>> I am also fine if we don't go with the full proposal of (3).
>>>>>
>>>>> Also note, that the two operators filter() and filterNot() don't
>>>>> modify
>>>>> the record and thus for both, it would be absolutely valid to keep the
>>>>> context.
>>>>>
>>>>> I personally would keep the context for at least all one-to-one
>>>>> operators. One-to-many is debatable and I am fine to not carry the
>>>>> context further: at least the offset information is questionable for
>>>>> this case -- note thought, that semantically, the timestamp is
>>>>> inherited
>>>>> via one-to-many, and I also think this applies to "topic" and
>>>>> "partition". Thus, I think it's still valuable information we can
>>>>> carry
>>>>> downstreams.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> Jan: which approach are you referring to as "the approach that is
>>>>> on the
>>>>>> table would be perfect"?
>>>>>>
>>>>>> Note that in today's PAPI layer we are already effectively
>>>>>> exposing the
>>>>>> record context which has the issues that we have been discussing
>>>>>> right
>>>>>> now,
>>>>>> and its semantics is always referring to the "processing record" at
>>> hand.
>>>>>> More specifically, we can think of processing a record a bit
>>>>>> different:
>>>>>>
>>>>>> 1) the record traversed the topology from source to sink, it may be
>>>>>> transformed into new object or even generate multiple new objects
>>> (think:
>>>>>> branch) along the traversal. And the record context is referring to
>>> this
>>>>>> processing record. Here the "lifetime" of the record lasts for the
>>> entire
>>>>>> topology traversal and any new records of this traversal is
>>>>>> treated as
>>>>>> different transformed values of this record (this applies to join and
>>>>>> aggregations as well).
>>>>>>
>>>>>> 2) the record being processed is wiped out in the first operator
>>>>>> after
>>>>>> the
>>>>>> source, and NEW records are forwarded to downstream operators. I.e.
>>> each
>>>>>> record only lives between two adjacent operators, once it reached the
>>> new
>>>>>> operator it's lifetime has ended and new records are generated.
>>>>>>
>>>>>> I think in the past we have talked about Streams under both context,
>>> and
>>>>>> we
>>>>>> do not have a clear agreement. I agree that 2) is logically more
>>>>>> understandable for users as it does not leak any internal
>>> implementation
>>>>>> details (e.g. for stream-table joins, table record's traversal
>>>>>> ends at
>>>>>> the
>>>>>> join operator as it is only be materialized, while stream record's
>>>>>> traversal goes through the join operator to further down until
>>>>>> sinks).
>>>>>> However if we are going to interpret following 2) above then even for
>>>>>> non-stateful operators we would not inherit record context. What
>>>>>> we're
>>>>>> discussing now, seems to infer a third semantics:
>>>>>>
>>>>>> 3) a record would traverse "through" one-to-one (non-stateful)
>>> operators,
>>>>>> will "replicate" at one-to-many (non-stateful) operators (think:
>>>>>> "mapValues"
>>>>>>    ) and will "end" at many-to-one (stateful) operators where NEW
>>> records
>>>>>> will be generated and forwarded to the downstream operators.
>>>>>>
>>>>>> Just wanted to lay the ground for discussions so we are all on the
>>>>>> same
>>>>>> page before chatting more.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>
>>>>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> Thanks a lot for correcting. It is a leftover from the past designs
>>> when
>>>>>> punctuate() was not deprecated.
>>>>>> I corrected.
>>>>>>
>>>>>> Cheers,
>>>>>> Jeyhun
>>>>>>
>>>>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax
>>>>>> <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>> I just re-read the KIP.
>>>>>>> One minor comment: we don't need to introduce any deprecated
>>>>>>> methods.
>>>>>>> Thus, RichValueTransformer#punctuate can be removed completely
>>>>>>> instead
>>>>>>> of introducing it as deprecated.
>>>>>>>
>>>>>>> Otherwise looks good to me.
>>>>>>>
>>>>>>> Thanks for being so patient!
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote:
>>>>>>>
>>>>>>>> Jeyhun,
>>>>>>>>
>>>>>>>> I think I'm convinced to not do KAFKA-3907 in this KIP. We should
>>> think
>>>>>>>> carefully if we should add this functionality to the DSL layer
>>>>>>>> moving
>>>>>>>> forward since from what we discovered working on it the
>>>>>>>> conclusion is
>>>>>>>>
>>>>>>> that
>>>>>>>
>>>>>>>> it would require revamping the public APIs quite a lot, and it's
>>>>>>>> not
>>>>>>>>
>>>>>>> clear
>>>>>>>
>>>>>>>> if it is a good trade-off than asking users to call process()
>>> instead.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy <damian....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Jeyhun, thanks, looks good.
>>>>>>>>> Do we need to remove the line that says:
>>>>>>>>>
>>>>>>>>>      - on-demand commit() feature
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>
>>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>>>> I removed the 'commit()' feature, as we discussed. It simplified
>>> the
>>>>>>>>>> overall design of KIP a lot.
>>>>>>>>>> If it is ok, I would like to start a VOTE thread.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Jeyhun
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <
>>>>>>>>>> matth...@confluent.io
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks. I understand what you are saying, but I don't agree that
>>>>>>>>>>> but also we need a commit() method
>>>>>>>>>>> I would just not provide `commit()` at DSL level and close the
>>>>>>>>>>> corresponding Jira as "not a problem" or similar.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your comments. I agree that this is not the best way
>>> to
>>>>>>>>>>> do.
>>>>>>>>>> A
>>>>>>>>>>
>>>>>>>>>>> bit of history behind this design.
>>>>>>>>>>>> Prior doing this, I tried to provide ProcessorContext itself as
>>> an
>>>>>>>>>>> argument
>>>>>>>>>>>
>>>>>>>>>>>> in Rich interfaces. However, we dont want to give users that
>>>>>>>>>>>>
>>>>>>>>>>> flexibility
>>>>>>>>>>> and “power”. Moreover, ProcessorContext contains processor level
>>>>>>>>>>>> information and not Record level info. The only thing we
>>>>>>>>>>>> need ij
>>>>>>>>>>>> ProcessorContext is commit() method.
>>>>>>>>>>>>
>>>>>>>>>>>> So, as far as I understood, we need recor context (offset,
>>>>>>>>>>>> timestamp
>>>>>>>>>>>>
>>>>>>>>>>> and
>>>>>>>>>>> etc) but also we need a commit() method ( we dont want to
>>>>>>>>>>> provide
>>>>>>>>>>>> ProcessorContext as a parameter so users can use
>>>>>>>>>>>>
>>>>>>>>>>> ProcessorContext.commit()
>>>>>>>>>>>
>>>>>>>>>>>> ).
>>>>>>>>>>>>
>>>>>>>>>>>> As a result, I thought to “propagate” commit() call from
>>>>>>>>>>>>
>>>>>>>>>>> RecordContext
>>>>>>>>>> to
>>>>>>>>>>
>>>>>>>>>>> ProcessorContext() .
>>>>>>>>>>>>
>>>>>>>>>>>> If there is a misunderstanding in motvation/discussion of
>>>>>>>>>>>>
>>>>>>>>>>> KIP/included
>>>>>>>>>> jiras please let me know.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <
>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am personally still not convinced, that we should add
>>> `commit()`
>>>>>>>>>>>> at
>>>>>>>>>> all.
>>>>>>>>>>>> @Guozhang: you created the original Jira. Can you elaborate a
>>>>>>>>>>>>> little
>>>>>>>>>>>>> bit? Isn't requesting commits a low level API that should
>>>>>>>>>>>>> not be
>>>>>>>>>>>>>
>>>>>>>>>>>> exposed
>>>>>>>>>>> in the DSL? Just want to understand the motivation better. Why
>>> would
>>>>>>>>>>>>> anybody that uses the DSL ever want to request a commit? To
>>>>>>>>>>>>> me,
>>>>>>>>>>>>> requesting commits is useful if you manipulated state
>>> explicitly,
>>>>>>>>>>>> ie,
>>>>>>>>>> via Processor API.
>>>>>>>>>>>>> Also, for the solution: it seem rather unnatural to me,
>>>>>>>>>>>>> that we
>>>>>>>>>>>>> add
>>>>>>>>>>>>> `commit()` to `RecordContext` -- from my understanding,
>>>>>>>>>>>>>
>>>>>>>>>>>> `RecordContext`
>>>>>>>>>>> is an helper object that provide access to record meta data.
>>>>>>>>>>>> Requesting
>>>>>>>>>>> a commit is something quite different. Additionally, a commit
>>>>>>>>>>> does
>>>>>>>>>>>> not
>>>>>>>>>> commit a specific record but a `RecrodContext` is for a specific
>>>>>>>>>>>> record.
>>>>>>>>>>> To me, this does not seem to be a sound API design if we follow
>>> this
>>>>>>>>>>>> path.
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have some comments, to make sure that there is no
>>>>>>>>>>>>>>
>>>>>>>>>>>>> misunderstanding.
>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in ProcessorContext,
>>> to
>>>>>>>>>>>>> enforce
>>>>>>>>>>>> user to consolidate this call as
>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> implementation
>>>>>>>>>>>> of
>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` is
>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> changed
>>>>>>>>>>> to
>>>>>>>>>>>
>>>>>>>>>>>> this call.
>>>>>>>>>>>>>> - I think we should not deprecate
>>>>>>>>>>>>>> `ProcessorContext.commit()`.
>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>
>>>>>>>>>>>>> main
>>>>>>>>>>> intuition that we introduce `commit()` in `RecordContext` is
>>>>>>>>>>> that,
>>>>>>>>>>>>>> `RecordContext` is the one which is provided in Rich
>>> interfaces.
>>>>>>>>>>>>>> So
>>>>>>>>>>>>>>
>>>>>>>>>>>>> if
>>>>>>>>>>> user
>>>>>>>>>>>>>> wants to commit, then there should be some method inside
>>>>>>>>>>>>>>
>>>>>>>>>>>>> `RecordContext`
>>>>>>>>>>>> to
>>>>>>>>>>>>>> do so. Internally, `RecordContext.commit()` calls
>>>>>>>>>>>>>> `ProcessorContext.commit()`  (see the last code snippet in
>>>>>>>>>>>>>>
>>>>>>>>>>>>> KIP-159):
>>>>>>>>>> @Override
>>>>>>>>>>>>>>       public void process(final K1 key, final V1 value) {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>           recordContext = new RecordContext()
>>>>>>>>>>>>>> {               //
>>>>>>>>>>>>>> recordContext initialization is added in this KIP
>>>>>>>>>>>>>>               @Override
>>>>>>>>>>>>>>               public void commit() {
>>>>>>>>>>>>>>                   context().commit();
>>>>>>>>>>>>>>               }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>               @Override
>>>>>>>>>>>>>>               public long offset() {
>>>>>>>>>>>>>>                   return context().recordContext().offset();
>>>>>>>>>>>>>>               }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>               @Override
>>>>>>>>>>>>>>               public long timestamp() {
>>>>>>>>>>>>>>                   return
>>>>>>>>>>>>>> context().recordContext().timestamp();
>>>>>>>>>>>>>>               }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>               @Override
>>>>>>>>>>>>>>               public String topic() {
>>>>>>>>>>>>>>                   return context().recordContext().topic();
>>>>>>>>>>>>>>               }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>               @Override
>>>>>>>>>>>>>>               public int partition() {
>>>>>>>>>>>>>>                   return
>>>>>>>>>>>>>> context().recordContext().partition();
>>>>>>>>>>>>>>               }
>>>>>>>>>>>>>>         };
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` in this
>>> case
>>>>>>>>>>>>> IMO.
>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>>>>
>>>>>>>>>>>>> `ProcessorRecordContext`,
>>>>>>>>>>>> so
>>>>>>>>>>>>>> that it can implement the commit call itself.
>>>>>>>>>>>>>> - Actually, I don't think that we need `commit()` in
>>>>>>>>>>>>>> `ProcessorRecordContext`. The main intuition is to "transfer"
>>>>>>>>>>>>>> `ProcessorContext.commit()` call to Rich interfaces, to
>>>>>>>>>>>>>> support
>>>>>>>>>>>>>> user-specific committing.
>>>>>>>>>>>>>>    To do so, we introduce `commit()` method in
>>>>>>>>>>>>>> `RecordContext()`
>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>
>>>>>>>>>>>>> only
>>>>>>>>>>>> to
>>>>>>>>>>>>>> call ProcessorContext.commit() inside. (see the above code
>>>>>>>>>>>>>> snippet)
>>>>>>>>>>>>>> So, in Rich interfaces, we are not dealing with
>>>>>>>>>>>>>>
>>>>>>>>>>>>> `ProcessorRecordContext`
>>>>>>>>>>>> at all, and we leave all its methods as it is.
>>>>>>>>>>>>>> In this KIP, we made `RecordContext` to be the parent
>>>>>>>>>>>>>> class of
>>>>>>>>>>>>>> `ProcessorRecordContext`, just because of they share quite
>>> amount
>>>>>>>>>>>>> of
>>>>>>>>>> methods and it is logical to enable inheritance between those
>>>>>>>>>> two.
>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a
>>>>>>>>>>>>>>
>>>>>>>>>>>>> commit()
>>>>>>>>>> method,
>>>>>>>>>>>>>> is valid only within RecordContext interface (at least for
>>> now),
>>>>>>>>>>>>>> we
>>>>>>>>>> throw
>>>>>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and the
>>>>>>>>>>>>>> code
>>>>>>>>>>>>>> snippet
>>>>>>>>>>> below would need to be updated as well.
>>>>>>>>>>>>>> - I think above explanation covers this as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I want to gain some speed to this KIP, as it has gone though
>>> many
>>>>>>>>>>>>> changes
>>>>>>>>>>>> based on user/developer needs, both in
>>>>>>>>>>>>> documentation-/implementation-wise.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang <
>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the information Jeyhun. I had also forgot about
>>>>>>>>>>>>>> KAFKA-3907
>>>>>>>>>>> with
>>>>>>>>>>>>>> this KIP..
>>>>>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what we
>>> agreed
>>>>>>>>>>>>>> before,
>>>>>>>>>>>> to
>>>>>>>>>>>>>> add the commit() call to `RecordContext`. A few minor
>>>>>>>>>>>>>> tweaks on
>>>>>>>>>>>>>> its
>>>>>>>>>> implementation:
>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in
>>>>>>>>>>>>>>> ProcessorContext,
>>> to
>>>>>>>>>>>>>> enforce
>>>>>>>>>>>> user to consolidate this call as
>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> implementation
>>>>>>>>>>>> of
>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` is
>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> changed
>>>>>>>>>>> to
>>>>>>>>>>>
>>>>>>>>>>>> this call.
>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `ProcessorRecordContext`, so
>>>>>>>>>>>>>> that it can implement the commit call itself.
>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> commit()
>>>>>>>>>> method,
>>>>>>>>>>>>>>> is valid only within RecordContext interface (at least for
>>> now),
>>>>>>>>>>>>>> we
>>>>>>>>>> throw
>>>>>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and the
>>>>>>>>>>>>>> code
>>>>>>>>>>>>>> snippet
>>>>>>>>>>> below would need to be updated as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Fair point. This is a long discussion and I totally forgot
>>> that
>>>>>>>>>>>>>>> we
>>>>>>>>>> discussed this.
>>>>>>>>>>>>>>>> Seems I changed my opinion about including KAFKA-3907...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Happy to hear what others think.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It is probably my bad, the discussion was a bit long in
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> thread. I
>>>>>>>>>>>> proposed the related issue in the related KIP discuss thread
>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>> and
>>>>>>>>>>> got
>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> approval [2,3].
>>>>>>>>>>>>>>>>> Maybe I misunderstood.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj=
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj=
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj=
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax <
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> Interesting.
>>>>>>>>>>>>>>>>>> I thought that https://issues.apache.org/
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> jira/browse/KAFKA-4125
>>>>>>>>>> is
>>>>>>>>>>
>>>>>>>>>>> the
>>>>>>>>>>>>>>>> main motivation for this KIP :)
>>>>>>>>>>>>>>>>>> I also think, that we should not expose the full
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ProcessorContext
>>>>>>>>>> at
>>>>>>>>>>>> DSL
>>>>>>>>>>>>>>>> level.
>>>>>>>>>>>>>>>>>> Thus, overall I am not even sure if we should fix
>>> KAFKA-3907
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> all.
>>>>>>>>>>>> Manual commits are something DSL users should not worry about
>>>>>>>>>>>>>>>>> -- 
>>>>>>>>>> and
>>>>>>>>>>>> if
>>>>>>>>>>>>>>>> one really needs this, an advanced user can still insert a
>>>>>>>>>>>>>>>>> dummy
>>>>>>>>>> `transform` to request a commit from there.
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The main intuition is to solve [1], which is part of
>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> KIP.
>>>>>>>>>>>>>>>>>>> I agree with you that this might not seem semantically
>>>>>>>>>>>>>>>>>>> correct
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>> we
>>>>>>>>>>>
>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>> not committing record state.
>>>>>>>>>>>>>>>>>>> Alternatively, we can remove commit() from RecordContext
>>> and
>>>>>>>>>>>>>>>>>> add
>>>>>>>>>> ProcessorContext (which has commit() method) as an extra
>>>>>>>>>>>>>>>>>> argument
>>>>>>>>>>> to
>>>>>>>>>>>
>>>>>>>>>>>> Rich
>>>>>>>>>>>>>>>>> methods:
>>>>>>>>>>>>>>>>>>> instead of
>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>>>>       VR apply(final V value,
>>>>>>>>>>>>>>>>>>>                final K key,
>>>>>>>>>>>>>>>>>>>                final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> we can adopt
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>>>>       VR apply(final V value,
>>>>>>>>>>>>>>>>>>>                final K key,
>>>>>>>>>>>>>>>>>>>                final RecordContext recordContext,
>>>>>>>>>>>>>>>>>>>                final ProcessorContext processorContext);
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> However, in this case, a user can get confused as
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ProcessorContext
>>>>>>>>>>> and
>>>>>>>>>>>>>>>> RecordContext share some methods with the same name.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wangg...@gmail.com
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> Regarding #6 above, I'm still not clear why we would
>>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>> `commit()`
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> both ProcessorContext and RecordContext, could you
>>>>>>>>>>>>>>>>> elaborate
>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>> bit
>>>>>>>>>>>> more?
>>>>>>>>>>>>>>>>> To me `commit()` is really a processor context not a
>>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>> logically: when you call that function, it means we would
>>>>>>>>>>>>>>>>>>> commit
>>>>>>>>>>> the
>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>> of the whole task up to this processed record, not only
>>> that
>>>>>>>>>>>>>>>>>>> single
>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>> itself.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> je.kari...@gmail.com
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 0. RichInitializer definition seems missing.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - Fixed.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>    I'd suggest moving the key parameter in the
>>> RichValueXX
>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> RichReducer
>>>>>>>>>>>>>>>>>>> after the value parameters, as well as in the templates;
>>>>>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>>>>>>>>>>>>>>       VR apply(final V1 value1, final V2 value2,
>>>>>>>>>>>>>>>>>>>>>> final K
>>>>>>>>>>>>>>>>>>>>>> key,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> final
>>>>>>>>>>>> RecordContext
>>>>>>>>>>>>>>>>>>>>>> recordContext);
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - Fixed.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 2. Some of the listed functions are not necessary
>>>>>>>>>>>>>>>>>>>>> since
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> their
>>>>>>>>>> pairing
>>>>>>>>>>>>>>>> APIs
>>>>>>>>>>>>>>>>>>>>> are being deprecated in 1.0 already:
>>>>>>>>>>>>>>>>>>>>>> <KR> KGroupedStream<KR, V> groupBy(final
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> RichKeyValueMapper<?
>>>>>>>>>> super
>>>>>>>>>>>>>>>> K,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ?
>>>>>>>>>>>>>>>>>>>>> super V, KR> selector,
>>>>>>>>>>>>>>>>>>>>>>                                      final Serde<KR>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> keySerde,
>>>>>>>>>>                                      final Serde<V>
>>>>>>>>>>>>>>>>>>>>> valSerde);
>>>>>>>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>>>>>>>>>>>>>>>>>>>                                    final
>>> RichValueJoiner<?
>>>>>>>>>>>>>>>>>>>>> super
>>>>>>>>>>> K,
>>>>>>>>>>>
>>>>>>>>>>>> ?
>>>>>>>>>>>>>>>> super
>>>>>>>>>>>>>>>>>>>>> V,
>>>>>>>>>>>>>>>>>>>>>> ? super VT, ? extends VR> joiner,
>>>>>>>>>>>>>>>>>>>>>>                                    final Serde<K>
>>>>>>>>>>>>>>>>>>>>>> keySerde,
>>>>>>>>>>>>>>>>>>>>>>                                    final Serde<V>
>>>>>>>>>>>>>>>>>>>>>> valSerde);
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Fixed
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 3. For a few functions where we are adding three APIs
>>> for
>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> combo
>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>> mapper / joiner, or both initializer / aggregator, or
>>>>>>>>>>>>>>>>>>>>> adder /
>>>>>>>>>> subtractor,
>>>>>>>>>>>>>>>>>>>>> I'm wondering if we can just keep one that use "rich"
>>>>>>>>>>>>>>>>>>>>> functions
>>>>>>>>>>> for
>>>>>>>>>>>>>>>> both;
>>>>>>>>>>>>>>>>>>>>> so that we can have less overloads and let users who
>>> only
>>>>>>>>>>>>>>>>>>>>> want
>>>>>>>>>>> to
>>>>>>>>>>>
>>>>>>>>>>>> access
>>>>>>>>>>>>>>>>>>>>> one of them to just use dummy parameter declarations.
>>> For
>>>>>>>>>>>>>>>>>>>>> example:
>>>>>>>>>>>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV>
>>>>>>>>>>>>>>>>>>>>> globalKTable,
>>>>>>>>>>>>>>>>>>>>>                                    final
>>>>>>>>>>>>>>>>>>>>>> RichKeyValueMapper<?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> super
>>>>>>>>>>>> K, ?
>>>>>>>>>>>>>>>>> super
>>>>>>>>>>>>>>>>>>>>>>    V, ? extends GK> keyValueMapper,
>>>>>>>>>>>>>>>>>>>>>>                                    final
>>> RichValueJoiner<?
>>>>>>>>>>>>>>>>>>>>> super
>>>>>>>>>>> K,
>>>>>>>>>>>
>>>>>>>>>>>> ?
>>>>>>>>>>>>>>>> super
>>>>>>>>>>>>>>>>>>>>> V,
>>>>>>>>>>>>>>>>>>>>>> ? super GV, ? extends RV> joiner);
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Agreed. Fixed.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 4. For TimeWindowedKStream, I'm wondering why we do
>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> its
>>>>>>>>>>> Initializer also "rich" functions? I.e.
>>>>>>>>>>>>>>>>>>>>> - It was a typo. Fixed.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 5. We need to move "RecordContext" from
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> o.a.k.processor.internals
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> o.a.k.processor.
>>>>>>>>>>>>>>>>>>>>>> 6. I'm not clear why we want to move `commit()` from
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> ProcessorContext
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> RecordContext?
>>>>>>>>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>>> Because it makes sense logically and  to reduce code
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> maintenance
>>>>>>>>>>> (both
>>>>>>>>>>>>>>>>> interfaces have offset() timestamp() topic() partition()
>>>>>>>>>>>>>>>>>>>> methods),  I
>>>>>>>>>>>>>>>> inherit ProcessorContext from RecordContext.
>>>>>>>>>>>>>>>>>>>>> Since we need commit() method both in ProcessorContext
>>> and
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>> RecordContext
>>>>>>>>>>>>>>>>>>>>> I move commit() method to parent class
>>>>>>>>>>>>>>>>>>>>> (RecordContext).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Oct 11, 2017 at 12:59 AM, Guozhang Wang <
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>>>>>>>>>> Thanks for the updated KIP, here are my comments.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 0. RichInitializer definition seems missing.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 1. I'd suggest moving the key parameter in the
>>>>>>>>>>>>>>>>>>>>>> RichValueXX
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>> RichReducer
>>>>>>>>>>>>>>>>>>>>>> after the value parameters, as well as in the
>>> templates;
>>>>>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>>>>>>>>>>>>>>       VR apply(final V1 value1, final V2 value2,
>>>>>>>>>>>>>>>>>>>>>> final K
>>>>>>>>>>>>>>>>>>>>>> key,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> final
>>>>>>>>>>>> RecordContext
>>>>>>>>>>>>>>>>>>>>>> recordContext);
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> My motivation is that for lambda expression in J8,
>>> users
>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>> would
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>> care about the key but only the context, or vice
>>>>>>>>>>>>>>>>>>> versa, is
>>>>>>>>>>>>>>>>>>>>> likely
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>>>>>>> it as (value1, value2, dummy, context) -> ... than
>>>>>>>>>>>>>>>>>>>>>> putting
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>> dummy
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> beginning of the parameter list. Generally speaking
>>> we'd
>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>> to
>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>
>>>
>>> -- 
>>> -- Guozhang
>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to