I think this is a good idea.

> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
> 
> How do folks feel about allowing the mechanism by which no-ops are detected 
> to be pluggable? Meaning use something like a hash by default, but you could 
> optionally provide an implementation of something to use instead, like a 
> ChangeDetector. This could be useful for example to ignore changes to certain 
> fields, which may not be relevant to the operation being performed.
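
To make the idea concrete, here is a minimal sketch of what such a plug-in point might look like. The `ChangeDetector` interface, the `Device` value type, and both factory methods are hypothetical names invented for illustration; nothing like them exists in Kafka Streams, and the KIP does not propose them.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical sketch only: none of these types are part of Kafka Streams.
final class ChangeDetectorSketch {

    /** Hypothetical plug-in point: decides whether an update is a real change. */
    interface ChangeDetector<V> {
        boolean hasChanged(V oldValue, V newValue);
    }

    /**
     * Default suggested in the thread: compare hashes of the serialized values.
     * Note that a hash collision would make a real change look like a no-op.
     */
    static ChangeDetector<byte[]> hashBased() {
        return (oldValue, newValue) ->
                Arrays.hashCode(oldValue) != Arrays.hashCode(newValue);
    }

    /** Example value type; lastSeenMillis is the field we choose to ignore. */
    static final class Device {
        final String status;
        final long lastSeenMillis;

        Device(String status, long lastSeenMillis) {
            this.status = status;
            this.lastSeenMillis = lastSeenMillis;
        }
    }

    /** Custom detector: only a change of status counts as a change. */
    static ChangeDetector<Device> ignoringLastSeen() {
        return (oldValue, newValue) -> {
            if (oldValue == null || newValue == null) {
                // A change unless both sides are null.
                return oldValue != newValue;
            }
            return !Objects.equals(oldValue.status, newValue.status);
        };
    }

    public static void main(String[] args) {
        ChangeDetector<Device> detector = ignoringLastSeen();
        // Only lastSeenMillis differs, so this is treated as a no-op.
        System.out.println(detector.hasChanged(new Device("UP", 1L), new Device("UP", 2L)));
        // The status differs, so this is a real change.
        System.out.println(detector.hasChanged(new Device("UP", 1L), new Device("DOWN", 2L)));
    }
}
```

The trade-off is that a custom detector needs the deserialized value, whereas a hash or byte comparison can work on the serialized form.
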
> ________________________________
> From: John Roesler <vvcep...@apache.org>
> Sent: Friday, January 31, 2020 4:51 PM
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> 
> 
> 
> Hello all,
> 
> Sorry for my silence. It seems like we are getting close to consensus.
> Hopefully, we could move to a vote soon!
> 
> All of the reasoning from Matthias and Bruno around timestamp is compelling. I
> would be strongly in favor of stating a few things very clearly in the KIP:
> 1. Streams will drop no-op updates only for KTable operations.
> 
>   That is, we won't make any changes to KStream aggregations at the moment. It
>   does seem like we can potentially revisit the time semantics of that
>   operation in the future, but we don't need to do it now.
> 
>   On the other hand, the proposed semantics for KTable timestamps (marking the
>   beginning of the validity of that record) makes sense to me.
> 
> 2. Streams will only drop no-op updates for _stateful_ KTable operations.
> 
>   We don't want to add a hard guarantee that Streams will _never_ emit a no-op
>   table update because it would require adding state to otherwise stateless
>   operations. If someone is really concerned about a particular stateless
>   operation producing a lot of no-op results, all they have to do is
>   materialize it, and Streams would automatically drop the no-ops.
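
As a rough illustration of why state is required, the sketch below drops an update only when the prior value for the key is available to compare against. It is plain Java, not Kafka Streams API: the `HashMap` stands in for a state store, and the class name `EmitOnChangeSketch` and its `process` method are made up for this example. Tombstones (null values) are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a stateful table operation; not Kafka Streams API.
final class EmitOnChangeSketch {

    // Stand-in for the state store that a materialized KTable would have.
    private final Map<String, byte[]> store = new HashMap<>();

    // Keys of the updates that were forwarded downstream, in order.
    final List<String> forwarded = new ArrayList<>();

    /** Processes one update; returns true if it was forwarded downstream. */
    boolean process(String key, byte[] newValue) {
        byte[] oldValue = store.get(key);
        if (oldValue != null && Arrays.equals(oldValue, newValue)) {
            // No-op update: leave the store untouched and forward nothing.
            return false;
        }
        store.put(key, newValue);
        forwarded.add(key);
        return true;
    }

    public static void main(String[] args) {
        EmitOnChangeSketch table = new EmitOnChangeSketch();
        table.process("a", new byte[]{1}); // first value for the key: forwarded
        table.process("a", new byte[]{1}); // identical value: dropped
        table.process("a", new byte[]{2}); // changed value: forwarded
        System.out.println(table.forwarded.size());
    }
}
```

Without the store there is no prior value to compare against, which is why a stateless operation cannot drop no-ops unless it is materialized.
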
> 
> Additionally, I'm +1 on not adding an opt-out at this time.
> 
> Regarding the KIP itself, I would clean it up a bit before calling for a vote.
> There is a lot of "discussion"-type language there, which is very natural to
> read, but makes it a bit hard to see what _exactly_ the KIP is proposing.
> 
> Richard, would you mind just making the "proposed behavior change" a simple
> and succinct list of bullet points? I.e., please drop glue phrases like
> "there has been some discussion" or "possibly we could do X". For the final
> version of the KIP, it should just say, "Streams will do X, Streams will do
> Y". Feel free to add an elaboration section to explain more about what X and
> Y mean, but we don't need to talk about possibilities or alternatives except
> in the "rejected alternatives" section.
> 
> Accordingly, can you also move the options you presented in the intro to the
> "rejected alternatives" section and only mention the final proposal itself?
> 
> This just really helps reviewers to know what they are voting for, and it
> helps everyone after the fact when they are trying to get clarity on what
> exactly the proposal is, versus all the things it could have been.
> 
> Thanks,
> -John
> 
> 
>> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
>> Hello to all,
>> 
>> I've finished making some initial modifications to the KIP.
>> I have decided to keep the implementation section in the KIP for
>> record-keeping purposes.
>> 
>> For now, we should focus on only the proposed behavior changes instead.
>> 
>> See if you have any comments!
>> 
>> Cheers,
>> Richard
>> 
>> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com>
>> wrote:
>> 
>>> Hi all,
>>> 
>>> Thanks for all the discussion!
>>> 
>>> @John and @Bruno I will survey other possible systems and see what I can
>>> do.
>>> Just a question: by "systems", I suppose you mean the pros and cons of
>>> different reporting strategies?
>>> 
>>> I'm not completely certain on this point, so it would be great if you could
>>> clarify that.
>>> 
>>> So here's what I got from all the discussion so far:
>>> 
>>>   - Since both Matthias and John seem to have come to a consensus on
>>>   this, we will go for an all-round behavioral change for KTables. After
>>>   some thought, I decided that for now, an opt-out config will not be added.
>>>   As John has pointed out, no-op changes tend to explode further down the
>>>   topology as they are forwarded to more and more processor nodes
>>>   downstream.
>>>   - About using hash codes, after some explanation from John, it looks
>>>   like hash codes might not be as ideal (for implementation). For now, we
>>>   will omit that detail, and save it for the PR.
>>>   - @Bruno You do have valid concerns. Though, I am not completely
>>>   certain if we want to do emit-on-change only for materialized KTables. I
>>>   will put it down in the KIP regardless.
>>> 
>>> I will do my best to address all points raised so far on the discussion.
>>> Hope we could keep this going!
>>> 
>>> Best,
>>> Richard
>>> 
>>>> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
>>> 
>>>> Thank you Matthias for the use cases!
>>>> 
>>>> Looking at both use cases, I think you need to elaborate on them in
>>>> the KIP, Richard.
>>>> 
>>>> Emit from plain KTable:
>>>> I agree with Matthias that the lower timestamp makes sense because it
>>>> marks the start of the validity of the record. Idempotent records with
>>>> a higher timestamp can be safely ignored. A corner case that I
>>>> discussed with Matthias offline is when we do not materialize a KTable
>>>> due to optimization. Then we cannot avoid the idempotent records
>>>> because we do not keep the first record with the lower timestamp to
>>>> compare to.
>>>> 
>>>> Emit from KTable with aggregations:
>>>> If we specify that an aggregation result should have the highest
>>>> timestamp of the records that participated in the aggregation, we
>>>> cannot ignore any idempotent records. Admittedly, the result of an
>>>> aggregation usually changes, but there are aggregations where the
>>>> result may not change like min and max, or sum when the incoming
>>>> records have a value of zero. In those cases, we could benefit from
>>>> emit-on-change, but only if we define the semantics of the
>>>> aggregations to not use the highest timestamp of the participating
>>>> records for the result. In Kafka Streams, we do not have min, max, and
>>>> sum as explicit aggregations, but we need to provide an API to define
>>>> what timestamp should be used for the result of an aggregation if we
>>>> want to go down this path.
>>>> 
>>>> All of this does not block this KIP and I just wanted to put these
>>>> aspects up for discussion. The KIP can limit itself to emit from
>>>> materialized KTables. However, the limits should be explicitly stated
>>>> in the KIP.
>>>> 
>>>> Best,
>>>> Bruno
>>>> 
>>>> 
>>>> 
>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>> 
>>>>> IMHO, the question about semantics depends on the use case, in
>>>>> particular on the origin of a KTable.
>>>>> 
>>>>> If there is a changelog topic that one reads directly into a KTable,
>>>>> emit-on-change does actually make sense, because the timestamp indicates
>>>>> _when_ the update was _effective_. For this case, it is semantically
>>>>> sound to _not_ update the timestamp in the store, because the second
>>>>> update is actually idempotent and advancing the timestamp is not ideal
>>>>> (one could even consider it to be wrong to advance the timestamp)
>>>>> because the "valid time" of the record pair did not change.
>>>>> 
>>>>> This reasoning also applies to KTable-KTable joins.
>>>>> 
>>>>> However, if the KTable is the result of an aggregation, I think
>>>>> emit-on-update is more natural, because the timestamp reflects the
>>>>> _last_ time (ie, highest timestamp) of all input records that contributed
>>>>> to the result. Hence, updating the timestamp and emitting a new record
>>>>> actually sounds correct to me. This applies to windowed and non-windowed
>>>>> aggregations IMHO.
>>>>> 
>>>>> However, considering the argument that the timestamp should not be
>>>>> updated in the store in the first case to begin with, both cases are
>>>>> actually the same, and both can be modeled as emit-on-change: if a
>>>>> `table()` operator does not update the timestamp if the value does not
>>>>> change, there is _no_ change and thus nothing is emitted. At the same
>>>>> time, if an aggregation operator does update the timestamp (even if the
>>>>> value does not change) there _is_ a change and we emit.
>>>>> 
>>>>> Note that handling out-of-order data for aggregations would also work
>>>>> seamlessly with this approach -- for out-of-order records, the timestamp
>>>>> never changes, and thus, we only emit if the result itself changes.
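
One possible reading of this argument, sketched in plain Java: treat the (value, timestamp) pair as the thing that changes, and emit only when that pair changes. The `Stamped` type and the `tableUpdate` and `sumUpdate` helpers are hypothetical and exist only for this illustration; a sum stands in for a generic aggregation, and a return value of null means "emit nothing".

```java
// Hypothetical model of the timestamp argument above; not Kafka Streams API.
final class TimestampSemanticsSketch {

    /** A value together with the timestamp stored alongside it. */
    static final class Stamped {
        final long value;
        final long timestamp;

        Stamped(long value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * table(): an unchanged value keeps its earlier timestamp, so the stored
     * pair does not change and nothing is emitted (null).
     */
    static Stamped tableUpdate(Stamped old, long newValue, long newTimestamp) {
        if (old != null && old.value == newValue) {
            return null;
        }
        return new Stamped(newValue, newTimestamp);
    }

    /**
     * Aggregation (a sum here): the result timestamp is the highest input
     * timestamp seen so far. Emit only if the value or the timestamp changed.
     */
    static Stamped sumUpdate(Stamped old, long addend, long timestamp) {
        long oldValue = old == null ? 0L : old.value;
        long oldTimestamp = old == null ? Long.MIN_VALUE : old.timestamp;
        Stamped next = new Stamped(oldValue + addend, Math.max(oldTimestamp, timestamp));
        boolean changed = old == null
                || next.value != old.value
                || next.timestamp != old.timestamp;
        return changed ? next : null;
    }

    public static void main(String[] args) {
        Stamped current = new Stamped(5L, 10L);
        // In-order record adding zero: the timestamp advances, so we emit.
        System.out.println(sumUpdate(current, 0L, 20L) != null);
        // Out-of-order record adding zero: neither value nor timestamp changes.
        System.out.println(sumUpdate(current, 0L, 5L) != null);
    }
}
```

Under this model both operators follow the same emit-on-change rule; they differ only in how they maintain the timestamp.
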
>>>>> 
>>>>> Therefore, I would argue that we might not even need any config, because
>>>>> the emit-on-change behavior is just correct and reduces the downstream
>>>>> load, while our current behavior is not ideal (even if it's also
>>>>> correct).
>>>>> 
>>>>> Thoughts?
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 1/24/20 9:37 AM, John Roesler wrote:
>>>>>> Hi Bruno,
>>>>>> 
>>>>>> Thanks for that idea. I hadn't considered that
>>>>>> option before, and it does seem like that would be
>>>>>> the right place to put it if we think it might be
>>>>>> semantically important to control on a
>>>>>> table-by-table basis.
>>>>>> 
>>>>>> I had been thinking of it less semantically and
>>>>>> more practically. In the context of a large
>>>>>> topology, or more generally, a large software
>>>>>> system that contains many topologies and other
>>>>>> event-driven systems, each no-op result becomes an
>>>>>> input that is destined to itself become a no-op
>>>>>> result, and so on, all the way through the system.
>>>>>> Thus, a single pointless processing result becomes
>>>>>> amplified into a large number of pointless
>>>>>> computations, cache perturbations, and network
>>>>>> and disk I/O operations. If you also consider
>>>>>> operations with fan-out implications, like
>>>>>> branching or foreign-key joins, the wasted
>>>>>> resources are amplified not just in proportion to
>>>>>> the size of the system, but the size of the system
>>>>>> times the average fan-out (to the power of the
>>>>>> number of fan-out operations on the path(s)
>>>>>> through the system).
>>>>>> 
>>>>>> In my time operating such systems, I've observed
>>>>>> these effects to be very real, and actually, the
>>>>>> system and use case don't have to be very large
>>>>>> before the amplification poses an existential
>>>>>> threat to the system as a whole.
>>>>>> 
>>>>>> This is the basis of my advocating for a simple
>>>>>> behavior change, rather than an opt-in config of
>>>>>> any kind. It seems like Streams should "do the
>>>>>> right thing" for the majority use case. My theory
>>>>>> (which may be wrong) is that the majority use case
>>>>>> is more like "relational queries" than "CEP
>>>>>> queries". Even if you were doing some
>>>>>> event-sensitive computation, wouldn't you do them
>>>>>> as Stream operations (where this feature is
>>>>>> inapplicable anyway)?
>>>>>> 
>>>>>> In keeping with the "practical" perspective, I
>>>>>> suggested the opt-out config only in the (I think
>>>>>> unlikely) event that filtering out pointless
>>>>>> updates actually harms performance. I'd also be
>>>>>> perfectly fine without the opt-out config. I
>>>>>> really think that (because of the timestamp
>>>>>> semantics work already underway), we're already
>>>>>> pre-fetching the prior result most of the time, so
>>>>>> there would actually be very little extra I/O
>>>>>> involved in implementing emit-on-change.
>>>>>> 
>>>>>> However, we should consider whether my experience
>>>>>> is likely to be general. Do you have some use
>>>>>> case in mind for which you'd actually want some
>>>>>> KTable results to be emit-on-update for semantic
>>>>>> reasons?
>>>>>> 
>>>>>> Thanks,
>>>>>> -John
>>>>>> 
>>>>>> 
>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
>>>>>>> Hi Richard,
>>>>>>> 
>>>>>>> Thank you for the KIP.
>>>>>>> 
>>>>>>> I agree with John that we should focus on the interface and behavior
>>>>>>> change in a KIP. We can discuss the implementation later.
>>>>>>> 
>>>>>>> I am also +1 for the survey.
>>>>>>> 
>>>>>>> I had a thought about this. Couldn't we consider emit-on-change to be
>>>>>>> one config of suppress (like `untilWindowCloses`)? What you basically
>>>>>>> propose is to suppress updates if they do not change the result.
>>>>>>> Considering emit on change as a flavour of suppress would be more
>>>>>>> flexible because it would specify the behavior locally for a KTable
>>>>>>> instead of globally for all KTables. Additionally, specifying the
>>>>>>> behavior in one place instead of multiple places feels more intuitive
>>>>>>> and consistent to me.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>> 
>>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Hi Richard,
>>>>>>>> 
>>>>>>>> Thanks for picking this up! I know of at least one large community
>>>>>>>> member for which this feature is absolutely essential.
>>>>>>>> 
>>>>>>>> If I understand your two options, it seems like the proposal is to
>>>>>>>> implement it as a behavior change regardless, and the question is
>>>>>>>> whether to provide an opt-out config or not.
>>>>>>>> 
>>>>>>>> Given that any implementation of this feature would have some
>>>>>>>> performance impact under some workloads, and also that we don't know if
>>>>>>>> anyone really depends on emit-on-update time semantics, it seems like we
>>>>>>>> should propose to add an opt-out config. Can you update the KIP to
>>>>>>>> mention the exact config key and value(s) you'd propose?
>>>>>>>> 
>>>>>>>> Just to move the discussion forward, maybe something like:
>>>>>>>>    emit.on := change|update
>>>>>>>> with the new default being "change"
>>>>>>>> 
>>>>>>>> Thanks for pointing out the timestamp issue in particular. I agree that
>>>>>>>> if we discard the latter update as a no-op, then we also have to discard
>>>>>>>> its timestamp (obviously, we don't forward the timestamp update, as
>>>>>>>> that's the whole point, but we also can't update the timestamp in the
>>>>>>>> store, as the store must remain consistent with what has been emitted).
>>>>>>>> 
>>>>>>>> I have to confess that I disagree with your implementation proposal, but
>>>>>>>> it's also not necessary to discuss implementation in the KIP. Maybe it
>>>>>>>> would be less controversial if you just drop that section for now, so
>>>>>>>> that the KIP discussion can focus on the behavior change and config.
>>>>>>>> 
>>>>>>>> Just for reference, there is some research into this domain. For
>>>>>>>> example, see the "Report" section (3.2.3) of the SECRET paper:
>>>>>>>> 
>>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
>>>>>>>> 
>>>>>>>> It might help to round out the proposal if you take a brief survey of
>>>>>>>> the behaviors of other systems, along with pros and cons if any are
>>>>>>>> reported.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>>>>>>>>> Hi everybody!
>>>>>>>>> 
>>>>>>>>> I'd like to propose a change that we probably should've added a long
>>>>>>>>> time ago.
>>>>>>>>> 
>>>>>>>>> The key benefit of this KIP would be reduced traffic in Kafka Streams,
>>>>>>>>> since a lot of no-op results would no longer be sent downstream.
>>>>>>>>> Here is the KIP for reference.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>>>>>>>>> 
>>>>>>>>> Currently, I seek to formalize our approach for this KIP first before we
>>>>>>>>> determine concrete API additions / configurations.
>>>>>>>>> Some configs might warrant adding, while others are not necessary since
>>>>>>>>> adding them would only increase complexity of Kafka Streams.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Richard
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 