Hi John,

Sounds good. It looks like we are close to wrapping things up. If there are no other revisions that need to be made (if there are, please comment in the thread), I will start the voting process this Thursday (Pacific Standard Time).
Cheers,
Richard

On Tue, Feb 25, 2020 at 11:59 AM John Roesler wrote:
> Hi Richard,
>
> Sorry for the slow reply. I actually think we should avoid checking equals() for now. Your reasoning is good, but the truth is that depending on the implementation of equals() is non-trivial, semantically, and (though I proposed it before), I'm not convinced it's worth the risk. Much better to start with exactly one kind of "idempotence detection".
>
> Even if someone does update their serdes, we know that the new serde would still be able to _de_serialize the old format, or the whole app would break. The situation is that the new result gets encoded in the new binary format, which means we don't detect an idempotent update for what it is. In this case, we'd write the new binary format to disk and the changelog, and forward it downstream. However, we only do this once. Now that the binary format for that record has been updated, we would correctly detect idempotence of any subsequent updates.
>
> Plus, we would still be able to filter out idempotent updates in repartition sinks, since for those, we use the new serde to serialize both the "old" and "new" result.
>
> It's certainly a good observation, but I think we can just make a note of it in "rejected alternatives" for now, and plan to refine it later, if it does pose a big performance problem.
>
> Thanks!
> -John
>
> On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
> Hi all,
>
> Updated the KIP.
>
> Just a question: do you think it would be a good idea if we check for both Object#equals() and binary equality? Because there might be some subtle changes in the serialization (for example, if the user decides to upgrade their serialization procedure to a new one), but the underlying values of the result might be the same (therefore equals() might return true).
>
> Do you think this would be plausible?
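John's argument about upgraded serdes can be traced with a toy example. The sketch below is hypothetical: the two byte formats, the `SerdeUpgradeDemo` class, and the map standing in for the state store are all invented for illustration. With only a binary-equality check, the first post-upgrade update of an unchanged value is not recognized (the stored bytes are still in the old format), so it is written and forwarded once; after that the stored bytes are in the new format and repeated identical updates are dropped.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the serde-upgrade case; both formats are invented. */
final class SerdeUpgradeDemo {
    // Old format (assumed): plain UTF-8 text.
    static byte[] serializeV1(String v) {
        return v.getBytes(StandardCharsets.UTF_8);
    }

    // New format (assumed): a version byte followed by UTF-8 text.
    static byte[] serializeV2(String v) {
        byte[] body = v.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[body.length + 1];
        out[0] = 2; // format version marker
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }

    final Map<String, byte[]> store = new HashMap<>(); // stand-in for a state store
    int writesAndForwards = 0;

    /** Binary-equality check only: drop the update if the stored bytes match exactly. */
    void update(String key, String value) {
        byte[] newBytes = serializeV2(value); // the upgraded app always writes the new format
        byte[] oldBytes = store.get(key);
        if (oldBytes != null && Arrays.equals(oldBytes, newBytes)) {
            return; // idempotent update detected: nothing written or forwarded
        }
        store.put(key, newBytes);
        writesAndForwards++;
    }

    public static void main(String[] args) {
        SerdeUpgradeDemo demo = new SerdeUpgradeDemo();
        demo.store.put("k", serializeV1("hello")); // written before the upgrade
        demo.update("k", "hello"); // same value, new format: not detected, rewritten once
        demo.update("k", "hello"); // now binary-identical: dropped
        demo.update("k", "hello"); // dropped again
        System.out.println(demo.writesAndForwards); // prints 1
    }
}
```

The same reasoning covers repartition sinks: per the thread, both the "old" and "new" results are serialized there with the current serde, so their bytes are directly comparable from the start.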
>
> Cheers,
> Richard
>
> On Fri, Feb 21, 2020 at 2:37 PM Richard Yu wrote:
> Hello,
>
> Just a quick update. I changed the name of the metric so that it is more in line with the usual Kafka naming conventions for metrics and sensors. Below is the updated description of the metric:
>
> dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)
>
> Description: This metric records the number of updates that have been dropped because they essentially re-perform an earlier operation.
>
> Note:
>
> - The rate option indicates the ratio of records dropped to the actual volume of records passing through the task.
> - The total option gives a raw count of the number of records dropped.
>
> I hope that this is more on point.
>
> Best,
> Richard
>
> On Fri, Feb 21, 2020 at 2:20 PM Richard Yu wrote:
> Hi all,
>
> Thanks for the clarification. I was a little confused about what was going on.
>
> So I guess that for the actual proposal, we have the following:
>
> 1. We check for binary equality, and perform no extra lookups.
> 2. Emphasize that this applies only to materialized tables.
> 3. We drop aggregation updates if the key, value, and timestamp are the same.
>
> That settles the behavior changes, so it looks like the metric is the only thing left. In this case, I think the metric would be named IdempotentUpdateMetric. This is mostly off the top of my head, so if you think we should change it, feel free to say so. The metric will report the number of dropped operations.
>
> It will probably be added as a Sensor, similar to the dropped-records sensor we already have.
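The two readings described for dropped-idempotent-updates can be sketched as plain counters. This is a hypothetical per-task helper, not the Kafka `Sensor` API; it computes "rate" as the dropped/processed ratio, exactly as the description above says. Kafka's built-in `-rate` metrics (such as the existing dropped-records rate) usually report an average per second instead, so that difference may be worth confirming in the KIP text.

```java
/**
 * Hypothetical per-task counter for "dropped-idempotent-updates".
 * Plain longs are used on the assumption that, like a stream task, it is
 * updated from a single thread.
 */
final class DroppedIdempotentUpdates {
    private long processed = 0;
    private long dropped = 0;

    /** Call once per record that reaches the idempotence check. */
    void recordProcessed(boolean wasDropped) {
        processed++;
        if (wasDropped) {
            dropped++;
        }
    }

    /** "total": raw count of dropped idempotent updates. */
    long total() {
        return dropped;
    }

    /** "rate" as described in the thread: dropped records / records seen. */
    double ratio() {
        return processed == 0 ? 0.0 : (double) dropped / processed;
    }

    public static void main(String[] args) {
        DroppedIdempotentUpdates metric = new DroppedIdempotentUpdates();
        // Four records pass through the task; three are idempotent and dropped.
        metric.recordProcessed(true);
        metric.recordProcessed(false);
        metric.recordProcessed(true);
        metric.recordProcessed(true);
        System.out.println(metric.total() + " " + metric.ratio()); // prints "3 0.75"
    }
}
```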
>
> If there isn't anything else, I will probably start the voting process next week!
>
> Cheers,
> Richard
>
> On Fri, Feb 21, 2020 at 11:23 AM John Roesler wrote:
> Hi Bruno,
>
> Thanks for the clarification. Indeed, I was thinking two things:
> 1. For the initial implementation, we can just avoid adding any extra lookups, and only do the comparison when we already happen to have the prior value.
> 2. I think, as a result of the timestamp semantics, we actually _do_ look up the prior value approximately all the time, so the idempotence check should be quite effective.
>
> I think that second point is the same thing you're referring to as potentially being unnecessary. It does mean that we fetch the whole value in a lot of cases where we really only need the timestamp, so it could certainly be optimized in the future. In that future, we would need to weigh that optimization against losing the idempotence check. But that's a problem for tomorrow :)
>
> I'm 100% on board with scrutinizing the performance as we implement this feature.
>
> Thanks again,
> -John
>
> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
> Hi John,
>
> I am glad to help you with your imagination. By overhead, I mainly meant the additional lookup into the state store to get the current value, but I see now in the code that we do that lookup anyway (although I think we could avoid it in the cases where we do not need the old value). With or without a config, we need to evaluate the performance benefits of this change in any case.
>
> Best,
> Bruno
>
> On Wed, Feb 19, 2020 at 7:48 PM John Roesler wrote:
> Thanks for your remarks, Bruno!
>
> I'm in favor of standardizing on terminology like "not forwarding idempotent updates" or "dropping idempotent updates". Maybe we should make a pass on the KIP and just convert everything to this phrasing. In retrospect, even the term "emit-on-change" has too much semantic baggage, since it implies the semantics from the SECRET paper, which we don't really want to imply here.
>
> I'm also in favor of the metric as you propose.
>
> Likewise with stream aggregations, I was also under the impression that we agreed on dropping idempotent updates to the aggregation result any time we find that our "new" (key, value, timestamp) result is identical to the prior one.
>
> Also, I'm +1 on all your recommendations for updating the KIP document for clarity.
>
> Regarding the opt-out config: perhaps I'm suffering from a failure of imagination, but I don't see how the current proposal could really have a measurable impact on latency. If all we do is make a single extra pass to compare two byte arrays for equality, only in the cases where we already have the byte arrays available, it seems unlikely to measurably affect the processing of non-idempotent updates.
> It seems guaranteed to _decrease_ the latency of processing idempotent updates, since we get to skip a store#put, at least one producer#send, and also all downstream processing, including all the disk and network operations associated with downstream operations.
>
> If we're pretty sure this change would only _help_, we shouldn't introduce the operational burden of an extra configuration. If we want to be more aggressive about dropping idempotent operations in the future, such as depending on equals() or adding a ChangeDetector interface, then we should consider adding a configuration as part of that future work. In fact, if we add a simple "opt-in/opt-out" switch right now, we might find that it's actually insufficient for whatever future feature we propose, and then we'd have a mess of deprecating the opt-out config and replacing it.
>
> What do you think?
> -John
>
> On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
> Hi all,
>
> Sorry for the late reply!
>
> I am also in favour of baby steps.
>
> I am undecided whether the KIP should contain an opt-out config or not. The overhead of emit-on-change might affect latency. For applications where low latency is crucial and there are not too many idempotent updates, it would be better to fall back to emit-on-update. However, we do not know how much emit-on-change impacts latency. We would first need to benchmark that before we can decide about the opt-out config.
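The check John describes is a single byte-array comparison done only when the prior serialized result is already in hand; a match lets the processor skip the store put, the forward, and everything downstream. A minimal sketch, assuming a `HashMap` stands in for the state store and a `List` for downstream forwarding (neither is a Kafka Streams API, and `IdempotentUpdateFilter` is a hypothetical name). It applies the key/value/timestamp rule from the thread's aggregation discussion to every update, which is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of dropping idempotent updates at a stateful operator. */
final class IdempotentUpdateFilter {
    /** Serialized value plus timestamp, as the store would hold them. */
    static final class Stored {
        final byte[] value;
        final long timestamp;

        Stored(byte[] value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    final Map<String, Stored> store = new HashMap<>();   // stand-in for a state store
    final List<String> forwarded = new ArrayList<>();    // stand-in for downstream
    long droppedIdempotentUpdates = 0;

    /** Returns true if the update was stored and forwarded, false if dropped. */
    boolean process(String key, byte[] newValue, long newTimestamp) {
        Stored prior = store.get(key); // the prior value is looked up anyway
        if (prior != null
                && prior.timestamp == newTimestamp
                && Arrays.equals(prior.value, newValue)) {
            // Same key, same bytes, same timestamp: skip the put, the forward,
            // and therefore all downstream work for this record.
            droppedIdempotentUpdates++;
            return false;
        }
        store.put(key, new Stored(newValue, newTimestamp));
        forwarded.add(key);
        return true;
    }
}
```

Processing the same serialized value at the same timestamp twice stores and forwards it once; under this rule, an update with identical bytes but a different timestamp is still forwarded.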
>
> A metric of dropped idempotent updates seems useful to me, to be informed about potential upstream applications or upstream operators that produce too many idempotent updates. The KIP should state the name of the metric, its group, its tags, and its recording level (see KIP-444 or KIP-471 for examples). I propose DEBUG as the recording level.
>
> Richard, which competing proposals for emit-on-change for aggregations do you mean? I have the feeling that we agreed to get rid of idempotent updates if the aggregate is updated with the same key, value, AND timestamp. I am also fine if we do not include this in this KIP (remember: baby steps).
>
> You write that "emit-on-change is more correct". Since we agreed that this is an optimization, IMO you cannot argue this way.
>
> Please put "Alternative Approaches" under "Rejected Alternatives", so that it becomes clear that we are not going to implement them. In general, I think the KIP needs a bit of clean-up (you have probably already planned for it). "Design Reasoning" is partly behavior changes and partly rejected alternatives, and duplicates some of the content in those sections.
>
> I do not like the name "no-op operations" or "no-ops", because they are rather generic. I prefer "idempotent updates".
>
> Best,
> Bruno
>
> On Tue, Feb 18, 2020 at 7:25 PM Richard Yu wrote:
> Hi all,
>
> We are definitely making progress!
>
> @John, should I emphasize in the proposed behavior changes that we are only doing binary equality checks for stateful operators? It looks like we have come close to finalizing this part of the KIP. (I will note in the KIP that this proposal is intended as an optimization, not for semantic correctness.)
>
> I think we may still have one other detail to discuss. So far, there has been quite a bit of back and forth about what the behavior of aggregations should look like under emit-on-change. I have seen multiple competing proposals, so I am not completely certain which one we should go with, or how we can find a compromise between them.
>
> Let me know your thoughts on this matter, since we are probably close to wrapping up most other things. @Matthias J. Sax and @Bruno, see what you think about this.
>
> Best,
> Richard
>
> On Tue, Feb 18, 2020 at 9:06 AM John Roesler wrote:
> Thanks, Matthias!
>
> Regarding numbers, it would be hard to know how many applications would benefit, since we don't know how many applications there are, or anything about their data sets or topologies. We could do a survey, but it seems like overkill if we take the conservative approach.
>
> I have my own practical stream processing experience, which tells me this is absolutely critical for any moderate-to-large relational stream processing use case. I'll leave it to you to decide if you find that convincing, but it's definitely not an _assumption_. I've also heard from a few Streams users who have already had to implement their own no-op-suppression transformers in order to get to production scale.
>
> Regardless, it sounds like we can agree on taking an opportunistic approach and limiting the optimization to a binary-equality check at stateful operators. (I'd also suggest doing it in sink nodes, when we are about to send old and new values, since they are also already present and serialized at that point.) We could make the KIP even more vague, and just say that we'll drop no-op updates "when possible".
>
> I'm curious what Bruno and the others think about this. If it seems like a good starting point, perhaps we could move to a vote soon and get to work on the implementation!
>
> Thanks,
> -John
>
> On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:
> Talking about optimizations and reducing downstream load:
>
> Do we actually have any numbers? I have the impression that this KIP is more or less built on the _assumption_ that there is a problem.
> Yes, there are some use cases that would benefit from this, but how many applications would actually benefit? And how much load reduction would they get?
>
> The simplest approach (following John's idea of taking baby steps) would be to apply the emit-on-change pattern only if there is a store. In this case we need to serialize the old and new results anyway, and thus a simple byte-array comparison adds no overhead.
>
> Sending `oldValues` by default would become expensive, because we would need to serialize the recomputed old result as well as the new result to make the comparison (and we know serialization is not cheap). We are facing a trade-off between CPU overhead and downstream load, and I am not sure if we should hard-code this. My original argument for sending `oldValues` was about semantics; but for an optimization, I am not sure it would be the right choice.
>
> For now, users who want to opt in can force a materialization. A materialization may be expensive, and if we see future demand, we could still add an option to send `oldValues` instead of materializing (this would at least save the store overhead). As we consider the KIP an optimization, a "config" seems to make sense.
>
> -Matthias
>
> On 2/17/20 5:21 PM, Richard Yu wrote:
> Hi John!
>
> Thanks for the reply.
>
> About the changes we have discussed so far: upon further consideration, I think we have mostly been talking about this from the perspective that no stop-gap effort is acceptable. However, in the recent discussion, if we treat this as an optimization, then the perspective I mentioned no longer applies. After all, we are no longer concerned so much about semantic correctness as about reducing traffic as much as possible without performance trade-offs.
>
> In this case, I think a cache would be a good idea for stateless operations. This cache would obviously not be backed by a store. We can probably use Kafka's ThreadCache. We should be able to catch a large portion of the no-ops if we at least store some results in the cache. Not all will be caught, but I think the impact will be significant.
>
> On another note, I think that we should implement competing proposals, i.e. one where we forward both old and new values with a reasonable proportion of artificial no-ops (we do not necessarily have to rely on equals so much as on comparing the serialized binary data after the operation), and, in another scenario, the cache for stateless ops.
> It would be unreasonable to completely disregard either approach, since they both have merit. The reason for implementing both is to run benchmark tests on them and compare them with the original. This way, we can see more clearly what the drawbacks and the gains are. So far, we have been discussing only hypotheticals, and if we continue to do so, I think it is likely that no ground will be gained.
>
> After all, what we seek is optimization, and performance benchmarks will be mandatory for a KIP of this nature.
>
> Hope this helps,
> Richard
>
> On Mon, Feb 17, 2020 at 2:12 PM John Roesler wrote:
> Hi again, all,
>
> Sorry for my silence.
>
> I've just taken another look over the recent history of this discussion. It seems like the #1 point to clarify (because it affects everything else) is that, yes, I was 100% envisioning this as an _optimization_.
>
> As a consequence, I don't think it's critical to make any hard guarantees about which results get forwarded and which (no-op updates) get dropped. I'd initially just been thinking about doing this opportunistically in cases where we already had the "old" and "new" results in memory, thanks to a request to "emit old values", or to the implementation of timestamp semantics.
>
> However, whether or not it's semantically critical, I do think that Matthias's idea to use the change-forwarding mechanism to check for no-ops even on stateless operations is pretty interesting. Specifically, this would _really_ let you pare down useless updates by using mapValues to strip records down to only what you really need. However, the dependence on the implementation of equals() is troubling.
>
> It might make sense to table this idea, as well as my complicated no-op detection algorithm, and initially propose just a nonconfigurable feature to check "old" and "new" results for binary equality before forwarding.
> I.e., if any operation determines that the old and new results are binary-identical, we would not forward.
>
> I'll admit that this doesn't serve Tommy's use case very well, but it might be better to take baby steps with an optimization like this and not risk over-reaching in a way that actually harms performance or correctness. We could always expand the feature to use equals() or some kind of ChangeDetector later on, in a more focused discussion.
>
> Regarding metrics or debug logs, I guess I don't feel strongly, but it feels like two things will happen that make it nicer to add them:
>
> 1. This feature is going to surprise/annoy _somebody_, and it would be nice to be able to say definitively that the reason updates are dropped is that they were no-ops. The easiest smoking gun is if there are debug logs that can be enabled. This person might just be looking at the dashboards, wondering why there are 100K updates per second going into their app, but only 1K results per second coming out. Having the metric there makes the accounting easier.
>
> 2. Somebody is going to struggle with high-volume updates, and it would be nice for them to know that this feature is saving them X-thousand updates per second, etc.
>
> What does everyone think about this? Note that, as I read it, what I've said above is already reflected in the text of the KIP.
>
> Thanks,
> -John
>
> On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:
> Hi all,
>
> Bumping this. If you feel that this KIP is not too urgent, then let me know. :)
>
> Cheers,
> Richard
>
> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu wrote:
> Hi all,
>
> I've had just a few thoughts regarding the forwarding of <key, change<old_value, new_value>>. As Matthias already mentioned, there are two separate priorities by which we can judge this KIP:
>
> 1. An optimization perspective: In this case, the user would prefer the impact of this KIP to be as minimal as possible.
> By such logic, if stateless operations are performed twice, that could prove unacceptable for them (since operations can be expensive).
>
> 2. A semantic correctness perspective: Unlike the optimization approach, we are more concerned with all KTable operations obeying the same emission policy, i.e. emit-on-change. In this case, a discrepancy would not be tolerated, even though an extra performance cost will be incurred. Therefore, we would follow Matthias's approach and perform the operation once on the old value and once on the new.
>
> I think the issue here is more black and white than in between. The second option in particular would be favorable for users with inexpensive stateless operations, while with the former option, we are probably dealing with more expensive ones. So the simplest solution is probably to allow the user to choose one of the behaviors, and have a config which can switch between them.
>
> It's the simplest compromise I can come up with at the moment, but if you think you have a better plan which could better balance the trade-offs, then please let us know. :)
>
> Best,
> Richard
>
> On Wed, Feb 5, 2020 at 5:12 PM John Roesler wrote:
> Hi all,
>
> Thanks for the thoughtful comments!
>
> I need more time to reflect on your thoughts, but just wanted to offer a quick clarification about equals().
>
> I only meant that we can't be sure if a class's equals() implementation returns true for two semantically identical instances. I.e., if a class doesn't override the default equals() implementation, then we would see behavior like:
>
>     new MyPair("A", 1).equals(new MyPair("A", 1)) returns false
>
> In that case, I would still like to catch no-op updates by comparing the serialized form of the records when we happen to have it serialized anyway (such as when the operation is stateful, or when we're sending to a repartition topic and we have both the "new" and "old" value from upstream).
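John's MyPair example, together with the belt-and-suspenders idea and Ted Yu's reflection trick that follow in the thread, can be tried in a few lines. This is a hypothetical sketch: `MyPair`, the pipe-delimited toy serializer, and the `NoOpCheck` helper are invented for illustration, not taken from the KIP. For brevity it serializes on demand, whereas the thread only proposes comparing bytes that happen to be available already.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical value type that does NOT override equals() or hashCode(). */
final class MyPair {
    final String left;
    final int right;

    MyPair(String left, int right) {
        this.left = left;
        this.right = right;
    }
}

final class NoOpCheck {
    /** Toy serializer (an assumption, not a real serde): "left|right" as UTF-8. */
    static byte[] serialize(MyPair p) {
        return (p.left + "|" + p.right).getBytes(StandardCharsets.UTF_8);
    }

    /** Belt-and-suspenders: a no-op if equals() OR the serialized bytes say so. */
    static boolean isNoOp(MyPair oldValue, MyPair newValue) {
        return oldValue.equals(newValue)
                || Arrays.equals(serialize(oldValue), serialize(newValue));
    }

    /** Reflection check in the spirit of Ted Yu's snippet: who declares equals(Object)? */
    static boolean hasCustomEquals(Object value) {
        try {
            Class<?> declaring = value.getClass()
                    .getMethod("equals", Object.class)
                    .getDeclaringClass();
            return !Object.class.equals(declaring);
        } catch (NoSuchMethodException e) {
            return false; // cannot happen: every class inherits Object.equals(Object)
        }
    }

    public static void main(String[] args) {
        MyPair a = new MyPair("A", 1);
        MyPair b = new MyPair("A", 1);
        System.out.println(a.equals(b));                               // false (identity)
        System.out.println(Arrays.equals(serialize(a), serialize(b))); // true
        System.out.println(isNoOp(a, b));                              // true
        System.out.println(hasCustomEquals(a));                        // false
        System.out.println(hasCustomEquals("A"));                      // true (String overrides)
    }
}
```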
>
> I didn't mean to suggest we'd try to use reflection to detect whether equals is implemented, although that is a neat trick. I was thinking more of a belt-and-suspenders algorithm where we do the check for no-ops based on equals() and then _also_ check the serialized bytes for equality.
>
> Thanks,
> -John
>
> On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
> Thanks for the comments, Matthias.
>
> W.r.t. the requirement of an `equals()` implementation, each template type would have an equals() method. We can use the following code to know whether it is inherited from java.lang.Object or provided by the user:
>
>     boolean customEquals = false;
>     try {
>         // getMethod() returns the most specific public equals(Object);
>         // its declaring class tells us who implemented it.
>         Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
>         if (!Object.class.equals(cls)) {
>             customEquals = true;
>         }
>     } catch (NoSuchMethodException nsme) {
>         // equals(Object) is always defined (at least by Object), so this is never hit
>     }
>
> The next question is: what if the user doesn't provide an equals() method? Would we automatically fall back to emit-on-update?
Cheers

On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <[email protected]> wrote:

First a high-level comment:

Overall, I would like to take one step back and make sure we are discussing this at the same level. Originally, I understood this KIP as a proposed change of _semantics_; however, given the latest discussion, it seems it's actually not -- it's more an _optimization_ proposal. Hence, we only need to make sure that this optimization does not break existing semantics. Is this the right way to think about it?

If yes, then it might actually be OK to have different behavior depending on whether there is a materialized KTable or not. So far, we never defined a public contract about our emit strategy, and it seems this KIP does not define one either.

Hence, I don't have as strong an opinion about sending oldValues, for example, any longer. I guess the question is really: what can we implement in a reasonable way?
Other comments:

@Richard:

Can you please add the KIP to the KIP overview table? It's missing (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).

@Bruno:

You mentioned caching. I think it's irrelevant (orthogonal) and we can discuss this KIP without considering it.

@John:

> Even in the source table, we forward the updated record with the higher of the two timestamps. So the example is more like:

That is not correct. Currently, we forward with the smaller out-of-order timestamp (changing the timestamp would corrupt the data -- we don't know, because we don't check, whether the value is the same or a different one; hence, we must emit the out-of-order record as-is).
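The timestamp rule discussed here can be written as a small decision function. This is a hedged sketch, assuming a materialized table whose store keeps the serialized value and timestamp of the current row; TableUpdate and Decision are hypothetical names, not Kafka Streams APIs. It forwards any real change, still forwards an identical value that arrives out of order with a smaller timestamp (so the earlier start of validity reaches downstream operators such as a stream-table join), and drops only identical values with an equal or later timestamp.

```java
import java.util.Arrays;

final class TableUpdate {
    enum Decision { EMIT, DROP }

    // oldValue is null when the key has no current row in the store.
    static Decision decide(byte[] oldValue, long oldTimestamp,
                           byte[] newValue, long newTimestamp) {
        if (oldValue == null || !Arrays.equals(oldValue, newValue)) {
            return Decision.EMIT; // first write or a real value change
        }
        if (newTimestamp < oldTimestamp) {
            return Decision.EMIT; // same value, out of order: validity starts earlier
        }
        return Decision.DROP;     // idempotent update with an equal or later timestamp
    }
}
```

In the out-of-order case, the store would also need to record the smaller timestamp so that later comparisons use it.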
If we start to do emit-on-change, we also need to emit a new record if the timestamp changes due to out-of-order data; hence, we would still need to emit <K,V,T1> because that gives us correct semantics: assume you have a filter() and afterward use the filtered KTable in a stream-table join -- the lower T1 timestamp must be propagated to the filtered KTable to ensure that the stream-table join computes the correct result.

Your point about requiring an `equals()` implementation is actually quite an interesting one and boils down to my statement from above about "what can we actually implement". What I don't understand is:

> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it.

Your bullet point (2) says it uses `equals()` -- hence, it seems we actually do rely on it? Also, how can we detect whether there is an `equals()` method to do the comparison? Would we fail if we have neither `equals()` nor corresponding serializers to do the comparison?

> Wow, really good catch!
> Yes, we absolutely need metrics and logs if we're going to drop any records. And, yes, we should propose metrics and logs that are similar to the existing ones when we drop records for other reasons.

I am not sure about this point. In fact, we already have some no-ops in Kafka Streams in our join operators and don't report any of those either. Emit-on-change is operator semantics, and I don't see why we would need a metric for it. It seems to be quite different from dropping late or malformed records.

-Matthias

On 2/4/20 7:13 AM, Thomas Becker wrote:

Thanks John for your thoughtful reply. Some comments inline.

On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
> Hi Tommy,
>
> Thanks for the context. I can see the attraction of considering these use cases together.
> To answer your question, if a part of the record is not relevant to downstream consumers, I was thinking you could just use a mapValues to remove it.
>
> E.g., suppose you wanted to do a join between two tables.
>
>     employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
>
> We only care about one attribute from the Info table (name), and one from the Payroll table (salary), and these attributes change rarely. On the other hand, there might be many other attributes of these tables that change frequently. We can avoid triggering the join unnecessarily by mapping the input tables to drop the unnecessary information before the join:
>
>     names = employeeInfo.mapValues(info -> info.name())
>     salaries = employeePayroll.mapValues(payroll -> payroll.salary())
>
>     names.join(salaries, (name, salary) -> new Result(name, salary))

Ahh yes, I see. This works, but in the case where you're using schemas as we are (e.g., Avro), it seems like this approach could lead to a proliferation of "skinny" record types that just drop various fields.

> Especially if we take Matthias's idea to drop non-changes even for stateless operations, this would be quite efficient and is also a very straightforward optimization to understand once you know that Streams provides emit-on-change.
>
> From the context that you provided, it seems like a slightly different situation, though. Reading between the lines a little, it sounds like: in contrast to the example above, in which we are filtering out extra _data_, you have some extra _metadata_ that you still wish to pass down with the data when there is a "real" update, but you don't want the metadata itself to cause an update.

Despite my lack of clarity, yes, you've got it right ;) This particular processor is the first stop for this data after coming in from external users, who often simply post the same content each time, and we're trying to shield downstream consumers from unnecessary churn.
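John's mapValues suggestion works because the no-op check only sees what survives the projection. The plain-Java sketch below illustrates that effect without any Kafka Streams types; EmployeeInfo, its lastLogin field, and ProjectionNoOp are hypothetical. Comparing whole records treats a lastLogin change as an update, while comparing only the projected name treats it as a no-op.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical input record: "name" feeds the join, "lastLogin" changes often
// but is irrelevant to the join result.
final class EmployeeInfo {
    final String name;
    final long lastLogin;

    EmployeeInfo(String name, long lastLogin) {
        this.name = name;
        this.lastLogin = lastLogin;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EmployeeInfo)) {
            return false;
        }
        EmployeeInfo other = (EmployeeInfo) o;
        return name.equals(other.name) && lastLogin == other.lastLogin;
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, lastLogin);
    }
}

final class ProjectionNoOp {
    // Compares old and new values after a projection (standing in for mapValues).
    static <V, P> boolean isNoOp(V oldValue, V newValue, Function<V, P> projection) {
        return Objects.equals(projection.apply(oldValue), projection.apply(newValue));
    }
}
```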
> It does seem handy to be able to plug in a custom ChangeDetector for this purpose, but I worry about the API complexity. Maybe you can help think through how to provide the same benefit while limiting user-facing complexity.
>
> Here's some extra context to consider:
>
> We currently don't make any extra requirements about the nature of data that you can use in Streams. For example, you don't have to implement hashCode and equals, or compareTo, etc. With the current proposal, we can do an airtight comparison based only on the serialized form of the values, and we actually don't have to deserialize the "prior" value at all for a large number of operations. Admittedly, if we extend the proposal to include no-op detection for stateless operations, we'd probably need to rely on equals() for no-op checking; otherwise we'd wind up requiring serdes for stateless operations as well. Actually, I'd probably argue for doing exactly that:
>
> 1. In stateful operations, drop if the serialized byte[]s are the same. After deserializing, also drop if the objects are equal according to Object#equals().
>
> 2. In stateless operations, compare the "new" and "old" values (if "old" is available) based on Object#equals().
>
> 3. As a final optimization, after serializing and before sending repartition records, compare the serialized data and drop no-ops.
>
> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it. Also, we don't require a serde in any new situations, but we can still leverage it when it is available.
>
> For clarity, in my example above, even if the employeeInfo and employeePayroll and Result records all have serdes, we need the "name" field (presumably String) and the "salary" field (presumably a Double) to have serdes as well in the naive implementation. But if we can leverage equals(), then the "right thing" happens automatically.
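The three rules in John's list can be sketched as three small predicates. This is an illustration under stated assumptions, not the Kafka Streams implementation: ThreeStepNoOpCheck is a hypothetical name, and the prior value is passed as a Supplier so that the equals() fallback only pays for deserialization when the bytes differ. Later in the thread, John suggests starting with byte comparison alone, so the equals() branches reflect this intermediate proposal only.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Supplier;

final class ThreeStepNoOpCheck {
    // (1) Stateful operation: the prior value's bytes come from the store.
    // Compare bytes first; only deserialize the prior value if they differ.
    static <V> boolean dropInStatefulOperation(byte[] oldBytes, byte[] newBytes,
                                               Supplier<V> deserializeOld, V newValue) {
        if (oldBytes == null) {
            return false; // no prior value, so this is not a no-op
        }
        if (Arrays.equals(oldBytes, newBytes)) {
            return true;
        }
        return Objects.equals(deserializeOld.get(), newValue);
    }

    // (2) Stateless operation: only equals() is available, and only if the
    // upstream operator forwarded an "old" value (null means unavailable here).
    static <V> boolean dropInStatelessOperation(V oldValue, V newValue) {
        return oldValue != null && oldValue.equals(newValue);
    }

    // (3) Repartition sink: "old" and "new" are serialized with the same serde
    // right before sending, so a byte comparison is enough.
    static boolean dropBeforeRepartition(byte[] oldBytes, byte[] newBytes) {
        return oldBytes != null && Arrays.equals(oldBytes, newBytes);
    }
}
```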
I still don't totally follow why the individual components (name, salary) would have to have serdes here. If Result has one, we compare bytes, and if Result additionally has an equals() method (which presumably includes equals comparisons on the constituent fields), have we not covered our bases?

> This dovetails with my primary UX concern: where would the ChangeDetector actually be registered? None of the operators in my example have names or topics or any other identifiable characteristic that could be passed to a ChangeDetector class registered via config. You could say that we make ChangeDetector an optional parameter to every operation in Streams, but this seems to carry quite a bit of mental burden with it. People will wonder what it's for and whether or not they should be using it. There would almost certainly be a misconception that it's preferable to implement it always, which would be unfortunate. Plus, to actually implement metadata flowing through the topology as in your use case, you'd have to do two things:
>
> 1. Make sure that all operations actually preserve the metadata alongside the data (e.g., don't accidentally add a mapValues like I did, or you drop the metadata).
> 2. Implement a ChangeDetector for every single operation in the topology, or you don't get the benefit of dropping non-changes internally.
> 2b. Alternatively, you could just add the ChangeDetector to one operation toward the end of the topology. This would not drop redundant computation internally, but only drop redundant _outputs_. But this is just about the same as your current solution.

I definitely see your point regarding configuration. I was originally thinking about this when the deduplication was going to be opt-in, and it seemed very natural to say something like:

    employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
        .suppress(duplicatesAccordingTo(someChangeDetector))

Alternatively, you can imagine a similar method being on Materialized, though obviously this makes less sense if we don't want to require materialization.
If we're now talking about changing the default behavior and not having any configuration options, it's harder to find a place for this.

> A final thought: if it really is a metadata question, can we just plan to finish up the support for headers in Streams? I.e., give you a way to control the way that headers flow through the topology? Then, we could treat headers the same way we treat timestamps in the no-op checking... We completely ignore them for the sake of comparison. Thus, neither the timestamp nor the headers would get updated in internal state or in downstream views as long as the value itself doesn't change. This seems to give us a way to support your use case without adding to the mental overhead of using Streams for simple things.

Agreed, headers could be a decent fit for this particular case because it's mostly metadata, though to be honest we haven't looked at headers much (mostly because, and to your point, support seems to be lacking).
I feel like there would be other cases where this feature could be valuable, but I admit I can't come up with anything right this second. Perhaps yuzhihong had an example in mind?

> I.e., simple things should be easy, and complex things should be possible.
>
> What are your thoughts?
>
> Thanks,
> -John

On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:

Hi John,

Can you describe how you'd use filtering/mapping to deduplicate records? To give some background on my suggestion, we currently have a small stream processor that exists solely to deduplicate, which we do using a process that I assume would be similar to what would be done here (with a store of keys and hash values). But the records we are deduplicating have some metadata fields (such as timestamps of when the record was posted) that we don't consider semantically meaningful for downstream consumers, and therefore we also suppress updates that only touch those fields.
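Tommy's deduplicating processor, a store of keys and hashes that ignores metadata fields, might look roughly like this. It is a sketch under assumptions: Post, its postedAt field, and HashDeduplicator are hypothetical, and the semanticBytes function plays the role of the pluggable ChangeDetector proposed in the thread by deciding which fields count as a change. A SHA-256 digest makes a false "no change" from a collision astronomically unlikely, though a byte-for-byte comparison of the stored values would be the airtight option.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical record: "content" is meaningful downstream, "postedAt" is metadata.
final class Post {
    final String content;
    final long postedAt;

    Post(String content, long postedAt) {
        this.content = content;
        this.postedAt = postedAt;
    }
}

// Key -> digest store; semanticBytes selects the fields that count as a change.
final class HashDeduplicator<K, V> {
    private final Map<K, byte[]> digestsByKey = new HashMap<>();
    private final Function<V, byte[]> semanticBytes;

    HashDeduplicator(Function<V, byte[]> semanticBytes) {
        this.semanticBytes = semanticBytes;
    }

    private static byte[] sha256(byte[] input) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(input);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the record should be forwarded, false if it is a duplicate.
    boolean shouldForward(K key, V value) {
        byte[] digest = sha256(semanticBytes.apply(value));
        byte[] previous = digestsByKey.put(key, digest);
        return previous == null || !Arrays.equals(previous, digest);
    }
}
```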
-Tommy

On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:

Hi Thomas and yuzhihong,

That’s an interesting idea. Can you help think of a use case that isn’t also served by filtering or mapping beforehand?

Thanks for helping to design this feature!
-John

On Fri, Jan 31, 2020, at 18:56, [email protected] wrote:

I think this is a good idea.

On Jan 31, 2020, at 4:49 PM, Thomas Becker <[email protected]> wrote:

How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable? Meaning: use something like a hash by default, but you could optionally provide an implementation of something to use instead, like a ChangeDetector.
This could be useful, for example, to ignore changes to certain fields, which may not be relevant to the operation being performed.

From: John Roesler <[email protected]>
Sent: Friday, January 31, 2020 4:51 PM
To: [email protected]
Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams

Hello all,

Sorry for my silence. It seems like we are getting close to consensus. Hopefully, we can move to a vote soon!

All of the reasoning from Matthias and Bruno around timestamps is compelling. I would be strongly in favor of stating a few things very clearly in the KIP:

1. Streams will drop no-op updates only for KTable operations. That is, we won't make any changes to KStream aggregations at the moment. It does seem like we can potentially revisit the time semantics of that operation in the future, but we don't need to do it now. On the other hand, the proposed semantics for KTable timestamps (marking the beginning of the validity of that record) make sense to me.

2. Streams will only drop no-op updates for _stateful_ KTable operations. We don't want to add a hard guarantee that Streams will _never_ emit a no-op table update, because it would require adding state to otherwise stateless operations. If someone is really concerned about a particular stateless operation producing a lot of no-op results, all they have to do is materialize it, and Streams would automatically drop the no-ops.

Additionally, I'm +1 on not adding an opt-out at this time.

Regarding the KIP itself, I would clean it up a bit before calling for a vote. There is a lot of "discussion"-type language there, which is very natural to read, but makes it a bit hard to see what _exactly_ the KIP is proposing.
Richard, would you mind just making the "proposed behavior change" a simple and succinct list of bullet points? I.e., please drop glue phrases like "there has been some discussion" or "possibly we could do X". For the final version of the KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to add an elaboration section to explain more about what X and Y mean, but we don't need to talk about possibilities or alternatives except in the "rejected alternatives" section. Accordingly, can you also move the options you presented in the intro to the "rejected alternatives" section and only mention the final proposal itself? This just really helps reviewers to know what they are voting for, and it helps everyone after the fact when they are trying to get clarity on what exactly the proposal is, versus all the things it could have been.

Thanks,
-John

On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:

Hello to all,

I've finished making some initial modifications to the KIP. I have decided to keep the implementation section in the KIP for record-keeping purposes.
For now, we should focus on only the proposed behavior changes instead. See if you have any comments!

Cheers,
Richard

On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <[email protected]> wrote:

Hi all,

Thanks for all the discussion!

@John and @Bruno I will survey other possible systems and see what I can do. Just a question: by systems, I suppose you mean the pros and cons of different reporting strategies? I'm not completely certain on this point, so it would be great if you could clarify that.

So here's what I got from all the discussion so far:

- Since both Matthias and John seem to have come to a consensus on this, we will go for an all-round behavioral change for KTables. After some thought, I decided that for now, an opt-out config will not be added. As John has pointed out, no-op changes tend to explode further down the topology as they are forwarded to more and more processor nodes downstream.
- About using hash codes: after some explanation from John, it looks like hash codes might not be ideal (for implementation). For now, we will omit that detail and save it for the PR.
- @Bruno, you do have valid concerns. Though, I am not completely certain if we want to do emit-on-change only for materialized KTables. I will put it down in the KIP regardless.

I will do my best to address all points raised so far in the discussion. Hope we can keep this going! Best, Richard

On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <[email protected]> wrote: Thank you Matthias for the use cases! Looking at both use cases, I think you need to elaborate on them in the KIP, Richard. Emit from plain KTable: I agree with Matthias that the lower timestamp makes sense because it marks the start of the validity of the record. Idempotent records with a higher timestamp can be safely ignored. A corner case that I discussed with Matthias offline is when we do not materialize a KTable due to optimization.
Then we cannot avoid the idempotent records, because we do not keep the first record with the lower timestamp to compare to. Emit from KTable with aggregations: If we specify that an aggregation result should have the highest timestamp of the records that participated in the aggregation, we cannot ignore any idempotent records. Admittedly, the result of an aggregation usually changes, but there are aggregations where the result may not change, like min and max, or sum when the incoming records have a value of zero. In those cases, we could benefit from emit-on-change, but only if we define the semantics of the aggregations to not use the highest timestamp of the participating records for the result. In Kafka Streams, we do not have min, max, and sum as explicit aggregations, but we would need to provide an API to define what timestamp should be used for the result of an aggregation if we want to go down this path. None of this blocks this KIP; I just wanted to put these aspects up for discussion. The KIP can limit itself to emit from materialized KTables.
However, the limits should be explicitly stated in the KIP. Best, Bruno

On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <[email protected]> wrote: IMHO, the question about semantics depends on the use case, in particular on the origin of a KTable. If there is a changelog topic that one reads directly into a KTable, emit-on-change does actually make sense, because the timestamp indicates _when_ the update was _effective_. For this case, it is semantically sound to _not_ update the timestamp in the store, because the second update is actually idempotent and advancing the timestamp is not ideal (one could even consider it wrong to advance the timestamp), because the "valid time" of the record pair did not change. This reasoning also applies to KTable-KTable joins. However, if the KTable is the result of an aggregation, I think emit-on-update is more natural, because the timestamp reflects the _last_ time (i.e., highest timestamp) of all input records that contributed to the result.
Hence, updating the timestamp and emitting a new record actually sounds correct to me. This applies to windowed and non-windowed aggregations IMHO. However, considering the argument that the timestamp should not be updated in the store in the first case to begin with, both cases are actually the same, and both can be modeled as emit-on-change: if a `table()` operator does not update the timestamp when the value does not change, there is _no_ change and thus nothing is emitted. At the same time, if an aggregation operator does update the timestamp (even if the value does not change), there _is_ a change and we emit. Note that handling out-of-order data for aggregations would also work seamlessly with this approach: for out-of-order records, the timestamp never changes, and thus we only emit if the result itself changes. Therefore, I would argue that we might not even need any config, because the emit-on-change behavior is just correct and reduces the downstream load, while our current behavior is not ideal (even if it's also correct). Thoughts?
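The two timestamp rules Matthias describes can be made concrete with a small sketch. It is hypothetical and is not Kafka Streams code: the class and method names are invented, the state store is a plain `HashMap`, and "unchanged" is assumed to mean byte-for-byte equality of the serialized values. A source table drops an idempotent update and keeps the older timestamp; an aggregation result takes the maximum input timestamp and is forwarded whenever its value or its timestamp changes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the emit-on-change rules discussed in the thread.
 * Not Kafka Streams code: all names are invented for illustration, and
 * "unchanged" is assumed to mean byte-for-byte equal serialized values.
 */
final class EmitOnChangeSketch {

    /** A stored value plus the timestamp at which it became valid. */
    static final class Stored {
        final byte[] value;
        final long timestamp;

        Stored(byte[] value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Source table (as read by `table()`): an idempotent update changes nothing. */
    static final class SourceTable {
        private final Map<String, Stored> store = new HashMap<>();

        /** Returns true if the update should be forwarded downstream. */
        boolean put(String key, byte[] value, long timestamp) {
            Stored old = store.get(key);
            if (old != null && Arrays.equals(old.value, value)) {
                // Same value: keep the older timestamp (the start of validity)
                // in the store and emit nothing.
                return false;
            }
            store.put(key, new Stored(value, timestamp));
            return true;
        }

        long timestampOf(String key) {
            return store.get(key).timestamp;
        }
    }

    /** Aggregation result: its timestamp is the highest input timestamp seen. */
    static final class AggregateResult {
        private Stored current;

        /** Returns true if the new result should be forwarded downstream. */
        boolean update(byte[] newResult, long inputTimestamp) {
            if (current == null) {
                current = new Stored(newResult, inputTimestamp);
                return true;
            }
            long newTimestamp = Math.max(current.timestamp, inputTimestamp);
            boolean changed = !Arrays.equals(current.value, newResult)
                    || newTimestamp != current.timestamp;
            if (changed) {
                current = new Stored(newResult, newTimestamp);
            }
            // An out-of-order (or same-timestamp) input leaves the timestamp
            // unchanged, so it is emitted only if the result value changed.
            return changed;
        }
    }
}
```

For example, re-putting the same bytes for a key is suppressed and the stored timestamp stays at the first update's, while an aggregation that produces the same result from a newer input still emits, because its timestamp advanced.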
-Matthias

On 1/24/20 9:37 AM, John Roesler wrote: Hi Bruno, Thanks for that idea. I hadn't considered that option before, and it does seem like that would be the right place to put it if we think it might be semantically important to control on a table-by-table basis. I had been thinking of it less semantically and more practically. In the context of a large topology, or more generally, a large software system that contains many topologies and other event-driven systems, each no-op result becomes an input that is destined to itself become a no-op result, and so on, all the way through the system. Thus, a single pointless processing result becomes amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations.
If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but to the size of the system times the average fan-out (to the power of the number of fan-out operations on the path(s) through the system). In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole. This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind. It seems like Streams should "do the right thing" for the majority use case. My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do it as Stream operations (where this feature is inapplicable anyway)?
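John's amplification argument can be written as a rough estimate. The symbols are shorthand introduced here, not taken from the thread: N is the number of downstream operators a no-op update passes through, \bar{f} is the average fan-out, and k is the number of fan-out operations on the path. Read literally, the wasted work caused by one pointless update grows as:

```latex
W \approx N \cdot \bar{f}^{\,k}
\qquad\text{e.g. } N = 10,\ \bar{f} = 3,\ k = 2
\;\Rightarrow\; W \approx 10 \cdot 3^{2} = 90
```

So under these assumed numbers, a single no-op input costs on the order of 90 wasted computations rather than 10, which is why filtering at the first operator pays off.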
In keeping with the "practical" perspective, I suggested the opt-out config only in the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that (because of the timestamp semantics work already underway) we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change. However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons? Thanks, -John

On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote: Hi Richard, Thank you for the KIP. I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later. I am also +1 for the survey. I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)?
What you basically propose is to suppress updates if they do not change the result. Considering emit-on-change as a flavour of suppress would be more flexible, because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me. Best, Bruno

On Fri, Jan 24, 2020 at 7:49 AM John Roesler <[email protected]> wrote: Hi Richard, Thanks for picking this up! I know of at least one large community member for whom this feature is absolutely essential. If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not. Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config.
Can you update the KIP to mention the exact config key and value(s) you'd propose? Just to move the discussion forward, maybe something like: `emit.on := change|update`, with the new default being "change". Thanks for pointing out the timestamp issue in particular. I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted). I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just drop that section for now, so that the KIP discussion can focus on the behavior change and config. Just for reference, there is some research into this domain.
For example, see the "Report" section (3.2.3) of the SECRET paper: http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf

It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported. Thanks, -John

On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote: Hi everybody! I'd like to propose a change that we probably should've added a long time ago. The key benefit of this KIP would be reduced traffic in Kafka Streams, since a lot of no-op results would no longer be sent downstream. Here is the KIP for reference.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams

Currently, I seek to formalize our approach for this KIP first, before we determine concrete API additions / configurations. Some configs might warrant adding, while others are not necessary, since adding them would only increase the complexity of Kafka Streams. Cheers, Richard

________________________________ This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient.
Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo is authorized to conclude any binding agreement on behalf of TiVo by email. Binding agreements with TiVo may only be made by a signed written agreement.

--
Tommy Becker
Principal Engineer
Personalized Content Discovery
O +1 919.460.4747
tivo.com <http://www.tivo.com/>
