I think this is a good idea.
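Thomas's proposal in the quoted message below (pluggable no-op detection through something like a `ChangeDetector`) could look roughly like the following minimal, self-contained sketch. Everything in it is hypothetical: `ChangeDetector`, `EmitOnChangeTable`, `Profile`, and `IGNORE_LAST_SEEN` are illustrative names, not Kafka Streams APIs. The sketch assumes the default compares values with `Objects.equals` rather than hash codes alone. Equal hashes do not imply equal values, so a hash-only check could silently drop a real change.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ChangeDetectorSketch {

    /** Hypothetical plug-in point: decides whether a new value differs from the stored one. */
    @FunctionalInterface
    interface ChangeDetector<V> {
        boolean isChange(V oldValue, V newValue);
    }

    /** Default detector: full equality (a hash match alone would not prove two values equal). */
    static <V> ChangeDetector<V> equalityDetector() {
        return (oldValue, newValue) -> !Objects.equals(oldValue, newValue);
    }

    /** Hypothetical value type with a field that downstream consumers do not care about. */
    static final class Profile {
        final String name;
        final String email;
        final long lastSeenMillis;

        Profile(String name, String email, long lastSeenMillis) {
            this.name = name;
            this.email = email;
            this.lastSeenMillis = lastSeenMillis;
        }
    }

    /** Custom detector that ignores lastSeenMillis, per the "ignore certain fields" use case. */
    static final ChangeDetector<Profile> IGNORE_LAST_SEEN = (oldValue, newValue) ->
        oldValue == null
            || newValue == null
            || !Objects.equals(oldValue.name, newValue.name)
            || !Objects.equals(oldValue.email, newValue.email);

    /** Minimal stateful table: keeps the prior value per key and forwards only real changes. */
    static final class EmitOnChangeTable<K, V> {
        private final Map<K, V> store = new HashMap<>();
        private final ChangeDetector<V> detector;

        EmitOnChangeTable(ChangeDetector<V> detector) {
            this.detector = detector;
        }

        /** Returns true if the update is stored and would be forwarded downstream. */
        boolean put(K key, V value) {
            V old = store.get(key);
            if (!detector.isChange(old, value)) {
                return false; // no-op: the store keeps the value that was last emitted
            }
            store.put(key, value);
            return true;
        }
    }

    public static void main(String[] args) {
        EmitOnChangeTable<String, Profile> table = new EmitOnChangeTable<>(IGNORE_LAST_SEEN);
        System.out.println(table.put("u1", new Profile("Ann", "ann@example.com", 1L))); // true
        System.out.println(table.put("u1", new Profile("Ann", "ann@example.com", 2L))); // false
        System.out.println(table.put("u1", new Profile("Ann", "ann@example.org", 3L))); // true
    }
}
```

A dropped update never reaches the store, so the store keeps the value that was last forwarded. One consequence of a field-ignoring detector is that the ignored fields in the store (here `lastSeenMillis`) can be stale relative to the input.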
> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
>
> How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable? That is, use something like a hash by default, but let users optionally provide an implementation of something to use instead, like a ChangeDetector. This could be useful, for example, to ignore changes to certain fields that may not be relevant to the operation being performed.
> ________________________________
> From: John Roesler <vvcep...@apache.org>
> Sent: Friday, January 31, 2020 4:51 PM
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
>
> Hello all,
>
> Sorry for my silence. It seems like we are getting close to consensus. Hopefully, we can move to a vote soon!
>
> All of the reasoning from Matthias and Bruno around timestamps is compelling. I would be strongly in favor of stating a few things very clearly in the KIP:
> 1. Streams will drop no-op updates only for KTable operations.
>
> That is, we won't make any changes to KStream aggregations at the moment. It does seem like we can potentially revisit the time semantics of that operation in the future, but we don't need to do it now.
>
> On the other hand, the proposed semantics for KTable timestamps (marking the beginning of the validity of that record) make sense to me.
>
> 2. Streams will only drop no-op updates for _stateful_ KTable operations.
>
> We don't want to add a hard guarantee that Streams will _never_ emit a no-op table update because it would require adding state to otherwise stateless operations.
> If someone is really concerned about a particular stateless operation producing a lot of no-op results, all they have to do is materialize it, and Streams would automatically drop the no-ops.
>
> Additionally, I'm +1 on not adding an opt-out at this time.
>
> Regarding the KIP itself, I would clean it up a bit before calling for a vote. There is a lot of "discussion"-type language there, which is very natural to read, but makes it a bit hard to see what _exactly_ the KIP is proposing.
>
> Richard, would you mind just making the "proposed behavior change" a simple and succinct list of bullet points? I.e., please drop glue phrases like "there has been some discussion" or "possibly we could do X". For the final version of the KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to add an elaboration section to explain more about what X and Y mean, but we don't need to talk about possibilities or alternatives except in the "rejected alternatives" section.
>
> Accordingly, can you also move the options you presented in the intro to the "rejected alternatives" section and only mention the final proposal itself?
>
> This just really helps reviewers to know what they are voting for, and it helps everyone after the fact when they are trying to get clarity on what exactly the proposal is, versus all the things it could have been.
>
> Thanks,
> -John
>
> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
>> Hello to all,
>>
>> I've finished making some initial modifications to the KIP. I have decided to keep the implementation section in the KIP for record-keeping purposes.
>>
>> For now, we should focus only on the proposed behavior changes.
>>
>> See if you have any comments!
>>
>> Cheers,
>> Richard
>>
>> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Thanks for all the discussion!
>>>
>>> @John and @Bruno I will survey other possible systems and see what I can do. Just a question: by systems, do you mean the pros and cons of different reporting strategies?
>>>
>>> I'm not completely certain on this point, so it would be great if you could clarify that.
>>>
>>> So here's what I got from all the discussion so far:
>>>
>>> - Since both Matthias and John seem to have come to a consensus on this, we will go for an all-round behavioral change for KTables. After some thought, I decided that for now, an opt-out config will not be added. As John has pointed out, no-op changes tend to explode further down the topology as they are forwarded to more and more processor nodes downstream.
>>> - About using hash codes: after some explanation from John, it looks like hash codes might not be ideal (for implementation). For now, we will omit that detail and save it for the PR.
>>> - @Bruno You do have valid concerns. However, I am not completely certain if we want to do emit-on-change only for materialized KTables. I will put it down in the KIP regardless.
>>>
>>> I will do my best to address all points raised so far in the discussion. Hope we can keep this going!
>>>
>>> Best,
>>> Richard
>>>
>>> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>
>>>> Thank you Matthias for the use cases!
>>>>
>>>> Looking at both use cases, I think you need to elaborate on them in the KIP, Richard.
>>>>
>>>> Emit from plain KTable:
>>>> I agree with Matthias that the lower timestamp makes sense because it marks the start of the validity of the record. Idempotent records with a higher timestamp can be safely ignored. A corner case that I discussed with Matthias offline is when we do not materialize a KTable due to optimization.
>>>> Then we cannot avoid the idempotent records because we do not keep the first record with the lower timestamp to compare to.
>>>>
>>>> Emit from KTable with aggregations:
>>>> If we specify that an aggregation result should have the highest timestamp of the records that participated in the aggregation, we cannot ignore any idempotent records. Admittedly, the result of an aggregation usually changes, but there are aggregations where the result may not change, like min and max, or sum when the incoming records have a value of zero. In those cases, we could benefit from emit-on-change, but only if we define the semantics of the aggregations to not use the highest timestamp of the participating records for the result. In Kafka Streams, we do not have min, max, and sum as explicit aggregations, but we need to provide an API to define what timestamp should be used for the result of an aggregation if we want to go down this path.
>>>>
>>>> None of this blocks this KIP; I just wanted to put these aspects up for discussion. The KIP can limit itself to emitting from materialized KTables. However, the limits should be explicitly stated in the KIP.
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> IMHO, the question about semantics depends on the use case, in particular on the origin of a KTable.
>>>>>
>>>>> If there is a changelog topic that one reads directly into a KTable, emit-on-change does actually make sense, because the timestamp indicates _when_ the update was _effective_.
>>>>> For this case, it is semantically sound to _not_ update the timestamp in the store, because the second update is actually idempotent, and advancing the timestamp is not ideal (one could even consider it wrong to advance the timestamp) because the "valid time" of the record pair did not change.
>>>>>
>>>>> This reasoning also applies to KTable-KTable joins.
>>>>>
>>>>> However, if the KTable is the result of an aggregation, I think emit-on-update is more natural, because the timestamp reflects the _last_ time (i.e., highest timestamp) of all input records that contributed to the result. Hence, updating the timestamp and emitting a new record actually sounds correct to me. This applies to windowed and non-windowed aggregations IMHO.
>>>>>
>>>>> However, considering the argument that, in the first case, the timestamp should not be updated in the store to begin with, both cases are actually the same, and both can be modeled as emit-on-change: if a `table()` operator does not update the timestamp when the value does not change, there is _no_ change and thus nothing is emitted. At the same time, if an aggregation operator does update the timestamp (even if the value does not change), there _is_ a change and we emit.
>>>>>
>>>>> Note that handling out-of-order data for aggregations would also work seamlessly with this approach: for out-of-order records, the timestamp never changes, and thus, we only emit if the result itself changes.
>>>>>
>>>>> Therefore, I would argue that we might not even need any config, because the emit-on-change behavior is just correct and reduces the downstream load, while our current behavior is not ideal (even if it's also correct).
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 1/24/20 9:37 AM, John Roesler wrote:
>>>>>> Hi Bruno,
>>>>>>
>>>>>> Thanks for that idea.
>>>>>> I hadn't considered that option before, and it does seem like that would be the right place to put it if we think it might be semantically important to control on a table-by-table basis.
>>>>>>
>>>>>> I had been thinking of it less semantically and more practically. In the context of a large topology, or more generally, a large software system that contains many topologies and other event-driven systems, each no-op result becomes an input that is destined to itself become a no-op result, and so on, all the way through the system. Thus, a single pointless processing result becomes amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations. If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but in proportion to the size of the system times the average fan-out (raised to the power of the number of fan-out operations on the path(s) through the system).
>>>>>>
>>>>>> In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole.
>>>>>>
>>>>>> This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind. It seems like Streams should "do the right thing" for the majority use case. My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do it with Stream operations (where this feature is inapplicable anyway)?
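The amplification John describes can be made concrete with a back-of-the-envelope count. This is a sketch under simplifying assumptions, not a model of any real Kafka Streams topology. It assumes a single linear path of stages, where the hypothetical parameter `fanOut[i]` is how many copies stage `i` forwards for each record it receives (1 means one-to-one). The hypothetical `dropAt` index marks a stateful stage that recognizes the no-op and stops it.

```java
public class NoOpAmplification {

    /**
     * Counts processor invocations caused by one no-op record entering a linear path.
     * fanOut[i] is how many copies stage i forwards to stage i + 1 (1 = one-to-one).
     * If dropAt is a valid stage index, that stage detects the no-op and forwards nothing.
     */
    static long invocations(int[] fanOut, int dropAt) {
        long copies = 1; // copies of the no-op arriving at the current stage
        long total = 0;
        for (int i = 0; i < fanOut.length; i++) {
            total += copies; // every arriving copy is processed once
            if (i == dropAt) {
                return total; // emit-on-change: nothing goes further downstream
            }
            copies *= fanOut[i]; // each copy is replicated to the next stage
        }
        return total;
    }

    public static void main(String[] args) {
        // Six stages; stages 1 and 3 fan out by 3 (e.g., branching or a foreign-key join).
        int[] path = {1, 3, 1, 3, 1, 1};
        System.out.println(invocations(path, -1)); // 26: no stage drops the no-op
        System.out.println(invocations(path, 0));  // 1: dropped at the first (stateful) stage
    }
}
```

With two fan-out-by-3 operators on the path, the stages downstream of the second fan-out each process 3^2 = 9 copies of the same pointless update. Dropping it at the first stateful stage reduces 26 invocations to 1.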
>>>>>>
>>>>>> In keeping with the "practical" perspective, I suggested the opt-out config only in the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that (because of the timestamp semantics work already underway), we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change.
>>>>>>
>>>>>> However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
>>>>>>> Hi Richard,
>>>>>>>
>>>>>>> Thank you for the KIP.
>>>>>>>
>>>>>>> I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later.
>>>>>>>
>>>>>>> I am also +1 for the survey.
>>>>>>>
>>>>>>> I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you basically propose is to suppress updates if they do not change the result. Considering emit-on-change as a flavour of suppress would be more flexible because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>
>>>>>>>> Hi Richard,
>>>>>>>>
>>>>>>>> Thanks for picking this up!
>>>>>>>> I know of at least one large community member for which this feature is absolutely essential.
>>>>>>>>
>>>>>>>> If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not.
>>>>>>>>
>>>>>>>> Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config. Can you update the KIP to mention the exact config key and value(s) you'd propose?
>>>>>>>>
>>>>>>>> Just to move the discussion forward, maybe something like:
>>>>>>>> emit.on := change|update
>>>>>>>> with the new default being "change"
>>>>>>>>
>>>>>>>> Thanks for pointing out the timestamp issue in particular. I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted).
>>>>>>>>
>>>>>>>> I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just drop that section for now, so that the KIP discussion can focus on the behavior change and config.
>>>>>>>>
>>>>>>>> Just for reference, there is some research into this domain.
>>>>>>>> For example, see the "Report" section (3.2.3) of the SECRET paper:
>>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
>>>>>>>>
>>>>>>>> It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>>>>>>>>> Hi everybody!
>>>>>>>>>
>>>>>>>>> I'd like to propose a change that we probably should have added a long time ago.
>>>>>>>>>
>>>>>>>>> The key benefit of this KIP would be reduced traffic in Kafka Streams, since a lot of no-op results would no longer be sent downstream.
>>>>>>>>> Here is the KIP for reference.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>>>>>>>>>
>>>>>>>>> Currently, I seek to formalize our approach for this KIP before we determine concrete API additions / configurations. Some configs might warrant adding, while others are not necessary, since adding them would only increase the complexity of Kafka Streams.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Richard
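The timestamp semantics Matthias describes in the thread above, and which John's Jan 31 summary builds on, can be sketched in plain Java. This is a minimal in-memory model under stated assumptions, not the Kafka Streams implementation. `Stamped`, `TimestampedTable`, `onTableUpdate`, `MaxAggregate`, and `onInput` are hypothetical names.

- For a `table()`-style source, an update whose value equals the stored one is dropped, and the stored (earlier) timestamp is kept.
- For an aggregation, the sketch assumes the result timestamp is the maximum input timestamp. The operator therefore emits whenever the value or that timestamp changes.
- The max aggregation stands in for the min/max case Bruno mentions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class EmitOnChangeSemantics {

    /** A result value together with the timestamp attached to it. */
    static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** table()-style source: a non-null return value is what would be forwarded downstream. */
    static final class TimestampedTable<V> {
        final Map<String, Stamped<V>> store = new HashMap<>();

        Stamped<V> onTableUpdate(String key, V value, long timestamp) {
            Stamped<V> old = store.get(key);
            if (old != null && Objects.equals(old.value, value)) {
                // Idempotent update: forward nothing and keep the earlier timestamp,
                // which marks the start of the value's validity.
                return null;
            }
            Stamped<V> updated = new Stamped<>(value, timestamp);
            store.put(key, updated);
            return updated;
        }
    }

    /** Max aggregation whose result timestamp is the highest input timestamp seen so far. */
    static final class MaxAggregate {
        final Map<String, Stamped<Long>> store = new HashMap<>();

        Stamped<Long> onInput(String key, long input, long timestamp) {
            Stamped<Long> old = store.get(key);
            long newValue = old == null ? input : Math.max(old.value, input);
            long newTimestamp = old == null ? timestamp : Math.max(old.timestamp, timestamp);
            if (old != null && old.value == newValue && old.timestamp == newTimestamp) {
                // Neither the result nor its timestamp changed (e.g. an out-of-order input
                // that does not beat the current max): nothing to emit.
                return null;
            }
            Stamped<Long> updated = new Stamped<>(newValue, newTimestamp);
            store.put(key, updated);
            return updated;
        }
    }

    public static void main(String[] args) {
        TimestampedTable<String> table = new TimestampedTable<>();
        System.out.println(table.onTableUpdate("k", "A", 10) != null); // true: first value
        System.out.println(table.onTableUpdate("k", "A", 20) != null); // false: idempotent
        System.out.println(table.store.get("k").timestamp);            // 10: timestamp kept

        MaxAggregate max = new MaxAggregate();
        System.out.println(max.onInput("k", 5, 100) != null); // true: first result
        System.out.println(max.onInput("k", 3, 90) != null);  // false: out-of-order, no change
        System.out.println(max.onInput("k", 4, 110) != null); // true: same value, later timestamp
    }
}
```

The table case shows why the store must not advance the timestamp of a dropped update: it stays consistent with what was emitted. The last aggregation call is the case Bruno flags. With highest-timestamp semantics, an unchanged max is still emitted, so emit-on-change only saves work there for out-of-order inputs.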