Hi Thomas and yuzhihong,

That’s an interesting idea. Can you help think of a use case that isn’t also 
served by filtering or mapping beforehand?

Thanks for helping to design this feature!
-John

On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
> I think this is good idea. 
> 
> > On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
> > 
> > How do folks feel about allowing the mechanism by which no-ops are 
> > detected to be pluggable? Meaning use something like a hash by default, but 
> > you could optionally provide an implementation of something to use instead, 
> > like a ChangeDetector. This could be useful for example to ignore changes 
> > to certain fields, which may not be relevant to the operation being 
> > performed.
> > ________________________________
> > From: John Roesler <vvcep...@apache.org>
> > Sent: Friday, January 31, 2020 4:51 PM
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> > 
> > [EXTERNAL EMAIL] Attention: This email was sent from outside TiVo. DO NOT 
> > CLICK any links or attachments unless you expected them.
> > ________________________________
> > 
> > 
> > Hello all,
> > 
> > Sorry for my silence. It seems like we are getting close to consensus.
> > Hopefully, we could move to a vote soon!
> > 
> > All of the reasoning from Matthias and Bruno around timestamp is 
> > compelling. I
> > would be strongly in favor of stating a few things very clearly in the KIP:
> > 1. Streams will drop no-op updates only for KTable operations.
> > 
> >   That is, we won't make any changes to KStream aggregations at the moment. 
> > It
> >   does seem like we can potentially revisit the time semantics of that 
> > operation
> >   in the future, but we don't need to do it now.
> > 
> >   On the other hand, the proposed semantics for KTable timestamps (marking 
> > the
> >   beginning of the validity of that record) makes sense to me.
> > 
> > 2. Streams will only drop no-op updates for _stateful_ KTable operations.
> > 
> >   We don't want to add a hard guarantee that Streams will _never_ emit a 
> > no-op
> >   table update because it would require adding state to otherwise stateless
> >   operations. If someone is really concerned about a particular stateless
> >   operation producing a lot of no-op results, all they have to do is
> >   materialize it, and Streams would automatically drop the no-ops.
> > 
> > Additionally, I'm +1 on not adding an opt-out at this time.
> > 
> > Regarding the KIP itself, I would clean it up a bit before calling for a 
> > vote.
> > There is a lot of "discussion"-type language there, which is very natural to
> > read, but makes it a bit hard to see what _exactly_ the kip is proposing.
> > 
> > Richard, would you mind just making the "proposed behavior change" a simple 
> > and
> > succinct list of bullet points? I.e., please drop glue phrases like "there 
> > has
> > been some discussion" or "possibly we could do X". For the final version of 
> > the
> > KIP, it should just say, "Streams will do X, Streams will do Y". Feel free 
> > to
> > add an elaboration section to explain more about what X and Y mean, but we 
> > don't
> > need to talk about possibilities or alternatives except in the "rejected
> > alternatives" section.
> > 
> > Accordingly, can you also move the options you presented in the intro to the
> > "rejected alternatives" section and only mention the final proposal itself?
> > 
> > This just really helps reviewers to know what they are voting for, and it 
> > helps
> > everyone after the fact when they are trying to get clarity on what exactly 
> > the
> > proposal is, versus all the things it could have been.
> > 
> > Thanks,
> > -John
> > 
> > 
> >> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
> >> Hello to all,
> >> 
> >> I've finished making some initial modifications to the KIP.
> >> I have decided to keep the implementation section in the KIP for
> >> record-keeping purposes.
> >> 
> >> For now, we should focus on only the proposed behavior changes instead.
> >> 
> >> See if you have any comments!
> >> 
> >> Cheers,
> >> Richard
> >> 
> >> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com>
> >> wrote:
> >> 
> >>> Hi all,
> >>> 
> >>> Thanks for all the discussion!
> >>> 
> >>> @John and @Bruno I will survey other possible systems and see what I can
> >>> do.
> >>> Just a question, by systems, I suppose you would mean the pros and cons of
> >>> different reporting strategies?
> >>> 
> >>> I'm not completely certain on this point, so it would be great if you can
> >>> clarify on that.
> >>> 
> >>> So here's what I got from all the discussion so far:
> >>> 
> >>>   - Since both Matthias and John seems to have come to a consensus on
> >>>   this, then we will go for an all-round behavorial change for KTables. 
> >>> After
> >>>   some thought, I decided that for now, an opt-out config will not be 
> >>> added.
> >>>   As John have pointed out, no-op changes tend to explode further down the
> >>>   topology as they are forwarded to more and more processor nodes 
> >>> downstream.
> >>>   - About using hash codes, after some explanation from John, it looks
> >>>   like hash codes might not be as ideal (for implementation). For now, we
> >>>   will omit that detail, and save it for the PR.
> >>>   - @Bruno You do have valid concerns. Though, I am not completely
> >>>   certain if we want to do emit-on-change only for materialized KTables. I
> >>>   will put it down in the KIP regardless.
> >>> 
> >>> I will do my best to address all points raised so far on the discussion.
> >>> Hope we could keep this going!
> >>> 
> >>> Best,
> >>> Richard
> >>> 
> >>>> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>> 
> >>>> Thank you Matthias for the use cases!
> >>>> 
> >>>> Looking at both use cases, I think you need to elaborate on them in
> >>>> the KIP, Richard.
> >>>> 
> >>>> Emit from plain KTable:
> >>>> I agree with Matthias that the lower timestamp makes sense because it
> >>>> marks the start of the validity of the record. Idempotent records with
> >>>> a higher timestamp can be safely ignored. A corner case that I
> >>>> discussed with Matthias offline is when we do not materialize a KTable
> >>>> due to optimization. Then we cannot avoid the idempotent records
> >>>> because we do not keep the first record with the lower timestamp to
> >>>> compare to.
> >>>> 
> >>>> Emit from KTable with aggregations:
> >>>> If we specify that an aggregation result should have the highest
> >>>> timestamp of the records that participated in the aggregation, we
> >>>> cannot ignore any idempotent records. Admittedly, the result of an
> >>>> aggregation usually changes, but there are aggregations where the
> >>>> result may not change like min and max, or sum when the incoming
> >>>> records have a value of zero. In those cases, we could benefit of the
> >>>> emit on change, but only if we define the semantics of the
> >>>> aggregations to not use the highest timestamp of the participating
> >>>> records for the result. In Kafka Streams, we do not have min, max, and
> >>>> sum as explicit aggregations, but we need to provide an API to define
> >>>> what timestamp should be used for the result of an aggregation if we
> >>>> want to go down this path.
> >>>> 
> >>>> All of this does not block this KIP and I just wanted to put this
> >>>> aspects up for discussion. The KIP can limit itself to emit from
> >>>> materialized KTables. However, the limits should be explicitly stated
> >>>> in the KIP.
> >>>> 
> >>>> Best,
> >>>> Bruno
> >>>> 
> >>>> 
> >>>> 
> >>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>> 
> >>>>> IMHO, the question about semantics depends on the use case, in
> >>>>> particular on the origin of a KTable.
> >>>>> 
> >>>>> If there is a changlog topic that one reads directly into a KTable,
> >>>>> emit-on-change does actually make sense, because the timestamp indicates
> >>>>> _when_ the update was _effective_. For this case, it is semantically
> >>>>> sound to _not_ update the timestamp in the store, because the second
> >>>>> update is actually idempotent and advancing the timestamp is not ideal
> >>>>> (one could even consider it to be wrong to advance the timestamp)
> >>>>> because the "valid time" of the record pair did not change.
> >>>>> 
> >>>>> This reasoning also applies to KTable-KTable joins.
> >>>>> 
> >>>>> However, if the KTable is the result of an aggregation, I think
> >>>>> emit-on-update is more natural, because the timestamp reflects the
> >>>>> _last_ time (ie, highest timestamp) of all input records the contributed
> >>>>> to the result. Hence, updating the timestamp and emitting a new record
> >>>>> actually sounds correct to me. This applies to windowed and non-windowed
> >>>>> aggregations IMHO.
> >>>>> 
> >>>>> However, considering the argument that the timestamp should not be
> >>>>> update in the first case in the store to begin with, both cases are
> >>>>> actually the same, and both can be modeled as emit-on-change: if a
> >>>>> `table()` operator does not update the timestamp if the value does not
> >>>>> change, there is _no_ change and thus nothing is emitted. At the same
> >>>>> time, if an aggregation operator does update the timestamp (even if the
> >>>>> value does not change) there _is_ a change and we emit.
> >>>>> 
> >>>>> Note that handling out-of-order data for aggregations would also work
> >>>>> seamlessly with this approach -- for out-of-order records, the timestamp
> >>>>> does never change, and thus, we only emit if the result itself changes.
> >>>>> 
> >>>>> Therefore, I would argue that we might not even need any config, because
> >>>>> the emit-on-change behavior is just correct and reduced the downstream
> >>>>> load, while our current behavior is not ideal (even if it's also
> >>>> correct).
> >>>>> 
> >>>>> Thoughts?
> >>>>> 
> >>>>> -Matthias
> >>>>> 
> >>>>> On 1/24/20 9:37 AM, John Roesler wrote:
> >>>>>> Hi Bruno,
> >>>>>> 
> >>>>>> Thanks for that idea. I hadn't considered that
> >>>>>> option before, and it does seem like that would be
> >>>>>> the right place to put it if we think it might be
> >>>>>> semantically important to control on a
> >>>>>> table-by-table basis.
> >>>>>> 
> >>>>>> I had been thinking of it less semantically and
> >>>>>> more practically. In the context of a large
> >>>>>> topology, or more generally, a large software
> >>>>>> system that contains many topologies and other
> >>>>>> event-driven systems, each no-op result becomes an
> >>>>>> input that is destined to itself become a no-op
> >>>>>> result, and so on, all the way through the system.
> >>>>>> Thus, a single pointless processing result becomes
> >>>>>> amplified into a large number of pointless
> >>>>>> computations, cache perturbations, and network
> >>>>>> and disk I/O operations. If you also consider
> >>>>>> operations with fan-out implications, like
> >>>>>> branching or foreign-key joins, the wasted
> >>>>>> resources are amplified not just in proportion to
> >>>>>> the size of the system, but the size of the system
> >>>>>> times the average fan-out (to the power of the
> >>>>>> number of fan-out operations on the path(s)
> >>>>>> through the system).
> >>>>>> 
> >>>>>> In my time operating such systems, I've observed
> >>>>>> these effects to be very real, and actually, the
> >>>>>> system and use case doesn't have to be very large
> >>>>>> before the amplification poses an existential
> >>>>>> threat to the system as a whole.
> >>>>>> 
> >>>>>> This is the basis of my advocating for a simple
> >>>>>> behavior change, rather than an opt-in config of
> >>>>>> any kind. It seems like Streams should "do the
> >>>>>> right thing" for the majority use case. My theory
> >>>>>> (which may be wrong) is that the majority use case
> >>>>>> is more like "relational queries" than "CEP
> >>>>>> queries". Even if you were doing some
> >>>>>> event-sensitive computation, wouldn't you do them
> >>>>>> as Stream operations (where this feature is
> >>>>>> inapplicable anyway)?
> >>>>>> 
> >>>>>> In keeping with the "practical" perspective, I
> >>>>>> suggested the opt-out config only in the (I think
> >>>>>> unlikely) event that filtering out pointless
> >>>>>> updates actually harms performance. I'd also be
> >>>>>> perfectly fine without the opt-out config. I
> >>>>>> really think that (because of the timestamp
> >>>>>> semantics work already underway), we're already
> >>>>>> pre-fetching the prior result most of the time, so
> >>>>>> there would actually be very little extra I/O
> >>>>>> involved in implementing emit-on-change.
> >>>>>> 
> >>>>>> However, we should consider whether my experience
> >>>>>> is likely to be general. Do you have some use
> >>>>>> case in mind for which you'd actually want some
> >>>>>> KTable results to be emit-on-update for semantic
> >>>>>> reasons?
> >>>>>> 
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>> 
> >>>>>> 
> >>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
> >>>>>>> Hi Richard,
> >>>>>>> 
> >>>>>>> Thank you for the KIP.
> >>>>>>> 
> >>>>>>> I agree with John that we should focus on the interface and behavior
> >>>>>>> change in a KIP. We can discuss the implementation later.
> >>>>>>> 
> >>>>>>> I am also +1 for the survey.
> >>>>>>> 
> >>>>>>> I had a thought about this. Couldn't we consider emit-on-change to be
> >>>>>>> one config of suppress (like `untilWindowCloses`)? What you basically
> >>>>>>> propose is to suppress updates if they do not change the result.
> >>>>>>> Considering emit on change as a flavour of suppress would be more
> >>>>>>> flexible because it would specify the behavior locally for a KTable
> >>>>>>> instead of globally for all KTables. Additionally, specifying the
> >>>>>>> behavior in one place instead of multiple places feels more intuitive
> >>>>>>> and consistent to me.
> >>>>>>> 
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>> 
> >>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org>
> >>>> wrote:
> >>>>>>>> 
> >>>>>>>> Hi Richard,
> >>>>>>>> 
> >>>>>>>> Thanks for picking this up! I know of at least one large community
> >>>> member
> >>>>>>>> for which this feature is absolutely essential.
> >>>>>>>> 
> >>>>>>>> If I understand your two options, it seems like the proposal is to
> >>>> implement
> >>>>>>>> it as a behavior change regardless, and the question is whether to
> >>>> provide
> >>>>>>>> an opt-out config or not.
> >>>>>>>> 
> >>>>>>>> Given that any implementation of this feature would have some
> >>>> performance
> >>>>>>>> impact under some workloads, and also that we don't know if anyone
> >>>> really
> >>>>>>>> depends on emit-on-update time semantics, it seems like we should
> >>>> propose
> >>>>>>>> to add an opt-out config. Can you update the KIP to mention the
> >>>> exact
> >>>>>>>> config key and value(s) you'd propose?
> >>>>>>>> 
> >>>>>>>> Just to move the discussion forward, maybe something like:
> >>>>>>>>    emit.on := change|update
> >>>>>>>> with the new default being "change"
> >>>>>>>> 
> >>>>>>>> Thanks for pointing out the timestamp issue in particular. I agree
> >>>> that if
> >>>>>>>> we discard the latter update as a no-op, then we also have to
> >>>> discard its
> >>>>>>>> timestamp (obviously, we don't forward the timestamp update, as
> >>>> that's
> >>>>>>>> the whole point, but we also can't update the timestamp in the
> >>>> store, as
> >>>>>>>> the store must remain consistent with what has been emitted).
> >>>>>>>> 
> >>>>>>>> I have to confess that I disagree with your implementation
> >>>> proposal, but
> >>>>>>>> it's also not necessary to discuss implementation in the KIP. Maybe
> >>>> it would
> >>>>>>>> be less controversial if you just drop that section for now, so
> >>>> that the KIP
> >>>>>>>> discussion can focus on the behavior change and config.
> >>>>>>>> 
> >>>>>>>> Just for reference, there is some research into this domain. For
> >>>> example,
> >>>>>>>> see the "Report" section (3.2.3) of the SECRET paper:
> >>>>>>>> 
> >>>> https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeople.csail.mit.edu%2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&amp;data=02%7C01%7CThomas.Becker%40tivo.com%7C63670904ae324e62575508d7a697c3ad%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637161043078978612&amp;sdata=yCdlYShUf2y3mqZQHA8ZGR83%2B99CZp%2B5r0HksqS%2B%2FPc%3D&amp;reserved=0
> >>>>>>>> 
> >>>>>>>> It might help to round out the proposal if you take a brief survey
> >>>> of the
> >>>>>>>> behaviors of other systems, along with pros and cons if any are
> >>>> reported.
> >>>>>>>> 
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>> 
> >>>>>>>> 
> >>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
> >>>>>>>>> Hi everybody!
> >>>>>>>>> 
> >>>>>>>>> I'd like to propose a change that we probably should've added for
> >>>> a long
> >>>>>>>>> time now.
> >>>>>>>>> 
> >>>>>>>>> The key benefit of this KIP would be reduced traffic in Kafka
> >>>> Streams since
> >>>>>>>>> a lot of no-op results would no longer be sent downstream.
> >>>>>>>>> Here is the KIP for reference.
> >>>>>>>>> 
> >>>>>>>>> 
> >>>> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit%2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&amp;data=02%7C01%7CThomas.Becker%40tivo.com%7C63670904ae324e62575508d7a697c3ad%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637161043078988604&amp;sdata=eVtuuyDX6aNsYcw8wOmM1HSinOq5ptPPUaTxXqgyA7g%3D&amp;reserved=0
> >>>>>>>>> 
> >>>>>>>>> Currently, I seek to formalize our approach for this KIP first
> >>>> before we
> >>>>>>>>> determine concrete API additions / configurations.
> >>>>>>>>> Some configs might warrant adding, whiles others are not necessary
> >>>> since
> >>>>>>>>> adding them would only increase complexity of Kafka Streams.
> >>>>>>>>> 
> >>>>>>>>> Cheers,
> >>>>>>>>> Richard
> >>>>>>>>> 
> >>>>>>> 
> >>>>> 
> >>>> 
> >>> 
> >> 
> > 
> > ________________________________
> > 
> > This email and any attachments may contain confidential and privileged 
> > material for the sole use of the intended recipient. Any review, copying, 
> > or distribution of this email (or any attachments) by others is prohibited. 
> > If you are not the intended recipient, please contact the sender 
> > immediately and permanently delete this email and any attachments. No 
> > employee or agent of TiVo is authorized to conclude any binding agreement 
> > on behalf of TiVo by email. Binding agreements with TiVo may only be made 
> > by a signed written agreement.
>

Reply via email to