Hi all,

I might've made a minor mistake. The processor node level is level 3, not
level 1. I will correct the KIP accordingly.

After looking over things, I decided to start the voting thread this
afternoon.

Cheers,
Richard

On Thu, Feb 27, 2020 at 12:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi Bruno, Hi John,
>
> Thanks for your comments! I updated the KIP accordingly, and it looks like
> for quite a few points, I was doing some beating around the bush which
> could've been avoided.
>
> Looks like we can reduce the metric to Level 1 (per processor node) then.
>
> I've cleaned up most of the unnecessary info, and we should be fairly
> close.
> I will start working on a PR soon for this KIP. (although we might split
> that up into stages)
>
> Cheers,
> Richard
>
> On Thu, Feb 27, 2020 at 6:06 AM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi John,
>>
>> I agree with you. It is better to measure the metric on processor node
>> level. The users can do the rollup to task-level by themselves.
>>
>> Best,
>> Bruno
>>
>> On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org> wrote:
>> >
>> > Hi Richard,
>> >
>> > I've been making a final pass over the KIP.
>> >
>> > Re: Proposed Behavior Change:
>> >
>> > I think this point is controversial and probably doesn't need to be
>> > there at all:
>> > > 2.b. In certain situations where there is a high volume of idempotent
>> > > updates throughout the Streams DAG, it will be recommended practice
>> > > to materialize all operations to reduce traffic overall across the
>> > > entire network of nodes.
>> >
>> > Re-reading all the points, it seems like we can sum them up in a way
>> > that's a little more straight to the point, and gives us the right
>> > amount of flexibility:
>> >
>> > > Proposed Behavior Changes
>> > >
>> > > Definition: "idempotent update" is one in which the new result and
>> > > prior result, when serialized, are identical byte arrays
>> > >
>> > > Note: an "update" is a concept that only applies to Table operations,
>> > > so the concept of an "idempotent update" also only applies to Table
>> > > operations. See
>> > > https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
>> > > for more information.
>> > >
>> > > Given that definition, we propose for Streams to drop idempotent
>> > > updates in any situation where it's possible and convenient to do so.
>> > > For example, any time we already have both the prior and new results
>> > > serialized, we may compare them, and drop the update if it is
>> > > idempotent.
>> > >
>> > > Note that under this proposal, we can implement idempotence checking
>> > > in the following situations:
>> > > 1. Any aggregation (for example, KGroupedStream, KGroupedTable,
>> > >    TimeWindowedKStream, and SessionWindowedKStream operations)
>> > > 2. Any Materialized KTable operation
>> > > 3. Repartition operations, when we need to send both prior and new
>> > >    results
>> >
>> > Notice that in my proposed wording, we neither limit ourselves to just
>> > the situations enumerated, nor promise to implement the optimization in
>> > every possible situation. IMHO, this is the best way to propose such a
>> > feature. That way, we have the flexibility to implement it in stages,
>> > and also to add on to the implementation in the future.
>> >
>> >
>> > Re: Metrics
>> >
>> > I agree with Bruno, although, I think it might just be a confusing
>> > statement.
>> > It might be clearer to drop all the "discussion", and just say: "We
>> > will add a metric to count the number of idempotent updates that we
>> > have dropped".
>> >
>> > Also, with respect to the metric, I'm wondering if the metric should be
>> > task-level or processor-node-level. Since the interesting action takes
>> > place inside individual processor nodes, I _think_ it would be higher
>> > leverage to just measure it at the node level. WDYT?
>> >
>> > Re: Design Reasoning
>> >
>> > This section seems to be a little bit outdated. I also just noticed a
>> > "surprise" configuration "timestamp.aggregation.selection.policy" hidden
>> > in point 1.a. Is that part of the proposal? We haven't discussed it, and
>> > I think we were talking about this KIP being "configuration free".
>> >
>> > There is also some discussion of discarded alternatives in the Design
>> > Reasoning section, which is confusing. Finally, there was a point there
>> > I didn't understand at all, about stateless operators not being intended
>> > to load prior results. This statement doesn't seem to be true, but it
>> > also doesn't seem to be relevant, so maybe we can just drop it.
>> >
>> > Overall, it might help if you make a pass on this section, and just
>> > discuss as briefly as possible the justification for the proposed
>> > behavior change, and not adding a configuration. Try to avoid talking
>> > about things that we are not proposing, since that will just lead to
>> > confusion.
>> >
>> > Similarly, I'd just completely remove the "Implementation [discarded]"
>> > section. It was good to have this as part of the discussion initially,
>> > but as we move toward a vote, it's better to just streamline the KIP
>> > document as much as possible. Keeping a "discarded" section in the
>> > document will just make it harder for new people to understand the
>> > proposal.
>> > We did the same thing with KIP-441, where there were two prior drafts
>> > included at the end of the document, and we just deleted them for
>> > clarity.
>> >
>> > I liked the "Compatibility" and "Rejected Alternatives" sections. Very
>> > clear and to the point.
>> >
>> > Thanks again for the contribution! I think once the KIP document is
>> > cleaned up, we'll be in good shape to finalize the discussion.
>> > -John
>> >
>> >
>> > On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:
>> > > Hi Richard,
>> > >
>> > > 1. Could you change "idempotent update operations will only be dropped
>> > > from KTables, not from other classes." -> idempotent update operations
>> > > will only be dropped from materialized KTables? For non-materialized
>> > > KTables -- as they can occur after optimization of the topology -- we
>> > > cannot drop idempotent updates.
>> > >
>> > > 2. I cannot completely follow the metrics section. Do you want to
>> > > record all idempotent updates or only the dropped ones? In particular,
>> > > I do not understand the following sentences:
>> > > "For that matter, even if we don't drop idempotent updates, we should
>> > > at the very least record the number of idempotent updates that has
>> > > been seen go through a particular processor."
>> > > "Therefore, we should add some metrics which will count the number of
>> > > idempotent updates that each node has seen."
>> > > I do not see how we can record idempotent updates that we do not drop.
>> > > If we see them, we should drop them. If we do not see them, we cannot
>> > > drop them and we cannot record them.
>> > >
>> > > Best,
>> > > Bruno
>> > >
>> > > On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > >
>> > > > Hi John,
>> > > >
>> > > > Sounds good. It looks like we are close to wrapping things up. If
>> > > > there aren't any other revisions that need to be made.
>> > > > (If so, please comment in the thread)
>> > > > I will start the voting process this Thursday (Pacific Standard
>> > > > Time).
>> > > >
>> > > > Cheers,
>> > > > Richard
>> > > >
>> > > > On Tue, Feb 25, 2020 at 11:59 AM John Roesler <vvcep...@apache.org> wrote:
>> > > >
>> > > > > Hi Richard,
>> > > > >
>> > > > > Sorry for the slow reply. I actually think we should avoid
>> > > > > checking equals() for now. Your reasoning is good, but the truth
>> > > > > is that depending on the implementation of equals() is
>> > > > > non-trivial, semantically, and (though I proposed it before), I'm
>> > > > > not convinced it's worth the risk. Much better to start with
>> > > > > exactly one kind of "idempotence detection".
>> > > > >
>> > > > > Even if someone does update their serdes, we know that the new
>> > > > > serde would still be able to _de_serialize the old format, or the
>> > > > > whole app would break. The situation is that the new result gets
>> > > > > encoded in the new binary format, which means we don't detect an
>> > > > > idempotent update for what it is. In this case, we'd write the new
>> > > > > binary format to disk and the changelog, and forward it
>> > > > > downstream. However, we only do this once. Now that the binary
>> > > > > format for that record has been updated, we would correctly detect
>> > > > > idempotence of any subsequent updates.
>> > > > >
>> > > > > Plus, we would still be able to filter out idempotent updates in
>> > > > > repartition sinks, since for those, we use the new serde to
>> > > > > serialize both the "old" and "new" result.
>> > > > >
>> > > > > It's certainly a good observation, but I think we can just make a
>> > > > > note of it in "rejected alternatives" for now, and plan to refine
>> > > > > it later, if it does pose a big performance problem.
>> > > > >
>> > > > > Thanks!
>> > > > > -John
>> > > > >
>> > > > > On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
>> > > > > > Hi all,
>> > > > > >
>> > > > > > Updated the KIP.
>> > > > > >
>> > > > > > Just a question: do you think it would be a good idea if we
>> > > > > > check for both Object#equals() and binary equality?
>> > > > > > Because there might be some subtle changes in the serialization
>> > > > > > (for example, if the user decides to upgrade their serialization
>> > > > > > procedure to a new one), but the underlying values of the result
>> > > > > > might be the same. (therefore equals() might return true)
>> > > > > >
>> > > > > > Do you think this would be plausible?
>> > > > > >
>> > > > > > Cheers,
>> > > > > > Richard
>> > > > > >
>> > > > > > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hello,
>> > > > > > >
>> > > > > > > Just to make some updates. I changed the name of the metric so
>> > > > > > > that it was more in line with usual Kafka naming conventions
>> > > > > > > for metrics / sensors.
>> > > > > > > Below is the updated description of the metric:
>> > > > > > >
>> > > > > > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)
>> > > > > > >
>> > > > > > > Description: This metric will record the number of updates that
>> > > > > > > have been dropped since they are essentially re-performing an
>> > > > > > > earlier operation.
>> > > > > > >
>> > > > > > > Note:
>> > > > > > >
>> > > > > > >    - The rate option indicates the ratio of records dropped to
>> > > > > > >    actual volume of records passing through the task.
>> > > > > > >    - The total option will just give a raw count of the number
>> > > > > > >    of records dropped.
>> > > > > > >
>> > > > > > >
>> > > > > > > I hope that this is more on point.
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Richard
>> > > > > > >
>> > > > > > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > > > >
>> > > > > > >> Hi all,
>> > > > > > >>
>> > > > > > >> Thanks for the clarification. I was just confused a little on
>> > > > > > >> what was going on.
>> > > > > > >>
>> > > > > > >> So I guess then that for the actual proposal, we got the
>> > > > > > >> following:
>> > > > > > >>
>> > > > > > >> 1. We check for binary equality, and perform no extra lookups.
>> > > > > > >> 2. Emphasize that this applies only to materialized tables.
>> > > > > > >> 3. We drop aggregation updates if key, value and timestamp are
>> > > > > > >>    the same.
>> > > > > > >>
>> > > > > > >> Then that settles the behavior changes. So it looks like the
>> > > > > > >> metric is the only thing that is left. In this case, I think
>> > > > > > >> the metric would be named the following:
>> > > > > > >> IdempotentUpdateMetric. This is mostly off the top of my head.
>> > > > > > >> So if you think that we should change it, feel free to say so.
>> > > > > > >> The metric will report the number of dropped operations
>> > > > > > >> inherently.
>> > > > > > >>
>> > > > > > >> It will probably be added as a Sensor, similar to the dropped
>> > > > > > >> records sensor we already have.
>> > > > > > >>
>> > > > > > >> If there isn't anything else, I will probably start the voting
>> > > > > > >> process next week!
>> > > > > > >>
>> > > > > > >> Cheers,
>> > > > > > >> Richard
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <vvcep...@apache.org> wrote:
>> > > > > > >>
>> > > > > > >>> Hi Bruno,
>> > > > > > >>>
>> > > > > > >>> Thanks for the clarification. Indeed, I was thinking two
>> > > > > > >>> things:
>> > > > > > >>> 1. For the initial implementation, we can just avoid adding
>> > > > > > >>> any extra lookups, but only do the comparison when we already
>> > > > > > >>> happen to have the prior value.
>> > > > > > >>> 2. I think, as a result of the timestamp semantics, we
>> > > > > > >>> actually _do_ look up the prior value approximately all the
>> > > > > > >>> time, so the idempotence check should be quite effective.
>> > > > > > >>>
>> > > > > > >>> I think that second point is the same thing you're referring
>> > > > > > >>> to potentially being unnecessary. It does mean that we do
>> > > > > > >>> fetch the whole value in a lot of cases where we really only
>> > > > > > >>> need the timestamp, so it could certainly be optimized in the
>> > > > > > >>> future. In that future, we would need to weigh that
>> > > > > > >>> optimization against losing the idempotence check. But,
>> > > > > > >>> that's a problem for tomorrow :)
>> > > > > > >>>
>> > > > > > >>> I'm 100% on board with scrutinizing the performance as we
>> > > > > > >>> implement this feature.
>> > > > > > >>>
>> > > > > > >>> Thanks again,
>> > > > > > >>> -John
>> > > > > > >>>
>> > > > > > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
>> > > > > > >>> > Hi John,
>> > > > > > >>> >
>> > > > > > >>> > I am glad to help you with your imagination. With overhead,
>> > > > > > >>> > I mainly meant the additional lookup into the state store
>> > > > > > >>> > to get the current value, but I see now in the code that we
>> > > > > > >>> > do that lookup anyways (although I think we could avoid
>> > > > > > >>> > that in the cases where we do not need the old value). With
>> > > > > > >>> > or without config, we need to evaluate the performance
>> > > > > > >>> > benefits of this change, in any case.
>> > > > > > >>> >
>> > > > > > >>> > Best,
>> > > > > > >>> > Bruno
>> > > > > > >>> >
>> > > > > > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <vvcep...@apache.org> wrote:
>> > > > > > >>> > >
>> > > > > > >>> > > Thanks for your remarks, Bruno!
>> > > > > > >>> > >
>> > > > > > >>> > > I'm in favor of standardizing on terminology like "not
>> > > > > > >>> > > forwarding idempotent updates" or "dropping idempotent
>> > > > > > >>> > > updates". Maybe we should make a pass on the KIP and just
>> > > > > > >>> > > convert everything to this phrasing. In retrospect, even
>> > > > > > >>> > > the term "emit-on-change" has too much semantic baggage,
>> > > > > > >>> > > since it implies the semantics from the SECRET paper,
>> > > > > > >>> > > which we don't really want to imply here.
>> > > > > > >>> > >
>> > > > > > >>> > > I'm also in favor of the metric as you propose.
>> > > > > > >>> > >
>> > > > > > >>> > > Likewise with stream aggregations, I was also under the
>> > > > > > >>> > > impression that we agreed on dropping idempotent updates
>> > > > > > >>> > > to the aggregation result, any time we find that our
>> > > > > > >>> > > "new" (key, value, timestamp) result is identical to the
>> > > > > > >>> > > prior one.
>> > > > > > >>> > >
>> > > > > > >>> > > Also, I'm +1 on all your recommendations for updating the
>> > > > > > >>> > > KIP document for clarity.
>> > > > > > >>> > >
>> > > > > > >>> > > Regarding the opt-out config. Perhaps I'm suffering from
>> > > > > > >>> > > a failure of imagination, but I don't see how the current
>> > > > > > >>> > > proposal could really have a measurable impact on
>> > > > > > >>> > > latency.
>> > > > > > >>> > > If all we do is make a single extra pass to compare two
>> > > > > > >>> > > byte arrays for equality, only in the cases where we
>> > > > > > >>> > > already have the byte arrays available, it seems unlikely
>> > > > > > >>> > > to measurably affect the processing of non-idempotent
>> > > > > > >>> > > updates. It seems guaranteed to _decrease_ the latency of
>> > > > > > >>> > > processing idempotent updates, since we get to skip a
>> > > > > > >>> > > store#put, at least one producer#send, and also all
>> > > > > > >>> > > downstream processing, including all the disk and network
>> > > > > > >>> > > operations associated with downstream operations.
>> > > > > > >>> > >
>> > > > > > >>> > > It seems like if we're pretty sure this change would only
>> > > > > > >>> > > _help_, we shouldn't introduce the operational burden of
>> > > > > > >>> > > an extra configuration. If we want to be more aggressive
>> > > > > > >>> > > about dropping idempotent operations in the future, such
>> > > > > > >>> > > as depending on equals() or adding a ChangeDetector
>> > > > > > >>> > > interface, then we should consider adding a configuration
>> > > > > > >>> > > as part of that future work. In fact, if we add a simple
>> > > > > > >>> > > "opt-in/opt-out" switch right now, we might find that
>> > > > > > >>> > > it's actually insufficient for whatever future feature we
>> > > > > > >>> > > might propose, then we have a mess of deprecating the
>> > > > > > >>> > > opt-out config and replacing it.
>> > > > > > >>> > >
>> > > > > > >>> > > What do you think?
>> > > > > > >>> > > -John
>> > > > > > >>> > >
>> > > > > > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
>> > > > > > >>> > > > Hi all,
>> > > > > > >>> > > >
>> > > > > > >>> > > > Sorry for the late reply!
>> > > > > > >>> > > >
>> > > > > > >>> > > > I am also in favour of baby steps.
>> > > > > > >>> > > >
>> > > > > > >>> > > > I am undecided whether the KIP should contain an
>> > > > > > >>> > > > opt-out config or not. The overhead of emit-on-change
>> > > > > > >>> > > > might affect latency. For applications where low
>> > > > > > >>> > > > latency is crucial and there are not too many
>> > > > > > >>> > > > idempotent updates, it would be better to fall back to
>> > > > > > >>> > > > emit-on-update. However, we do not know how much
>> > > > > > >>> > > > emit-on-change impacts latency. We would first need to
>> > > > > > >>> > > > benchmark that before we can decide about the
>> > > > > > >>> > > > opt-out-config.
>> > > > > > >>> > > >
>> > > > > > >>> > > > A metric of dropped idempotent updates seems useful to
>> > > > > > >>> > > > me to be informed about potential upstream applications
>> > > > > > >>> > > > or upstream operators that produce too many idempotent
>> > > > > > >>> > > > updates. The KIP should state the name of the metric,
>> > > > > > >>> > > > its group, its tags, and its recording level (see
>> > > > > > >>> > > > KIP-444 or KIP-471 for examples). I propose DEBUG as
>> > > > > > >>> > > > reporting level.
>> > > > > > >>> > > >
>> > > > > > >>> > > > Richard, what competing proposals for emit-on-change
>> > > > > > >>> > > > for aggregations do you mean? I have the feeling that
>> > > > > > >>> > > > we agreed to get rid of idempotent updates if the
>> > > > > > >>> > > > aggregate is updated with the same key, value, AND
>> > > > > > >>> > > > timestamp.
>> > > > > > >>> > > > I am also fine if we do not include this into this KIP
>> > > > > > >>> > > > (remember: baby steps).
>> > > > > > >>> > > >
>> > > > > > >>> > > > You write that "emit-on-change is more correct". Since
>> > > > > > >>> > > > we agreed that this is an optimization, IMO you cannot
>> > > > > > >>> > > > argue this way.
>> > > > > > >>> > > >
>> > > > > > >>> > > > Please put "Alternative Approaches" under "Rejected
>> > > > > > >>> > > > Alternatives", so that it becomes clear that we are not
>> > > > > > >>> > > > going to implement them. In general, I think the KIP
>> > > > > > >>> > > > needs a bit of clean-up (probably, you already planned
>> > > > > > >>> > > > for it). "Design Reasoning" is a bit of behavior
>> > > > > > >>> > > > changes, rejected alternatives and duplicates a bit the
>> > > > > > >>> > > > content in those sections.
>> > > > > > >>> > > >
>> > > > > > >>> > > > I do not like the name "no-op operations" or "no-ops",
>> > > > > > >>> > > > because they are rather generic. I like more
>> > > > > > >>> > > > "idempotent updates".
>> > > > > > >>> > > >
>> > > > > > >>> > > > Best,
>> > > > > > >>> > > > Bruno
>> > > > > > >>> > > >
>> > > > > > >>> > > >
>> > > > > > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > Hi all,
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > We are definitely making progress!
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > @John should I emphasize in the proposed behavior
>> > > > > > >>> > > > > changes that we are only doing binary equality checks
>> > > > > > >>> > > > > for stateful operators?
>> > > > > > >>> > > > > It looks like we have come close to finalizing this
>> > > > > > >>> > > > > part of the KIP.
>> > > > > > >>> > > > > (I will note in the KIP that this proposal is
>> > > > > > >>> > > > > intended for optimization, not semantics correctness)
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > I do think maybe we still have one other detail we
>> > > > > > >>> > > > > need to discuss. So far, there has been quite a bit
>> > > > > > >>> > > > > of back and forth about what the behavior of
>> > > > > > >>> > > > > aggregations should look like in emit on change. I
>> > > > > > >>> > > > > have seen multiple competing proposals, so I am not
>> > > > > > >>> > > > > completely certain which one we should go with, or
>> > > > > > >>> > > > > how we will be able to compromise in between them.
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > Let me know what your thoughts are on this matter,
>> > > > > > >>> > > > > since we are probably close to wrapping up most other
>> > > > > > >>> > > > > stuff.
>> > > > > > >>> > > > > @Matthias J. Sax <matth...@confluent.io> and @Bruno,
>> > > > > > >>> > > > > see what you think about this.
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > Best,
>> > > > > > >>> > > > > Richard
>> > > > > > >>> > > > >
>> > > > > > >>> > > > >
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <vvcep...@apache.org> wrote:
>> > > > > > >>> > > > >
>> > > > > > >>> > > > > > Thanks, Matthias!
>> > > > > > >>> > > > > >
>> > > > > > >>> > > > > > Regarding numbers, it would be hard to know how
>> > > > > > >>> > > > > > many applications would benefit, since we don't
>> > > > > > >>> > > > > > know how many applications there are, or anything
>> > > > > > >>> > > > > > about their data sets or topologies. We could do a
>> > > > > > >>> > > > > > survey, but it seems overkill if we take the
>> > > > > > >>> > > > > > conservative approach.
>> > > > > > >>> > > > > >
>> > > > > > >>> > > > > > I have my own practical stream processing
>> > > > > > >>> > > > > > experience that tells me this is absolutely
>> > > > > > >>> > > > > > critical for any moderate-to-large relational
>> > > > > > >>> > > > > > stream processing use cases. I'll leave it to you
>> > > > > > >>> > > > > > to decide if you find that convincing, but it's
>> > > > > > >>> > > > > > definitely not an _assumption_. I've also heard
>> > > > > > >>> > > > > > from a few Streams users who have already had to
>> > > > > > >>> > > > > > implement their own noop-suppression transformers
>> > > > > > >>> > > > > > in order to get to production scale.
>> > > > > > >>> > > > > >
>> > > > > > >>> > > > > > Regardless, it sounds like we can agree on taking
>> > > > > > >>> > > > > > an opportunistic approach and targeting the
>> > > > > > >>> > > > > > optimization just to use a binary-equality check at
>> > > > > > >>> > > > > > stateful operators. (I'd also suggest in sink
>> > > > > > >>> > > > > > nodes, when we are about to send old and new
>> > > > > > >>> > > > > > values, since they are also already present and
>> > > > > > >>> > > > > > serialized at that point.) We could make the KIP
>> > > > > > >>> > > > > > even more vague, and just say that we'll drop no-op
>> > > > > > >>> > > > > > updates "when possible".
>> > > > > > >>> > > > > >
>> > > > > > >>> > > > > > I'm curious what Bruno and the others think about
>> > > > > > >>> > > > > > this. If it seems like a good starting point,
>> > > > > > >>> > > > > > perhaps we could move to a vote soon and get to
>> > > > > > >>> > > > > > work on the implementation!
>> > > > > > >>> > > > > >
>> > > > > > >>> > > > > > Thanks,
>> > > > > > >>> > > > > > -John
>> > > > > > >>> > > > > >
>> > > > > > >>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:
>> > > > > > >>> > > > > > > Talking about optimizations and reducing
>> > > > > > >>> > > > > > > downstream load:
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > > Do we actually have any numbers? I have the
>> > > > > > >>> > > > > > > impression that this KIP is more or less built on
>> > > > > > >>> > > > > > > the _assumption_ that there is a problem. Yes,
>> > > > > > >>> > > > > > > there are some use cases that would benefit from
>> > > > > > >>> > > > > > > this; but how many applications would actually
>> > > > > > >>> > > > > > > benefit? And how much load reduction would they
>> > > > > > >>> > > > > > > get?
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > > The simplest approach (following John's idea to
>> > > > > > >>> > > > > > > make baby steps) would be to apply the
>> > > > > > >>> > > > > > > emit-on-change pattern only if there is a store.
>> > > > > > >>> > > > > > > For this case we need to serialize old and new
>> > > > > > >>> > > > > > > result anyway and thus a simple byte-array
>> > > > > > >>> > > > > > > comparison is no overhead.
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > > Sending `oldValues` by default would become
>> > > > > > >>> > > > > > > expensive because we would need to serialize the
>> > > > > > >>> > > > > > > recomputed old result, as well as the new result,
>> > > > > > >>> > > > > > > to make the comparison (and we know the
>> > > > > > >>> > > > > > > serialization is not cheap). We are facing a
>> > > > > > >>> > > > > > > trade-off between CPU overhead and downstream
>> > > > > > >>> > > > > > > load and I am not sure if we should hard code
>> > > > > > >>> > > > > > > this.
>> > > > > > >>> > > > > > > My original argument for sending `oldValues` was
>> > > > > > >>> > > > > > > about semantics; but for an optimization, I am
>> > > > > > >>> > > > > > > not sure if this would be the right choice.
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > > For now, users who want to opt-in can force a
>> > > > > > >>> > > > > > > materialization. A materialization may be
>> > > > > > >>> > > > > > > expensive and if we see future demand, we could
>> > > > > > >>> > > > > > > still add an option to send `oldValues` instead
>> > > > > > >>> > > > > > > of materialization (this would at least save the
>> > > > > > >>> > > > > > > store overhead). As we consider the KIP an
>> > > > > > >>> > > > > > > optimization, a "config" seems to make sense.
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > > -Matthias
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > >
>> > > > > > >>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
>> > > > > > >>> > > > > > > > Hi John!
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > > Thanks for the reply.
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > > About the changes we have discussed so far. I
>> > > > > > >>> > > > > > > > think upon further consideration, we have been
>> > > > > > >>> > > > > > > > mostly talking about this from the perspective
>> > > > > > >>> > > > > > > > that no stop-gap effort is acceptable. However,
>> > > > > > >>> > > > > > > > in recent discussion, if we consider
>> > > > > > >>> > > > > > > > optimization, then it appears that the
>> > > > > > >>> > > > > > > > perspective I mentioned no longer applies.
>> > > > > > >>> > > > > > > > After all, we are no longer concerned so much
>> > > > > > >>> > > > > > > > about semantics correctness as about reducing
>> > > > > > >>> > > > > > > > traffic as much as possible without performance
>> > > > > > >>> > > > > > > > tradeoffs.
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > > In this case, I think a cache would be a good
>> > > > > > >>> > > > > > > > idea for stateless operations. This cache will
>> > > > > > >>> > > > > > > > not be backed by a store obviously. We can
>> > > > > > >>> > > > > > > > probably use Kafka's ThreadCache. We should be
>> > > > > > >>> > > > > > > > able to catch a large portion of the no-ops if
>> > > > > > >>> > > > > > > > we at least store some results in the cache.
>> > > > > > >>> > > > > > > > Not all will be caught, but I think the impact
>> > > > > > >>> > > > > > > > will be significant.
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > > On another note, I think that we should
>> > > > > > >>> > > > > > > > implement competing proposals i.e. one where we
>> > > > > > >>> > > > > > > > forward both old and new values with a
>> > > > > > >>> > > > > > > > reasonable proportion of artificial no-ops (we
>> > > > > > >>> > > > > > > > do not necessarily have to rely on equals so
>> > > > > > >>> > > > > > > > much as comparing the serialized binary data
>> > > > > > >>> > > > > > > > after the operation), and in another scenario,
>> > > > > > >>> > > > > > > > the cache for stateless ops. It would be
>> > > > > > >>> > > > > > > > unreasonable if we completely disregard either
>> > > > > > >>> > > > > > > > approach, since they both have merit.
>> > > > > > >>> > > > > > > > The reason for implementing both is to perform
>> > > > > > >>> > > > > > > > benchmark tests on them, and compare them with
>> > > > > > >>> > > > > > > > the original. This way, we can more clearly see
>> > > > > > >>> > > > > > > > what the drawbacks and the gains are. So far,
>> > > > > > >>> > > > > > > > we have been discussing only hypotheticals, and
>> > > > > > >>> > > > > > > > if we continue to do so, I think it is likely
>> > > > > > >>> > > > > > > > no ground will be gained.
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > > After all, what we seek is optimization, and
>> > > > > > >>> > > > > > > > performance benchmarks will be mandatory for a
>> > > > > > >>> > > > > > > > KIP of this nature.
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > > Hope this helps,
>> > > > > > >>> > > > > > > > Richard
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler <vvcep...@apache.org> wrote:
>> > > > > > >>> > > > > > > >
>> > > > > > >>> > > > > > > >> Hi again, all,
>> > > > > > >>> > > > > > > >>
>> > > > > > >>> > > > > > > >> Sorry on my part for my silence.
>> > > > > > >>> > > > > > > >>
>> > > > > > >>> > > > > > > >> I've just taken another look over the recent
>> > > > > > >>> > > > > > > >> history of this discussion.
It seems like the #1 point to clarify (because it affects everything else) is that, yes, I was 100% envisioning this as an _optimization_.

As a consequence, I don't think it's critical to make any hard guarantees about what results get forwarded and what (no-op updates) get dropped. I'd initially just been thinking about doing this opportunistically in cases where we already had the "old" and "new" result in memory, thanks to a request to "emit old values", or to the implementation of timestamp semantics.

However, whether or not it's semantically critical, I do think that Matthias's idea to use the change-forwarding mechanism to check for no-ops even on stateless operations is pretty interesting. Specifically, this would _really_ let you pare down useless updates by using mapValues to strip down records only to what you really need.
However, the dependence on the implementation of equals() is troubling.

It might make sense to table this idea, as well as my complicated no-op detection algorithm, and initially propose just a nonconfigurable feature to check "old" and "new" results for binary equality before forwarding. I.e., if any operation determines that the old and new results are binary-identical, we would not forward.

I'll admit that this doesn't serve Tommy's use case very well, but it might be better to take baby steps with an optimization like this and not risk over-reaching in a way that actually harms performance or correctness. We could always expand the feature to use equals() or some kind of ChangeDetector later on, in a more focused discussion.
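
A minimal sketch of the binary-equality check proposed in this message, assuming the operator already holds both the prior and the new result in serialized form. The class BinaryNoOpCheck and its method shouldForward are hypothetical names for illustration and are not part of the Kafka Streams API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class BinaryNoOpCheck {
    private BinaryNoOpCheck() {}

    /** Returns true when the update should be forwarded downstream. */
    static boolean shouldForward(byte[] oldSerialized, byte[] newSerialized) {
        // Assumption: with no prior result there is nothing to compare against,
        // so the update is always forwarded.
        if (oldSerialized == null) {
            return true;
        }
        // Arrays.equals compares length and contents; a null new value
        // (a delete) is never equal to a non-null prior value.
        return !Arrays.equals(oldSerialized, newSerialized);
    }

    public static void main(String[] args) {
        byte[] prior = "42".getBytes(StandardCharsets.UTF_8);
        byte[] same = "42".getBytes(StandardCharsets.UTF_8);
        byte[] changed = "43".getBytes(StandardCharsets.UTF_8);

        System.out.println(shouldForward(prior, same));    // false: binary-identical, dropped
        System.out.println(shouldForward(prior, changed)); // true: real change, forwarded
        System.out.println(shouldForward(null, same));     // true: no prior result
    }
}
```

Because the comparison only touches bytes, it needs neither a deserialization step nor an equals() implementation on the value type, which is the property the message relies on.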

Regarding metrics or debug logs, I guess I don't feel strongly, but it feels like two things will happen that make it nicer to add them:

1. This feature is going to surprise/annoy _somebody_, and it would be nice to be able to definitively say the reason that updates are dropped is that they were no-ops. The easiest smoking gun is if there are debug-logs that can be enabled. This person might just be looking at the dashboards, wondering why there are 100K updates per second going into their app, but only 1K results per second coming out. Having the metric there makes the accounting easier.

2. Somebody is going to struggle with high-volume updates, and it would be nice for them to know that this feature is saving them X-thousand updates per second, etc.

What does everyone think about this?
Note, as I read it, what I've said above is already reflected in the text of the KIP.

Thanks,
-John

On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:

Hi all,

Bumping this. If you feel that this KIP is not too urgent, then let me know. :)

Cheers,
Richard

On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

I've had just a few thoughts regarding the forwarding of <key, change<old_value, new_value>>. As Matthias already mentioned, there are two separate priorities by which we can judge this KIP:

1. An optimization perspective: In this case, the user would prefer the impact of this KIP to be as minimal as possible.
By such logic, if stateless operations are performed twice, that could prove unacceptable for them, since operations can prove expensive.

2. A semantic correctness perspective: Unlike the optimization approach, we are more concerned with all KTable operations obeying the same emission policy, i.e. emit on change. In this case, a discrepancy would not be tolerated, even though an extra performance cost will be incurred. Therefore, we will follow Matthias's approach, and perform the operation once on the old value, and once on the new.

The issue here I think is more black and white than in between. The second option in particular would be favorable for users with inexpensive stateless operations, while for the former option, we are probably dealing with more expensive ones.
So the simplest solution is probably to allow the user to choose one of the behaviors, and have a config which can switch between them.

It's the simplest compromise I can come up with at the moment, but if you think you have a better plan which could better balance tradeoffs, then please let us know. :)

Best,
Richard

On Wed, Feb 5, 2020 at 5:12 PM John Roesler <vvcep...@apache.org> wrote:

Hi all,

Thanks for the thoughtful comments!

I need more time to reflect on your thoughts, but just wanted to offer a quick clarification about equals().

I only meant that we can't be sure if a class's equals() implementation returns true for two semantically identical instances.
I.e., if a class doesn't override the default equals() implementation, then we would see behavior like:

    new MyPair("A", 1).equals(new MyPair("A", 1)) returns false

In that case, I would still like to catch no-op updates by comparing the serialized form of the records when we happen to have it serialized anyway (such as when the operation is stateful, or when we're sending to a repartition topic and we have both the "new" and "old" value from upstream).

I didn't mean to suggest we'd try to use reflection to detect whether equals is implemented, although that is a neat trick. I was thinking more of a belt-and-suspenders algorithm where we do the check for no-ops based on equals() and then _also_ check the serialized bytes for equality.
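
The belt-and-suspenders idea above can be sketched as follows. This is only an illustration under stated assumptions: the MyPair fields, the toy serialize method, and the BeltAndSuspenders class are all hypothetical, and a real topology would use its configured serde rather than string concatenation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical value type that, like MyPair in the message above, does not
// override equals(), so Object's identity comparison applies.
final class MyPair {
    final String left;
    final int right;

    MyPair(String left, int right) {
        this.left = left;
        this.right = right;
    }
}

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class BeltAndSuspenders {
    private BeltAndSuspenders() {}

    // Toy serializer for the demo; stands in for a real serde.
    static byte[] serialize(MyPair pair) {
        return (pair.left + ":" + pair.right).getBytes(StandardCharsets.UTF_8);
    }

    /** Returns true when the update is a no-op by equals() or by serialized bytes. */
    static <V> boolean isNoOp(V oldValue, V newValue, Function<V, byte[]> serializer) {
        // Assumption: inserts and deletes are conservatively treated as real changes.
        if (oldValue == null || newValue == null) {
            return false;
        }
        // First check: equals(), which helps only if the class overrides it.
        if (oldValue.equals(newValue)) {
            return true;
        }
        // Second check: compare the serialized forms.
        return Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue));
    }

    public static void main(String[] args) {
        MyPair a = new MyPair("A", 1);
        MyPair b = new MyPair("A", 1);

        System.out.println(a.equals(b));                                // false: default equals()
        System.out.println(isNoOp(a, b, BeltAndSuspenders::serialize)); // true: bytes match
        System.out.println(isNoOp(a, new MyPair("A", 2), BeltAndSuspenders::serialize)); // false
    }
}
```

The byte comparison catches the case that the default equals() misses, which is why the order of the two checks affects only cost, not the result.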

Thanks,
-John

On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:

Thanks for the comments, Matthias.

w.r.t. the requirement of an `equals()` implementation, each template type would have an equals() method. We can use the following code to know whether it is provided by the JVM or provided by the user.

    boolean customEquals = false;
    try {
        // Find the class that declares the equals(Object) method actually in use
        Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
        if (!Object.class.equals(cls)) {
            customEquals = true;
        }
    } catch (NoSuchMethodException nsme) {
        // equals is always defined, this wouldn't hit
    }

The next question is: what if the user doesn't provide an equals() method? Would we automatically fall back to emit-on-update?

Cheers

On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <mj...@apache.org> wrote:

First a high level comment:

Overall, I would like to take one step back, and make sure we are discussing on the same level. Originally, I understood this KIP as a proposed change of _semantics_, however, given the latest discussion it seems it's actually not -- it's more an _optimization_ proposal. Hence, we only need to make sure that this optimization does not break existing semantics. Is this the right way to think about it?

If yes, then it might actually be ok to have different behavior depending on whether there is a materialized KTable or not. So far, we never defined a public contract about our emit strategy and it seems this KIP does not define one either.

Hence, I don't have as strong of an opinion about sending oldValues, for example, any longer. I guess the question is really, what can we implement in a reasonable way.

Other comments:

@Richard:

Can you please add the KIP to the KIP overview table: It's missing (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).

@Bruno:

You mentioned caching. I think it's irrelevant (orthogonal) and we can discuss this KIP without considering it.

@John:

> Even in the source table, we forward the updated record with the higher of the two timestamps. So the example is more like:

That is not correct.
Currently, we forward with the smaller out-of-order timestamp (changing the timestamp would corrupt the data -- we don't know, because we don't check, if the value is the same or a different one, hence, we must emit the out-of-order record as-is).

If we start to do emit-on-change, we also need to emit a new record if the timestamp changes due to out-of-order data, hence, we would still need to emit <K,V,T1> because that gives us correct semantics: assume you have a filter() and afterward use the filtered KTable in a stream-table join -- the lower T1 timestamp must be propagated to the filtered KTable to ensure that the stream-table join computes the correct result.
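
One possible reading of the timestamp rule described above, sketched in plain Java: an update with an unchanged value is dropped only if its timestamp is not lower than the stored one. The class TimestampAwareCheck and its method are hypothetical names, and the exact rule is an assumption drawn from this message rather than a confirmed implementation.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class TimestampAwareCheck {
    private TimestampAwareCheck() {}

    /** Returns true when the update should be forwarded downstream. */
    static boolean shouldForward(byte[] oldValue, long oldTimestamp,
                                 byte[] newValue, long newTimestamp) {
        // Assumption: no prior result means there is nothing to deduplicate against.
        if (oldValue == null) {
            return true;
        }
        // A changed value is always a real update.
        if (!Arrays.equals(oldValue, newValue)) {
            return true;
        }
        // Same value: forward only when the record is out of order (lower
        // timestamp), so that the lower timestamp reaches downstream operators.
        return newTimestamp < oldTimestamp;
    }

    public static void main(String[] args) {
        byte[] v = {1, 2};
        byte[] sameV = {1, 2};

        System.out.println(shouldForward(v, 100L, sameV, 150L)); // false: same value, later timestamp
        System.out.println(shouldForward(v, 100L, sameV, 50L));  // true: same value, out of order
        System.out.println(shouldForward(v, 100L, new byte[] {9}, 150L)); // true: value changed
    }
}
```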

Your point about requiring an `equals()` implementation is actually a quite interesting one and boils down to my statement from above about "what can we actually implement". What I don't understand is:

> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it.

Your bullet point (2) says it uses `equals()` -- hence, it seems we actually do rely on it? Also, how can we detect if there is an `equals()` method to do the comparison? Would we fail if we have neither `equals()` nor corresponding serializers to do the comparison?

> Wow, really good catch! Yes, we absolutely need metrics and logs if we're going to drop any records.
> And, yes, we should propose metrics and logs that are similar to the existing ones when we drop records for other reasons.

I am not sure about this point. In fact, we already have some no-ops in Kafka Streams in our join-operators and don't report any of those either. Emit-on-change is operator semantics and I don't see why we would need to have a metric for it. It seems to be quite different compared to dropping late or malformed records.

-Matthias

On 2/4/20 7:13 AM, Thomas Becker wrote:

Thanks John for your thoughtful reply. Some comments inline.

On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:

> Hi Tommy,
>
> Thanks for the context. I can see the attraction of considering these use cases together.
>
> To answer your question, if a part of the record is not relevant to downstream consumers, I was thinking you could just use a mapValues to remove it.
>
> E.g., suppose you wanted to do a join between two tables.
>
>     employeeInfo.join(
>         employeePayroll,
>         (info, payroll) -> new Result(info.name(), payroll.salary())
>     )
>
> We only care about one attribute from the Info table (name), and one from the Payroll table (salary), and these attributes change rarely. On the other hand, there might be many other attributes of these tables that change frequently.
> We can avoid triggering the join unnecessarily by mapping the input tables to drop the unnecessary information before the join:
>
>     names = employeeInfo.mapValues(info -> info.name())
>     salaries = employeePayroll.mapValues(payroll -> payroll.salary())
>
>     names.join(
>         salaries,
>         (name, salary) -> new Result(name, salary)
>     )

Ahh yes I see. This works, but in the case where you're using schemas as we are (e.g. Avro), it seems like this approach could lead to a proliferation of "skinny" record types that just drop various fields.

> Especially if we take Matthias's idea to drop non-changes even for stateless operations, this would be quite efficient and is also a very straightforward optimization to understand once you know that Streams provides emit-on-change.
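
To illustrate why projecting before the join helps under emit-on-change, here is a small self-contained simulation in plain Java with no Kafka Streams dependency. EmployeeInfo, its lastSeenMillis field, and ProjectionDemo are hypothetical names, and in-memory equality stands in for whatever comparison Streams would actually apply.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical record type: name changes rarely, lastSeenMillis changes often.
// It deliberately does not override equals().
final class EmployeeInfo {
    final String name;
    final long lastSeenMillis;

    EmployeeInfo(String name, long lastSeenMillis) {
        this.name = name;
        this.lastSeenMillis = lastSeenMillis;
    }
}

// Hypothetical simulation for illustration only; not part of Kafka Streams.
final class ProjectionDemo {
    private ProjectionDemo() {}

    /** Counts the updates for one key that survive an emit-on-change filter after projection. */
    static <V, R> int countForwarded(List<V> updates, Function<V, R> projection) {
        int forwarded = 0;
        R previous = null;
        for (V update : updates) {
            R projected = projection.apply(update);
            // Forward only when the projected result differs from the last one forwarded.
            if (!Objects.equals(previous, projected)) {
                forwarded++;
                previous = projected;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<EmployeeInfo> updates = Arrays.asList(
            new EmployeeInfo("Alice", 1L),
            new EmployeeInfo("Alice", 2L),
            new EmployeeInfo("Alice", 3L),
            new EmployeeInfo("Alicia", 4L));

        // Without projection every update looks like a change.
        System.out.println(countForwarded(updates, info -> info));      // 4
        // Projecting to the name leaves only the real changes.
        System.out.println(countForwarded(updates, info -> info.name)); // 2
    }
}
```

In this toy run, four upstream updates collapse to two downstream ones once the frequently changing field is projected away, which is the effect the mapValues step aims for before the join.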
>
> From the context that you provided, it seems like a slightly different situation, though. Reading between the lines a little, it sounds like: in contrast to the example above, in which we are filtering out extra _data_, you have some extra _metadata_ that you still wish to pass down with the data when there is a "real" update, but you don't want the metadata itself to cause an update.

Despite my lack of clarity, yes you've got it right ;) This particular processor is the first stop for this data after coming in from external users, who often simply post the same content each time and we're trying to shield downstream consumers from unnecessary churn.

> It does seem handy to be able to plug in a custom ChangeDetector for this purpose, but I worry about the API complexity. Maybe you can help think through how to provide the same benefit while limiting user-facing complexity.
>
> Here's some extra context to consider:
>
> We currently don't make any extra requirements about the nature of data that you can use in Streams. For example, you don't have to implement hashCode and equals, or compareTo, etc. With the current proposal, we can do an airtight comparison based only on the serialized form of the values, and we actually don't have to deserialize the "prior" value at all for a large number of operations.
> Admittedly, if we extend the proposal to include no-op detection for stateless operations, we'd probably need to rely on equals() for no-op checking, otherwise we'd wind up requiring serdes for stateless operations as well. Actually, I'd probably argue for doing exactly that:
>
> 1. In stateful operations, drop if the serialized byte[]s are the same. After deserializing, also drop if the objects are equal according to Object#equals().
>
> 2. In stateless operations, compare the "new" and "old" values (if "old" is available) based on Object#equals().
>
> 3. As a final optimization, after serializing and before sending repartition records, compare the serialized data and drop no-ops.
>
> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it.
> Also, we don't require a serde in any new situations, but we can
> still leverage it when it is available.
>
> For clarity, in my example above, even if the employeeInfo and
> employeePayroll and Result records all have serdes, we need the
> "name" field (presumably a String) and the "salary" field (presumably
> a Double) to have serdes as well in the naive implementation. But if
> we can leverage equals(), then the "right thing" happens
> automatically.

I still don't totally follow why the individual components (name,
salary) would have to have serdes here. If Result has one, we compare
bytes, and if Result additionally has an equals() method (which
presumably includes equals comparisons on the constituent fields), have
we not covered our bases?
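
The three rules quoted above (bytes first, then equals()) can be sketched as a small helper. This is only an illustration under stated assumptions: NoOpCheck and its method names are hypothetical and not part of Kafka Streams, a plain Function stands in for a serde's deserializer, and a missing prior value is treated as a real change.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical helper for illustration only; not a Kafka Streams class. */
final class NoOpCheck {
    private NoOpCheck() {}

    /** Rules 1 and 3: the serialized forms are byte-for-byte identical. */
    static boolean sameBytes(byte[] prior, byte[] updated) {
        // Assumption: with no prior (or no new) value, the update is not a no-op.
        return prior != null && updated != null && Arrays.equals(prior, updated);
    }

    /** Rule 2: with no serde at hand, rely on Object#equals(). */
    static <V> boolean sameObject(V prior, V updated) {
        return prior != null && Objects.equals(prior, updated);
    }

    /** Rule 1 in full: compare bytes first, deserialize only if they differ. */
    static <V> boolean isNoOpStateful(byte[] priorBytes, byte[] newBytes,
                                      Function<byte[], V> deserializer) {
        if (sameBytes(priorBytes, newBytes)) {
            return true; // cheap path: nothing was deserialized
        }
        if (priorBytes == null || newBytes == null) {
            return false;
        }
        return sameObject(deserializer.apply(priorBytes), deserializer.apply(newBytes));
    }

    public static void main(String[] args) {
        // Stand-in deserializer: UTF-8 text parsed as a Double.
        Function<byte[], Double> toDouble =
            b -> Double.valueOf(new String(b, StandardCharsets.UTF_8));
        byte[] a = "1.0".getBytes(StandardCharsets.UTF_8);
        byte[] b = "1.00".getBytes(StandardCharsets.UTF_8);
        // Different bytes, but equal objects after deserialization.
        System.out.println(sameBytes(a, b));
        System.out.println(isNoOpStateful(a, b, toDouble));
    }
}
```

In this sketch the byte comparison alone never needs an equals() implementation; equals() is only consulted when the bytes differ, which matches the "benefit from it if it is there" framing.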

> This dovetails with my primary UX concern: where would the
> ChangeDetector actually be registered? None of the operators in my
> example have names or topics or any other identifiable characteristic
> that could be passed to a ChangeDetector class registered via config.
> You could say that we make ChangeDetector an optional parameter to
> every operation in Streams, but this seems to carry quite a bit of
> mental burden with it. People will wonder what it's for and whether
> or not they should be using it. There would almost certainly be a
> misconception that it's preferable to implement it always, which
> would be unfortunate. Plus, to actually implement metadata flowing
> through the topology as in your use case, you'd have to do two
> things:
>
> 1. Make sure that all operations actually preserve the metadata
>    alongside the data (e.g., don't accidentally add a mapValues like
>    I did, or you drop the metadata).
>
> 2. Implement a ChangeDetector for every single operation in the
>    topology, or you don't get the benefit of dropping non-changes
>    internally.
>
> 2b. Alternatively, you could just add the ChangeDetector to one
>     operation toward the end of the topology. This would not drop
>     redundant computation internally, but only drop redundant
>     _outputs_. But this is just about the same as your current
>     solution.

I definitely see your point regarding configuration.
I was originally thinking about this when the deduplication was going
to be opt-in, and it seemed very natural to say something like:

    employeeInfo.join(employeePayroll,
            (info, payroll) -> new Result(info.name(), payroll.salary()))
        .suppress(duplicatesAccordingTo(someChangeDetector))

Alternatively, you can imagine a similar method being on Materialized,
though obviously this makes less sense if we don't want to require
materialization. If we're now talking about changing the default
behavior and not having any configuration options, it's harder to find
a place for this.

> A final thought: if it really is a metadata question, can we just
> plan to finish up the support for headers in Streams? I.e., give you
> a way to control the way that headers flow through the topology?
> Then, we could treat headers the same way we treat timestamps in the
> no-op checking... We completely ignore them for the sake of
> comparison. Thus, neither the timestamp nor the headers would get
> updated in internal state or in downstream views as long as the value
> itself doesn't change. This seems to give us a way to support your
> use case without adding to the mental overhead of using Streams for
> simple things.

Agree headers could be a decent fit for this particular case because
it's mostly metadata, though to be honest we haven't looked at headers
much (mostly because, and to your point, support seems to be lacking).
I feel like there would be other cases where this feature could be
valuable, but I admit I can't come up with anything right this second.
Perhaps yuzhihong had an example in mind?

> I.e., simple things should be easy, and complex things should be
> possible.
>
> What are your thoughts?
> Thanks,
> -John
>
> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
>
> Hi John,
>
> Can you describe how you'd use filtering/mapping to deduplicate
> records? To give some background on my suggestion, we currently have
> a small stream processor that exists solely to deduplicate, which we
> do using a process that I assume would be similar to what would be
> done here (with a store of keys and hash values).
> But the records we are deduplicating have some metadata fields (such
> as timestamps of when the record was posted) that we don't consider
> semantically meaningful for downstream consumers, and therefore we
> also suppress updates that only touch those fields.
>
> -Tommy
>
> On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
>
> Hi Thomas and yuzhihong,
>
> That's an interesting idea. Can you help think of a use case that
> isn't also served by filtering or mapping beforehand?
>
> Thanks for helping to design this feature!
> -John
>
> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
>
> I think this is a good idea.
>
> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com>
> wrote:
>
> How do folks feel about allowing the mechanism by which no-ops are
> detected to be pluggable? Meaning, use something like a hash by
> default, but you could optionally provide an implementation of
> something to use instead, like a ChangeDetector. This could be
> useful, for example, to ignore changes to certain fields, which may
> not be relevant to the operation being performed.
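
For readers trying to picture the pluggable idea, here is a minimal sketch of what such a hook might look like. Only the name ChangeDetector comes from the thread; the interface shape, the hasChanged signature, the byEquals default, and the Posting value type are all assumptions made up for illustration, and nothing like this exists in the Streams API.

```java
import java.util.Objects;

/** Hypothetical hook; the name is from the thread, the signature is assumed. */
interface ChangeDetector<V> {
    /** Returns true when the update should be forwarded downstream. */
    boolean hasChanged(V prior, V updated);

    /** Default-style detector: any difference under equals() is a change. */
    static <V> ChangeDetector<V> byEquals() {
        return (prior, updated) -> !Objects.equals(prior, updated);
    }
}

/** Illustrative value type with one payload field and one metadata field. */
final class Posting {
    final String body;
    final long postedAtMs; // metadata: when the record was posted

    Posting(String body, long postedAtMs) {
        this.body = body;
        this.postedAtMs = postedAtMs;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Posting)) {
            return false;
        }
        Posting p = (Posting) o;
        return body.equals(p.body) && postedAtMs == p.postedAtMs;
    }

    @Override
    public int hashCode() {
        return Objects.hash(body, postedAtMs);
    }
}

/** Custom detector that ignores changes touching only the metadata field. */
final class IgnorePostedAt implements ChangeDetector<Posting> {
    @Override
    public boolean hasChanged(Posting prior, Posting updated) {
        if (prior == null || updated == null) {
            return prior != updated; // first value or delete counts as a change
        }
        return !prior.body.equals(updated.body);
    }
}

final class ChangeDetectorDemo {
    public static void main(String[] args) {
        Posting first = new Posting("hello", 1L);
        Posting repost = new Posting("hello", 2L);
        System.out.println(ChangeDetector.<Posting>byEquals().hasChanged(first, repost));
        System.out.println(new IgnorePostedAt().hasChanged(first, repost));
    }
}
```

The two detectors disagree only on the repost: the equals-based one sees a change because the metadata moved, while the custom one suppresses it.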
>
> From: John Roesler <vvcep...@apache.org>
> Sent: Friday, January 31, 2020 4:51 PM
> To: dev@kafka.apache.org
> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
>
> Hello all,
>
> Sorry for my silence. It seems like we are getting close to
> consensus. Hopefully, we could move to a vote soon!
>
> All of the reasoning from Matthias and Bruno around timestamp is
> compelling. I would be strongly in favor of stating a few things very
> clearly in the KIP:
>
> 1. Streams will drop no-op updates only for KTable operations. That
>    is, we won't make any changes to KStream aggregations at the
>    moment. It does seem like we can potentially revisit the time
>    semantics of that operation in the future, but we don't need to do
>    it now. On the other hand, the proposed semantics for KTable
>    timestamps (marking the beginning of the validity of that record)
>    makes sense to me.
>
> 2. Streams will only drop no-op updates for _stateful_ KTable
>    operations. We don't want to add a hard guarantee that Streams
>    will _never_ emit a no-op table update because it would require
>    adding state to otherwise stateless operations. If someone is
>    really concerned about a particular stateless operation producing
>    a lot of no-op results, all they have to do is materialize it, and
>    Streams would automatically drop the no-ops.
>
> Additionally, I'm +1 on not adding an opt-out at this time.
>
> Regarding the KIP itself, I would clean it up a bit before calling
> for a vote. There is a lot of "discussion"-type language there, which
> is very natural to read, but makes it a bit hard to see what
> _exactly_ the KIP is proposing. Richard, would you mind just making
> the "proposed behavior change" a simple and succinct list of bullet
> points? I.e., please drop glue phrases like "there has been some
> discussion" or "possibly we could do X". For the final version of the
> KIP, it should just say, "Streams will do X, Streams will do Y". Feel
> free to add an elaboration section to explain more about what X and Y
> mean, but we don't need to talk about possibilities or alternatives
> except in the "rejected alternatives" section.
> Accordingly, can you also move the options you presented in the intro
> to the "rejected alternatives" section and only mention the final
> proposal itself? This just really helps reviewers to know what they
> are voting for, and it helps everyone after the fact when they are
> trying to get clarity on what exactly the proposal is, versus all the
> things it could have been.
>
> Thanks,
> -John
>
> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
>
> Hello to all,
>
> I've finished making some initial modifications to the KIP. I have
> decided to keep the implementation section in the KIP for
> record-keeping purposes. For now, we should focus on only the
> proposed behavior changes instead. See if you have any comments!
>
> Cheers,
> Richard
>
> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu
> <yohan.richard...@gmail.com> wrote:
>
> Hi all,
>
> Thanks for all the discussion!
>
> @John and @Bruno: I will survey other possible systems and see what I
> can do. Just a question: by systems, I suppose you mean the pros and
> cons of different reporting strategies? I'm not completely certain on
> this point, so it would be great if you could clarify that.
>
> So here's what I got from all the discussion so far:
>
> - Since both Matthias and John seem to have come to a consensus on
>   this, we will go for an all-round behavioral change for KTables.
>   After some thought, I decided that for now, an opt-out config will
>   not be added. As John has pointed out, no-op changes tend to
>   explode further down the topology as they are forwarded to more and
>   more processor nodes downstream.
>
> - About using hash codes: after some explanation from John, it looks
>   like hash codes might not be ideal (for implementation). For now,
>   we will omit that detail and save it for the PR.
>
> - @Bruno: You do have valid concerns. Though, I am not completely
>   certain if we want to do emit-on-change only for materialized
>   KTables. I will put it down in the KIP regardless.
>
> I will do my best to address all points raised so far in the
> discussion. Hope we can keep this going!
>
> Best,
> Richard
>
> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io>
> wrote:
>
> Thank you Matthias for the use cases!
>
> Looking at both use cases, I think you need to elaborate on them in
> the KIP, Richard.
>
> Emit from plain KTable:
> I agree with Matthias that the lower timestamp makes sense because it
> marks the start of the validity of the record. Idempotent records
> with a higher timestamp can be safely ignored. A corner case that I
> discussed with Matthias offline is when we do not materialize a
> KTable due to optimization. Then we cannot avoid the idempotent
> records because we do not keep the first record with the lower
> timestamp to compare to.
>
> Emit from KTable with aggregations:
> If we specify that an aggregation result should have the highest
> timestamp of the records that participated in the aggregation, we
> cannot ignore any idempotent records. Admittedly, the result of an
> aggregation usually changes, but there are aggregations where the
> result may not change, like min and max, or sum when the incoming
> records have a value of zero.
> In those cases, we could benefit from emit-on-change, but only if we
> define the semantics of the aggregations to not use the highest
> timestamp of the participating records for the result. In Kafka
> Streams, we do not have min, max, and sum as explicit aggregations,
> but we need to provide an API to define what timestamp should be used
> for the result of an aggregation if we want to go down this path.
>
> All of this does not block this KIP; I just wanted to put these
> aspects up for discussion. The KIP can limit itself to emit from
> materialized KTables. However, the limits should be explicitly stated
> in the KIP.
>
> Best,
> Bruno
>
> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax
> <matth...@confluent.io> wrote:
>
> IMHO, the question about semantics depends on the use case, in
> particular on the origin of a KTable.
>
> If there is a changelog topic that one reads directly into a KTable,
> emit-on-change does actually make sense, because the timestamp
> indicates _when_ the update was _effective_. For this case, it is
> semantically sound to _not_ update the timestamp in the store,
> because the second update is actually idempotent and advancing the
> timestamp is not ideal (one could even consider it to be wrong to
> advance the timestamp) because the "valid time" of the record pair
> did not change.
>
> This reasoning also applies to KTable-KTable joins.
>
> However, if the KTable is the result of an aggregation, I think
> emit-on-update is more natural, because the timestamp reflects the
> _last_ time (i.e., highest timestamp) of all input records that
> contributed to the result. Hence, updating the timestamp and emitting
> a new record actually sounds correct to me.
> This applies to windowed and non-windowed aggregations IMHO.
>
> However, considering the argument that the timestamp should not be
> updated in the store in the first case to begin with, both cases are
> actually the same, and both can be modeled as emit-on-change: if a
> `table()` operator does not update the timestamp if the value does
> not change, there is _no_ change and thus nothing is emitted. At the
> same time, if an aggregation operator does update the timestamp (even
> if the value does not change) there _is_ a change and we emit.
>
> Note that handling out-of-order data for aggregations would also work
> seamlessly with this approach: for out-of-order records, the
> timestamp never changes, and thus, we only emit if the result itself
> changes.
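
The unified view in the message above can be modeled in a few lines: treat the stored state as a (value, timestamp) pair and emit only when that pair changes. The sketch below is illustrative only; EmitOnChangeModel, Stamped, and the two update methods are hypothetical names, and values are plain strings to keep it self-contained.

```java
import java.util.Objects;

/** Hypothetical model of "emit only when the (value, timestamp) pair changes". */
final class EmitOnChangeModel {
    private EmitOnChangeModel() {}

    /** Minimal (value, timestamp) pair, compared on both fields. */
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Stamped)) {
                return false;
            }
            Stamped s = (Stamped) o;
            return Objects.equals(value, s.value) && timestamp == s.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(value, timestamp);
        }
    }

    /** table()-style update: an unchanged value keeps its original timestamp. */
    static Stamped tableUpdate(Stamped prior, String newValue, long recordTs) {
        if (prior != null && Objects.equals(prior.value, newValue)) {
            return prior;
        }
        return new Stamped(newValue, recordTs);
    }

    /** Aggregation-style update: timestamp is the highest seen so far. */
    static Stamped aggregateUpdate(Stamped prior, String newResult, long recordTs) {
        long ts = (prior == null) ? recordTs : Math.max(prior.timestamp, recordTs);
        return new Stamped(newResult, ts);
    }

    /** Emit only when the stored pair actually changed. */
    static boolean shouldEmit(Stamped prior, Stamped next) {
        return !Objects.equals(prior, next);
    }

    public static void main(String[] args) {
        Stamped prior = new Stamped("5", 10L);
        // Same value, later record: the table keeps the pair, the aggregate advances it.
        System.out.println(shouldEmit(prior, tableUpdate(prior, "5", 20L)));
        System.out.println(shouldEmit(prior, aggregateUpdate(prior, "5", 20L)));
        // Same value, out-of-order record: the max timestamp is unchanged.
        System.out.println(shouldEmit(prior, aggregateUpdate(prior, "5", 7L)));
    }
}
```

Under these assumptions the only difference between the two operator kinds is how the timestamp is derived; the emit decision itself is the same comparison in both cases.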
>
> Therefore, I would argue that we might not even need any config,
> because the emit-on-change behavior is just correct and reduces the
> downstream load, while our current behavior is not ideal (even if
> it's also correct).
>
> Thoughts?
>
> -Matthias
>
> On 1/24/20 9:37 AM, John Roesler wrote:
>
> Hi Bruno,
>
> Thanks for that idea. I hadn't considered that option before, and it
> does seem like that would be the right place to put it if we think it
> might be semantically important to control on a table-by-table basis.
>
> I had been thinking of it less semantically and more practically. In
> the context of a large topology, or more generally, a large software
> system that contains many topologies and other event-driven systems,
> each no-op result becomes an input that is destined to itself become
> a no-op result, and so on, all the way through the system.
Thus, a single pointless processing result becomes amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations. If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but the size of the system times the average fan-out (to the power of the number of fan-out operations on the path(s) through the system).

In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole. This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind.
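The amplification argument above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: a single linear path of operators, each with a fixed fan-out, and one no-op result entering at the head of the path. The class `AmplificationSketch` and the method `wastedComputations` are hypothetical names, not anything from Kafka Streams.

```java
// Hypothetical toy model; it only counts how many operator invocations
// a single no-op result triggers along one path through a topology.
class AmplificationSketch {

    // fanOuts[i] is the number of results operator i emits per input it processes.
    static long wastedComputations(int[] fanOuts) {
        long inFlight = 1; // no-op records arriving at the current operator
        long total = 0;    // operator invocations spent on no-op records so far
        for (int fanOut : fanOuts) {
            total += inFlight;  // the operator processes every arriving record
            inFlight *= fanOut; // and emits fanOut records for each of them
        }
        return total;
    }

    public static void main(String[] args) {
        // Four operators without fan-out: waste grows with the path length only.
        System.out.println(wastedComputations(new int[] {1, 1, 1, 1}));
        // Same path length, but two operators fan out three ways.
        System.out.println(wastedComputations(new int[] {1, 3, 1, 3}));
    }
}
```

Without fan-out the count equals the number of operators on the path. Every fan-out operator multiplies the work of all operators after it, which is the multiplicative effect described above. Dropping the no-op at the first operator reduces the count to a single invocation.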
It seems like Streams should "do the right thing" for the majority use case. My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do them as Stream operations (where this feature is inapplicable anyway)?

In keeping with the "practical" perspective, I suggested the opt-out config only in the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that (because of the timestamp semantics work already underway), we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change.

However, we should consider whether my experience is likely to be general.
Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?

Thanks,
-John

On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:

Hi Richard,

Thank you for the KIP. I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later. I am also +1 for the survey.

I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you basically propose is to suppress updates if they do not change the result. Considering emit-on-change as a flavour of suppress would be more flexible because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.
Best,
Bruno

On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:

Hi Richard,

Thanks for picking this up! I know of at least one large community member for which this feature is absolutely essential.

If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not. Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config. Can you update the KIP to mention the exact config key and value(s) you'd propose?
Just to move the discussion forward, maybe something like:

    emit.on := change|update

with the new default being "change".

Thanks for pointing out the timestamp issue in particular. I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted).

I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just drop that section for now, so that the KIP discussion can focus on the behavior change and config.

Just for reference, there is some research into this domain.
For example, see the "Report" section (3.2.3) of the SECRET paper:

http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf

It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported.

Thanks,
-John

On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:

Hi everybody!

I'd like to propose a change that we probably should've added a long time ago.
The key benefit of this KIP would be reduced traffic in Kafka Streams since a lot of no-op results would no longer be sent downstream. Here is the KIP for reference:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams

Currently, I seek to formalize our approach for this KIP first before we determine concrete API additions / configurations.
Some configs might warrant adding, while others are not necessary since adding them would only increase the complexity of Kafka Streams.

Cheers,
Richard

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo is authorized to conclude any binding agreement on behalf of TiVo by email. Binding agreements with TiVo may only be made by a signed written agreement.
--
*Tommy Becker*
*Principal Engineer*
*Personalized Content Discovery*
*O* +1 919.460.4747
*tivo.com* <http://www.tivo.com/>