Hi all,

@John, I will add some notes accordingly.
To all: Thanks for all your input! It looks like we can wrap up this discussion thread. I've started a vote thread, so please feel free to cast your vote there. We should be pretty close. :)

Cheers,
Richard

On Thu, Feb 27, 2020 at 2:34 PM John Roesler <vvcep...@apache.org> wrote:

Hi Richard,

Thanks for the update! I read it over, and overall it looks good.

I have only a minor concern about the rate metric definition:

> The rate option indicates the ratio of records dropped to actual volume of records passing through the task

That's not the definition of a "rate". It should be something more like "the average number of dropped idempotent updates per second".

Incidentally, I mentioned this KIP to Guozhang, and he brought up an interesting concern I'd like to share with the list. He noted that if we filter out idempotent table updates, stream time will no longer advance with every input event. He asked whether this would have a negative impact on operations that depend on stream time.

I think this is a valid concern. For example, you can use Suppress to buffer KTable updates until a certain amount of stream time passes. Specifically, the Suppress processor itself advances stream time as it receives new records in its `process` method. In the pathological case, all updates are idempotent and get dropped, so the Suppress operator never emits anything, even though, to an outside observer, stream time should have advanced.
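As a rough illustration, the following toy model in Java (not Kafka Streams code; the class and method names are made up) replays the inputs of the example scenario that follows: key A with value X at timestamps 0 and 11, and a time limit of 10. It assumes that when a newer version of a key arrives, the buffer keeps the time at which that key was first buffered; treat this as a modeling assumption rather than a description of the real Suppress buffer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Kafka Streams code) of a KTable source feeding an
 * untilTimeLimit-style suppression buffer that only learns stream time
 * from the records that actually reach it.
 */
public class SuppressStreamTimeDemo {

    /** Returns the "key=value" entries emitted by the buffer, in emission order. */
    static List<String> run(String[] keys, long[] timestamps, String[] values,
                            long timeLimit, boolean dropIdempotent) {
        Map<String, String> table = new HashMap<>();          // latest value per key (the KTable)
        Map<String, Long> bufferedAt = new LinkedHashMap<>(); // time each key was first buffered
        Map<String, String> bufferedValue = new HashMap<>();  // latest buffered value per key
        long observedStreamTime = Long.MIN_VALUE;             // stream time as seen by the buffer
        List<String> emitted = new ArrayList<>();

        for (int i = 0; i < keys.length; i++) {
            String key = keys[i];
            String value = values[i];
            if (dropIdempotent && value.equals(table.get(key))) {
                // Dropped at the source: the buffer never sees this record's timestamp.
                continue;
            }
            table.put(key, value);
            // Assumed behavior: a newer version keeps the key's original buffer time.
            bufferedAt.putIfAbsent(key, timestamps[i]);
            bufferedValue.put(key, value);
            observedStreamTime = Math.max(observedStreamTime, timestamps[i]);

            // Emit every entry that has aged at least timeLimit in observed stream time.
            Iterator<Map.Entry<String, Long>> it = bufferedAt.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (observedStreamTime - entry.getValue() >= timeLimit) {
                    emitted.add(entry.getKey() + "=" + bufferedValue.remove(entry.getKey()));
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // John's scenario: key A with value X at timestamps 0 and 11, untilTimeLimit(10).
        String[] keys = {"A", "A"};
        long[] timestamps = {0L, 11L};
        String[] values = {"X", "X"};
        System.out.println("forward idempotent updates: " + run(keys, timestamps, values, 10, false));
        System.out.println("drop idempotent updates:    " + run(keys, timestamps, values, 10, true));
    }
}
```

Running `main` prints `[A=X]` for the first run and `[]` for the second: once the time=11 update is dropped at the source, the buffer never observes stream time 11, so nothing ages past the limit.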
Example scenario:

    inputTable = builder.table(input)
    suppressed = inputTable.suppress(untilTimeLimit(10))

    input: key=A, timestamp=0, value=X
    inputTable: key=A, timestamp=0, value=X
    suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
    output: (nothing)

    input: key=A, timestamp=11, value=X
    // update is idempotent, so it gets dropped
    inputTable: key=A, timestamp=0, value=X
    suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
    output: (nothing)

Note that even though stream time has technically advanced beyond the suppression config of 10, the suppression buffer doesn't see it, because the KTable source already dropped the idempotent update.

I think this situation is indeed concerning, and we should be aware of it and make a note of it. However, I don't think it should change our proposal. To understand why, I'll enumerate all the usages of stream time in Kafka Streams:

windowed operations
-----------------------------
KStreamSessionWindowAggregate & KStreamWindowAggregate:
- used to determine whether an incoming record belongs to a window that is already closed
Suppressed.untilWindowCloses():
- used to flush out results for windows once they are closed
AbstractRocksDBSegmentedBytesStore & InMemorySessionStore & InMemoryWindowStore:
- used to create new segments and drop old ones that are out of retention

non-windowed operations
-----------------------------------
Suppressed.untilTimeLimit:
- used to flush out prior updates that have passed the configured age, in stream time

Note that most of the usages are in windowed operations. One interesting thing about this context is that records with higher timestamps belong to different windows.
In order to advance stream time far enough to close a window or push it out of retention, the new records must have timestamps that fall in later windows, which means that they are updates to _new_ keys, which means they would not be suppressed as idempotent.

Updates within a window could still be suppressed, though. For example, if the window size is 10 and the grace period is 5, and we get updates, all for the same key, with timestamps 0, 11, and 16, today we would emit the record for the [0,10) window as soon as we got the update at time 16 (since stream time is now past the window end + grace period time of 15). But if we drop the time=16 update, we wouldn't emit the [0,10) window results until we get a record with time >= 20.

You can see that the maximum amount of (stream) time by which dropping idempotent updates could delay results is one window. This might be surprising, but it doesn't seem too bad, especially since Suppress explicitly does _not_ promise to emit the results at the earliest possible time, just at some time after the window closes.

Even better, though, all the windowed aggregations are _stream_ aggregations that _produce_ a KTable, and we already decided that (at least for now) we would include the timestamp in the idempotence check for stream aggregation results. With this strategy, we would not actually suppress _any_ update to a stream aggregation (windowed or otherwise) that advances stream time.

So there's no problem at all with windowed operations.

This only leaves non-windowed operations, of which there's only one. I have to admit that I regret Suppressed.untilTimeLimit. It turns out that everyone I've heard of who used this API actually wanted it to be wall-clock based, not stream-time based. So I'm not sure, in practice, whether this sharp edge will actually cut anyone.
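John's windowed example can be checked with a few lines of arithmetic. The sketch below is a hypothetical toy model in Java (not the KStreamWindowAggregate implementation) for a single key with window size 10 and grace period 5, assuming the aggregate value stays X so that only the timestamp can distinguish updates. It treats window [0,10) as closed once stream time reaches 10 + 5 = 15, and the `includeTimestampInCheck` flag switches between dropping on value equality alone and the (key, value, timestamp) check agreed on for stream aggregations.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not Kafka Streams code) of the windowed example: one key,
 * tumbling windows of size 10, grace period 5, and an unchanging aggregate value.
 */
public class WindowCloseDemo {

    /** A window [start, start + size) is treated as closed once streamTime >= start + size + grace. */
    static boolean isClosed(long windowStart, long size, long grace, long streamTime) {
        return streamTime >= windowStart + size + grace;
    }

    /**
     * Replays updates for a single key and returns the stream time at which the
     * [0, size) window is first seen as closed, or -1 if it never is.
     */
    static long closeObservedAt(long[] timestamps, String[] values, long size, long grace,
                                boolean includeTimestampInCheck) {
        Map<Long, String> lastValue = new HashMap<>();   // last forwarded aggregate per window start
        Map<Long, Long> lastTimestamp = new HashMap<>(); // its timestamp
        long streamTime = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            long windowStart = ts - (ts % size); // assumes non-negative timestamps
            Long priorTs = lastTimestamp.get(windowStart);
            boolean sameValue = values[i].equals(lastValue.get(windowStart));
            boolean sameTs = priorTs != null && priorTs == ts;
            if (sameValue && (!includeTimestampInCheck || sameTs)) {
                continue; // idempotent update dropped: stream time does not advance
            }
            lastValue.put(windowStart, values[i]);
            lastTimestamp.put(windowStart, ts);
            streamTime = Math.max(streamTime, ts);
            if (isClosed(0, size, grace, streamTime)) {
                return streamTime;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] ts = {0, 11, 16};
        String[] values = {"X", "X", "X"};
        System.out.println(closeObservedAt(ts, values, 10, 5, false)); // -1: time=16 update dropped
        System.out.println(closeObservedAt(ts, values, 10, 5, true));  // 16: timestamp differs, forwarded
        System.out.println(closeObservedAt(new long[] {0, 11, 16, 20},
                new String[] {"X", "X", "X", "X"}, 10, 5, false));       // 20: first update for [20,30)
    }
}
```

The three calls print -1, 16 and 20: with a value-only check the time=16 update is dropped and [0,10) is only seen closed when the first update for [20,30) arrives, whereas including the timestamp forwards the time=16 update and closes [0,10) at stream time 16.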
I think a "proper" fix would be to introduce some kind of control message to advance stream time independently of records. We've already talked about this a little in KAFKA-8769, and it would also be necessary for global consistency markers. Basically, once a record is dropped as an idempotent update, we could still forward a time marker through the topology, which could trigger suppressions as well as window boundaries, but wouldn't result in any computations.

But practically speaking, I'm really not confident that anyone is using the untilTimeLimit suppression, and even if they are, that they would see consecutive idempotent updates for long enough to have an observable impact on what gets emitted from the suppression buffer. In fact, if some hypothetical person did find themselves on the wrong end of my assumption, they could _remove_ the suppression and rely instead on idempotence-dropping to get the same level of traffic reduction that they enjoy from the suppression.

Anyway, long story short, I'm advocating that we continue with the current proposal and just make a note of the situations I've outlined above.

Thanks for your time,
-John

On Thu, Feb 27, 2020, at 14:33, Richard Yu wrote:

Hi all,

I might've made a minor mistake. The processor node level is level 3, not level 1. I will correct the KIP accordingly.

After looking over things, I decided to start the voting thread this afternoon.

Cheers,
Richard

On Thu, Feb 27, 2020 at 12:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi Bruno, Hi John,

Thanks for your comments! I updated the KIP accordingly. It looks like, for quite a few points, I was beating around the bush, which could've been avoided.

Looks like we can reduce the metric to Level 1 (per processor node) then.
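Bruno's suggestion below, that users roll node-level values up to the task level themselves, amounts to summing per-node totals by task. A minimal Java sketch, assuming the per-node totals have already been read into a map keyed by a made-up "taskId/nodeName" string (the real metric tags are not spelled out in this thread, and the node names here are hypothetical):

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Rolls per-processor-node totals of dropped idempotent updates up to the task level.
 * The "taskId/nodeName" key format is a hypothetical convention for this sketch.
 */
public class TaskLevelRollup {

    static Map<String, Long> rollUp(Map<String, Long> nodeTotals) {
        Map<String, Long> taskTotals = new TreeMap<>();
        for (Map.Entry<String, Long> entry : nodeTotals.entrySet()) {
            String taskId = entry.getKey().substring(0, entry.getKey().indexOf('/'));
            taskTotals.merge(taskId, entry.getValue(), Long::sum); // totals add up across nodes
        }
        return taskTotals;
    }

    public static void main(String[] args) {
        Map<String, Long> nodeTotals = new TreeMap<>();
        nodeTotals.put("0_0/table-source", 12L);
        nodeTotals.put("0_0/table-aggregate", 3L);
        nodeTotals.put("0_1/table-source", 5L);
        System.out.println(rollUp(nodeTotals)); // {0_0=15, 0_1=5}
    }
}
```

Rates measured over the same time window could be summed the same way; the node-level values are strictly more informative, which is the argument for recording at that level.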
I've cleaned up most of the unnecessary info, and we should be fairly close. I will start working on a PR for this KIP soon (although we might split that up into stages).

Cheers,
Richard

On Thu, Feb 27, 2020 at 6:06 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi John,

I agree with you. It is better to measure the metric at the processor node level. Users can do the rollup to the task level themselves.

Best,
Bruno

On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org> wrote:

Hi Richard,

I've been making a final pass over the KIP.

Re: Proposed Behavior Change:

I think this point is controversial and probably doesn't need to be there at all:

> 2.b. In certain situations where there is a high volume of idempotent updates throughout the Streams DAG, it will be recommended practice to materialize all operations to reduce traffic overall across the entire network of nodes.

Re-reading all the points, it seems like we can sum them up in a way that's a little more straight to the point and gives us the right amount of flexibility:

> Proposed Behavior Changes
>
> Definition: an "idempotent update" is one in which the new result and the prior result, when serialized, are identical byte arrays.
>
> Note: an "update" is a concept that only applies to Table operations, so the concept of an "idempotent update" also only applies to Table operations. See https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable for more information.
>
> Given that definition, we propose for Streams to drop idempotent updates in any situation where it's possible and convenient to do so. For example, any time we already have both the prior and new results serialized, we may compare them, and drop the update if it is idempotent.
>
> Note that under this proposal, we can implement idempotence checking in the following situations:
> 1. Any aggregation (for example, KGroupedStream, KGroupedTable, TimeWindowedKStream, and SessionWindowedKStream operations)
> 2. Any Materialized KTable operation
> 3. Repartition operations, when we need to send both prior and new results

Notice that in my proposed wording, we neither limit ourselves to just the situations enumerated, nor promise to implement the optimization in every possible situation. IMHO, this is the best way to propose such a feature. That way, we have the flexibility to implement it in stages, and also to add on to the implementation in the future.

Re: Metrics

I agree with Bruno, although I think it might just be a confusing statement. It might be clearer to drop all the "discussion" and just say: "We will add a metric to count the number of idempotent updates that we have dropped".

Also, with respect to the metric, I'm wondering whether the metric should be task-level or processor-node-level. Since the interesting action takes place inside individual processor nodes, I _think_ it would be higher leverage to just measure it at the node level. WDYT?

Re: Design Reasoning

This section seems to be a little bit outdated.
I also just noticed a "surprise" configuration, "timestamp.aggregation.selection.policy", hidden in point 1.a. Is that part of the proposal? We haven't discussed it, and I think we were talking about this KIP being "configuration free".

There is also some discussion of a discarded alternative in the Design Reasoning section, which is confusing. Finally, there was a point there I didn't understand at all, about stateless operators not being intended to load prior results. This statement doesn't seem to be true, but it also doesn't seem to be relevant, so maybe we can just drop it.

Overall, it might help if you make a pass on this section and discuss, as briefly as possible, the justification for the proposed behavior change and for not adding a configuration. Try to avoid talking about things that we are not proposing, since that will just lead to confusion.

Similarly, I'd just completely remove the "Implementation [discarded]" section. It was good to have this as part of the discussion initially, but as we move toward a vote, it's better to streamline the KIP document as much as possible. Keeping a "discarded" section in the document will just make it harder for new people to understand the proposal. We did the same thing with KIP-441, where there were two prior drafts included at the end of the document, and we just deleted them for clarity.

I liked the "Compatibility" and "Rejected Alternatives" sections. Very clear and to the point.

Thanks again for the contribution! I think once the KIP document is cleaned up, we'll be in good shape to finalize the discussion.
-John

On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:

Hi Richard,

1. Could you change "idempotent update operations will only be dropped from KTables, not from other classes." to "idempotent update operations will only be dropped from materialized KTables"? For non-materialized KTables (as they can occur after optimization of the topology), we cannot drop idempotent updates.

2. I cannot completely follow the metrics section. Do you want to record all idempotent updates or only the dropped ones? In particular, I do not understand the following sentences:
"For that matter, even if we don't drop idempotent updates, we should at the very least record the number of idempotent updates that has been seen go through a particular processor."
"Therefore, we should add some metrics which will count the number of idempotent updates that each node has seen."
I do not see how we can record idempotent updates that we do not drop. If we see them, we should drop them. If we do not see them, we cannot drop them and we cannot record them.

Best,
Bruno

On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi John,

Sounds good. It looks like we are close to wrapping things up. If there aren't any other revisions that need to be made (if there are, please comment in the thread), I will start the voting process this Thursday (Pacific Standard Time).
Cheers,
Richard

On Tue, Feb 25, 2020 at 11:59 AM John Roesler <vvcep...@apache.org> wrote:

Hi Richard,

Sorry for the slow reply. I actually think we should avoid checking equals() for now. Your reasoning is good, but the truth is that depending on the implementation of equals() is non-trivial, semantically, and (though I proposed it before) I'm not convinced it's worth the risk. Much better to start with exactly one kind of "idempotence detection".

Even if someone does update their serdes, we know that the new serde would still be able to _de_serialize the old format, or the whole app would break. The situation is that the new result gets encoded in the new binary format, which means we don't detect an idempotent update for what it is. In this case, we'd write the new binary format to disk and the changelog, and forward it downstream. However, we only do this once. Now that the binary format for that record has been updated, we would correctly detect idempotence of any subsequent updates.

Plus, we would still be able to filter out idempotent updates in repartition sinks, since for those, we use the new serde to serialize both the "old" and "new" result.

It's certainly a good observation, but I think we can just make a note of it in "rejected alternatives" for now, and plan to refine it later if it does pose a big performance problem.

Thanks!
-John

On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:

Hi all,

Updated the KIP.

Just a question: do you think it would be a good idea to check for both Object#equals() and binary equality? There might be some subtle changes in the serialization (for example, if the user decides to upgrade their serialization procedure to a new one) while the underlying values of the result stay the same (so equals() might return true).

Do you think this would be plausible?

Cheers,
Richard

On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hello,

Just to make some updates: I changed the name of the metric so that it is more in line with the usual Kafka naming conventions for metrics/sensors. Below is the updated description of the metric:

dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)

Description: This metric will record the number of updates that have been dropped since they are essentially re-performing an earlier operation.

Note:

- The rate option indicates the ratio of records dropped to actual volume of records passing through the task.
- The total option will just give a raw count of the number of records dropped.

I hope that this is more on point.

Best,
Richard

On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

Thanks for the clarification. I was just a little confused about what was going on.

So I guess that, for the actual proposal, we have the following:

1. We check for binary equality, and perform no extra lookups.
2. Emphasize that this applies only to materialized tables.
3. We drop aggregation updates if the key, value, and timestamp are the same.

That settles the behavior changes, so it looks like the metric is the only thing left. In this case, I think the metric would be named IdempotentUpdateMetric. This is mostly off the top of my head, so if you think we should change it, feel free to say so. The metric will inherently report the number of dropped operations.

It will probably be added as a Sensor, similar to the dropped records sensor we already have.

If there isn't anything else, I will probably start the voting process next week!
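To illustrate the difference between the two options (and John's later correction of the rate definition at the top of this thread), here is a stdlib-only Java sketch; it is not Kafka's metrics API, which has its own windowed rate implementation. `total()` counts every dropped idempotent update, while `ratePerSecond` reports the average number of drops per second over a trailing window, with no reference to how many records passed through. The class name and the 10-second window are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Stdlib-only sketch (not Kafka's metrics API) of the two readings of the proposed
 * metric: "total" is a raw count, "rate" is the average number of drops per second
 * over a trailing time window.
 */
public class DroppedIdempotentUpdatesSensor {

    private final long windowMs;
    private final Deque<Long> recentDropTimes = new ArrayDeque<>(); // drop times inside the window
    private long total = 0;

    DroppedIdempotentUpdatesSensor(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Records one dropped idempotent update at wall-clock time nowMs. */
    void recordDrop(long nowMs) {
        total++;
        recentDropTimes.addLast(nowMs);
        expire(nowMs);
    }

    /** Raw count of dropped updates since creation. */
    long total() {
        return total;
    }

    /** Average number of dropped updates per second over the trailing window ending at nowMs. */
    double ratePerSecond(long nowMs) {
        expire(nowMs);
        return recentDropTimes.size() / (windowMs / 1000.0);
    }

    /** Forgets drops that fall outside the trailing window. */
    private void expire(long nowMs) {
        while (!recentDropTimes.isEmpty() && recentDropTimes.peekFirst() <= nowMs - windowMs) {
            recentDropTimes.removeFirst();
        }
    }

    public static void main(String[] args) {
        DroppedIdempotentUpdatesSensor sensor = new DroppedIdempotentUpdatesSensor(10_000);
        for (int i = 0; i < 20; i++) {
            sensor.recordDrop(i * 500L); // one drop every 500 ms, from t=0 to t=9500
        }
        System.out.println(sensor.total());              // 20
        System.out.println(sensor.ratePerSecond(9_500));  // 2.0
        System.out.println(sensor.ratePerSecond(15_000)); // 0.9
    }
}
```

With one drop every 500 ms from t=0 to t=9500, `total()` is 20 and the rate at t=9500 is 2.0 per second; at t=15000 only the 9 drops after t=5000 remain in the window, giving 0.9.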
Cheers,
Richard

On Fri, Feb 21, 2020 at 11:23 AM John Roesler <vvcep...@apache.org> wrote:

Hi Bruno,

Thanks for the clarification. Indeed, I was thinking two things:
1. For the initial implementation, we can just avoid adding any extra lookups, and only do the comparison when we already happen to have the prior value.
2. I think, as a result of the timestamp semantics, we actually _do_ look up the prior value approximately all the time, so the idempotence check should be quite effective.

I think that second point is the same thing you're referring to as potentially being unnecessary. It does mean that we fetch the whole value in a lot of cases where we really only need the timestamp, so it could certainly be optimized in the future. In that future, we would need to weigh that optimization against losing the idempotence check. But that's a problem for tomorrow :)

I'm 100% on board with scrutinizing the performance as we implement this feature.
Thanks again,
-John

On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:

Hi John,

I am glad to help you with your imagination. By overhead, I mainly meant the additional lookup into the state store to get the current value, but I see now in the code that we do that lookup anyway (although I think we could avoid it in the cases where we do not need the old value). With or without a config, we need to evaluate the performance benefits of this change in any case.

Best,
Bruno

On Wed, Feb 19, 2020 at 7:48 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for your remarks, Bruno!

I'm in favor of standardizing on terminology like "not forwarding idempotent updates" or "dropping idempotent updates". Maybe we should make a pass on the KIP and just convert everything to this phrasing. In retrospect, even the term "emit-on-change" has too much semantic baggage, since it implies the semantics from the SECRET paper, which we don't really want to imply here.

I'm also in favor of the metric as you propose.
Likewise with stream aggregations, I was also under the impression that we agreed on dropping idempotent updates to the aggregation result any time we find that our "new" (key, value, timestamp) result is identical to the prior one.

Also, I'm +1 on all your recommendations for updating the KIP document for clarity.

Regarding the opt-out config: perhaps I'm suffering from a failure of imagination, but I don't see how the current proposal could really have a measurable impact on latency. If all we do is make a single extra pass to compare two byte arrays for equality, and only in the cases where we already have the byte arrays available, it seems unlikely to measurably affect the processing of non-idempotent updates. It seems guaranteed to _decrease_ the latency of processing idempotent updates, since we get to skip a store#put, at least one producer#send, and also all downstream processing, including all the disk and network operations associated with downstream operations.
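The check John describes can be sketched as a single `Arrays.equals` on the prior and new serialized values. The toy model below is Java written for this thread, not the Kafka Streams implementation: the map stands in for a state store, the `forwarded` list for downstream processing, and the counter for the proposed dropped-idempotent-updates metric. Its `main` method also replays John's serde-upgrade scenario from his Feb 25 reply above, using two hypothetical serializers that encode the same value differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Toy model (not Kafka Streams code) of the binary-equality check: an update is
 * dropped only when its serialized bytes equal the prior serialized bytes for the key.
 */
public class IdempotentUpdateCheck {

    private final Map<String, byte[]> store = new HashMap<>(); // stand-in for a materialized store
    final List<String> forwarded = new ArrayList<>();        // what would go downstream
    long droppedIdempotentUpdates = 0;                       // what the proposed metric would count

    void process(String key, String value, Function<String, byte[]> serializer) {
        byte[] newBytes = serializer.apply(value);
        byte[] priorBytes = store.get(key); // the proposal only compares where this is already at hand
        if (priorBytes != null && Arrays.equals(priorBytes, newBytes)) {
            droppedIdempotentUpdates++; // skip the store put, the send, and all downstream work
            return;
        }
        store.put(key, newBytes);
        forwarded.add(key + "=" + value);
    }

    public static void main(String[] args) {
        // Two hypothetical serde versions that encode the same value differently.
        Function<String, byte[]> v1 = s -> ("v1:" + s).getBytes(StandardCharsets.UTF_8);
        Function<String, byte[]> v2 = s -> ("v2:" + s).getBytes(StandardCharsets.UTF_8);

        IdempotentUpdateCheck node = new IdempotentUpdateCheck();
        node.process("A", "X", v1); // first value: forwarded
        node.process("A", "X", v1); // same bytes: dropped
        node.process("A", "X", v2); // serde upgraded, bytes differ: forwarded once
        node.process("A", "X", v2); // same bytes in the new format: dropped
        System.out.println(node.forwarded + " dropped=" + node.droppedIdempotentUpdates);
    }
}
```

It prints `[A=X, A=X] dropped=2`: after the serde change the value is forwarded exactly once in the new format, and the following update is recognized as idempotent again.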
It seems like, if we're pretty sure this change would only _help_, we shouldn't introduce the operational burden of an extra configuration. If we want to be more aggressive about dropping idempotent operations in the future, such as depending on equals() or adding a ChangeDetector interface, then we should consider adding a configuration as part of that future work. In fact, if we add a simple "opt-in/opt-out" switch right now, we might find that it's actually insufficient for whatever future feature we might propose, and then we'd have the mess of deprecating the opt-out config and replacing it.

What do you think?
-John

On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:

Hi all,

Sorry for the late reply!

I am also in favour of baby steps.

I am undecided whether the KIP should contain an opt-out config or not. The overhead of emit-on-change might affect latency.
For applications where low latency is crucial and there are not too many idempotent updates, it would be better to fall back to emit-on-update. However, we do not know how much emit-on-change impacts latency. We would first need to benchmark that before we can decide about the opt-out config.

A metric of dropped idempotent updates seems useful to me, to be informed about potential upstream applications or upstream operators that produce too many idempotent updates. The KIP should state the name of the metric, its group, its tags, and its recording level (see KIP-444 or KIP-471 for examples). I propose DEBUG as the recording level.

Richard, what competing proposals for emit-on-change for aggregations do you mean? I have the feeling that we agreed to get rid of idempotent updates if the aggregate is updated with the same key, value, AND timestamp. I am also fine if we do not include this in this KIP (remember: baby steps).

You write that "emit-on-change is more correct". Since we agreed that this is an optimization, IMO you cannot argue this way.
Please put "Alternative Approaches" under "Rejected Alternatives", so that it becomes clear that we are not going to implement them. In general, I think the KIP needs a bit of clean-up (you probably already planned for it). "Design Reasoning" mixes in some behavior changes and rejected alternatives, and partly duplicates the content in those sections.

I do not like the names "no-op operations" or "no-ops", because they are rather generic. I prefer "idempotent updates".

Best,
Bruno

On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

We are definitely making progress!

@John, should I emphasize in the proposed behavior changes that we are only doing binary equality checks for stateful operators? It looks like we have come close to finalizing this part of the KIP.
(I will note in the KIP that this proposal is intended as an optimization, not for semantic correctness.)

I do think we may still have one other detail to discuss. So far, there has been quite a bit of back and forth about what the behavior of aggregations should look like in emit-on-change. I have seen multiple competing proposals, so I am not completely certain which one we should go with, or how we will be able to compromise between them.

Let me know your thoughts on this matter, since we are probably close to wrapping up most other things. @Matthias J. Sax <matth...@confluent.io> and @Bruno, see what you think about this.

Best,
Richard

On Tue, Feb 18, 2020 at 9:06 AM John Roesler <vvcep...@apache.org> wrote:

Thanks, Matthias!
Regarding numbers, it would be hard to know how many applications would benefit, since we don't know how many applications there are, or anything about their data sets or topologies. We could do a survey, but it seems like overkill if we take the conservative approach.

I have my own practical stream processing experience that tells me this is absolutely critical for any moderate-to-large relational stream processing use case. I'll leave it to you to decide if you find that convincing, but it's definitely not an _assumption_. I've also heard from a few Streams users who have already had to implement their own noop-suppression transformers in order to get to production scale.

Regardless, it sounds like we can agree on taking an opportunistic approach and targeting the optimization just to use a binary-equality check at stateful operators.
(I'd also suggest doing it in sink nodes, when we are about to send old and new values, since they are also already present and serialized at that point.) We could make the KIP even more vague, and just say that we'll drop no-op updates "when possible".

I'm curious what Bruno and the others think about this. If it seems like a good starting point, perhaps we could move to a vote soon and get to work on the implementation!

Thanks,
-John

On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:

Talking about optimizations and reducing downstream load:

Do we actually have any numbers? I have the impression that this KIP is more or less built on the _assumption_ that there is a problem. Yes, there are some use cases that would benefit from this; but how many applications would actually benefit? And how much load reduction would they get?
The simplest approach (following John's idea to make baby steps) would be to apply the emit-on-change pattern only if there is a store. In this case we need to serialize the old and new result anyway, and thus a simple byte-array comparison adds no overhead.

Sending `oldValues` by default would become expensive because we would need to serialize the recomputed old result, as well as the new result, to make the comparison (and we know that serialization is not cheap). We are facing a trade-off between CPU overhead and downstream load, and I am not sure if we should hard-code this. My original argument for sending `oldValues` was about semantics; but for an optimization, I am not sure if this would be the right choice.

For now, users who want to opt in can force a materialization. A materialization may be expensive, and if we see future demand, we could still add an option to send `oldValues` instead of materializing (this would at least save the store overhead).
As we consider the KIP an optimization, a "config" seems to make sense.

-Matthias

On 2/17/20 5:21 PM, Richard Yu wrote:

Hi John!

Thanks for the reply.

About the changes we have discussed so far: upon further consideration, I think we have mostly been talking about this from the perspective that no stop-gap effort is acceptable. However, in recent discussion, if we consider this an optimization, then that perspective no longer applies. After all, we are no longer concerned so much about semantic correctness as about reducing traffic as much as possible without performance trade-offs.

In this case, I think a cache would be a good idea for stateless operations. This cache would obviously not be backed by a store.
We can probably use Kafka's ThreadCache. We should be able to catch a large portion of the no-ops if we at least store some results in the cache. Not all will be caught, but I think the impact will be significant.

On another note, I think that we should implement the competing proposals, i.e. one where we forward both old and new values with a reasonable proportion of artificial no-ops (we do not necessarily have to rely on equals() so much as on comparing the serialized binary data after the operation), and another with the cache for stateless ops. It would be unreasonable to completely disregard either approach, since they both have merit. The reason for implementing both is to benchmark them and compare them with the original. This way, we can see more clearly what the drawbacks and the gains are.
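Richard's cache idea for stateless operations can be sketched as a small, bounded per-key map holding the last serialized result that was forwarded: a repeated result for the same key is treated as a no-op and dropped, while keys that were evicted simply fall through and get forwarded, which matches the "not all will be caught" caveat. This is a hypothetical illustration, assuming an access-ordered `java.util.LinkedHashMap` as the LRU; it is not Kafka's internal ThreadCache, and the class name `LastValueCache` and its `shouldForward` method are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: bounded LRU of the last serialized result forwarded per key.
// Not Kafka's ThreadCache; an eviction only means that some no-ops slip through.
class LastValueCache {
    private final Map<String, byte[]> lastForwarded;

    LastValueCache(int maxEntries) {
        // accessOrder = true plus removeEldestEntry turns LinkedHashMap into an LRU.
        this.lastForwarded = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns true if the result should be forwarded, false if it repeats the cached one. */
    boolean shouldForward(String key, byte[] serializedResult) {
        byte[] previous = lastForwarded.get(key);
        if (previous != null && Arrays.equals(previous, serializedResult)) {
            return false; // same bytes as the last forwarded result: treat as a no-op
        }
        lastForwarded.put(key, serializedResult.clone());
        return true;
    }

    public static void main(String[] args) {
        LastValueCache cache = new LastValueCache(1);
        byte[] x = "X".getBytes(StandardCharsets.UTF_8);
        System.out.println(cache.shouldForward("A", x)); // true: first result for A
        System.out.println(cache.shouldForward("A", x)); // false: unchanged result is dropped
        System.out.println(cache.shouldForward("B", x)); // true: B evicts A (capacity 1)
        System.out.println(cache.shouldForward("A", x)); // true: A was evicted, so this no-op slips through
    }
}
```

The cache size is the knob in this design: a larger bound catches more repeated results at the cost of memory, which is exactly the kind of trade-off the proposed benchmarks would need to measure.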
So far, we have been discussing only hypotheticals, and if we continue to do so, I think it is likely that no ground will be gained.

After all, what we seek is optimization, and performance benchmarks will be mandatory for a KIP of this nature.

Hope this helps,
Richard

On Mon, Feb 17, 2020 at 2:12 PM John Roesler <vvcep...@apache.org> wrote:

Hi again, all,

Sorry for my silence.

I've just taken another look over the recent history of this discussion. It seems like the #1 point to clarify (because it affects everything else) is that, yes, I was 100% envisioning this as an _optimization_.
As a consequence, I don't think it's critical to make any hard guarantees about what results get forwarded and what (no-op updates) get dropped. I'd initially just been thinking about doing this opportunistically in cases where we already had the "old" and "new" result in memory, thanks to a request to "emit old values", or to the implementation of timestamp semantics.

However, whether or not it's semantically critical, I do think that Matthias's idea to use the change-forwarding mechanism to check for no-ops even on stateless operations is pretty interesting. Specifically, this would _really_ let you pare down useless updates by using mapValues to strip down records only to what you really need.
However, the dependence on the implementation of equals() is troubling.

It might make sense to table this idea, as well as my complicated no-op detection algorithm, and initially propose just a nonconfigurable feature to check "old" and "new" results for binary equality before forwarding. I.e., if any operation determines that the old and new results are binary-identical, we would not forward.

I'll admit that this doesn't serve Tommy's use case very well, but it might be better to take baby steps with an optimization like this and not risk over-reaching in a way that actually harms performance or correctness. We could always expand the feature to use equals() or some kind of ChangeDetector later on, in a more focused discussion.
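The nonconfigurable check John describes, comparing the serialized "old" and "new" results and forwarding only when the bytes differ, might look roughly like the following. The `Change` holder, the serializer passed as a `java.util.function.Function`, and the `forwardIfChanged` name are hypothetical stand-ins, not Kafka Streams internals; the point is that the decision depends only on serialized bytes, so it behaves the same whether or not the value type overrides equals().

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical sketch of "do not forward if old and new results are binary-identical".
class BinaryEqualityCheck {
    // Minimal stand-in for an <old, new> change pair (not the Kafka Streams class).
    static final class Change<V> {
        final V oldValue;
        final V newValue;
        Change(V oldValue, V newValue) { this.oldValue = oldValue; this.newValue = newValue; }
    }

    /** Returns true if the change should be forwarded downstream. */
    static <V> boolean forwardIfChanged(Change<V> change, Function<V, byte[]> serializer) {
        if (change.oldValue == null || change.newValue == null) {
            return true; // conservatively forward whenever one side is missing
        }
        byte[] oldBytes = serializer.apply(change.oldValue);
        byte[] newBytes = serializer.apply(change.newValue);
        return !Arrays.equals(oldBytes, newBytes);
    }

    public static void main(String[] args) {
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        System.out.println(forwardIfChanged(new Change<>("X", "X"), utf8));  // false: idempotent, dropped
        System.out.println(forwardIfChanged(new Change<>("X", "Y"), utf8));  // true: value changed
        System.out.println(forwardIfChanged(new Change<>(null, "X"), utf8)); // true: no old value yet
    }
}
```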
Regarding metrics or debug logs, I guess I don't feel strongly, but it feels like two things will happen that make it nicer to add them:

1. This feature is going to surprise/annoy _somebody_, and it would be nice to be able to definitively say that the reason updates are dropped is that they were no-ops. The easiest smoking gun is if there are debug logs that can be enabled. This person might just be looking at the dashboards, wondering why there are 100K updates per second going into their app, but only 1K results per second coming out. Having the metric there makes the accounting easier.

2. Somebody is going to struggle with high-volume updates, and it would be nice for them to know that this feature is saving them X-thousand updates per second, etc.

What does everyone think about this? Note, as I read it, what I've said above is already reflected in the text of the KIP.

Thanks,
-John

On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:

Hi all,

Bumping this. If you feel that this KIP is not too urgent, then let me know. :)

Cheers,
Richard

On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

I've had just a few thoughts regarding the forwarding of <key, change<old_value, new_value>>. As Matthias already mentioned, there are two separate priorities by which we can judge this KIP:

1. An optimization perspective: In this case, the user would prefer the impact of this KIP to be as minimal as possible. By such logic, if stateless operations are performed twice, that could prove unacceptable for them (since operations can be expensive).

2. A semantic-correctness perspective: Unlike the optimization approach, we are more concerned with all KTable operations obeying the same emission policy, i.e. emit-on-change. In this case, a discrepancy would not be tolerated, even though an extra performance cost will be incurred. Therefore, we will follow Matthias's approach, and perform the operation once on the old value and once on the new.

The issue here, I think, is more black and white than in between. The second option in particular would be favorable for users with inexpensive stateless operations, while with the former option, we are probably dealing with more expensive ones. So the simplest solution is probably to allow the user to choose one of the behaviors, and have a config which can switch between them.

It's the simplest compromise I can come up with at the moment, but if you think you have a better plan which could better balance the trade-offs, then please let us know. :)

Best,
Richard

On Wed, Feb 5, 2020 at 5:12 PM John Roesler <vvcep...@apache.org> wrote:

Hi all,

Thanks for the thoughtful comments!

I need more time to reflect on your thoughts, but just wanted to offer a quick clarification about equals().

I only meant that we can't be sure if a class's equals() implementation returns true for two semantically identical instances.
I.e., if a class doesn't override the default equals() implementation, then we would see behavior like:

    new MyPair("A", 1).equals(new MyPair("A", 1)) returns false

In that case, I would still like to catch no-op updates by comparing the serialized form of the records when we happen to have it serialized anyway (such as when the operation is stateful, or when we're sending to a repartition topic and we have both the "new" and "old" value from upstream).

I didn't mean to suggest we'd try to use reflection to detect whether equals is implemented, although that is a neat trick.
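To make the equals() concern concrete: a class that does not override equals() inherits Object's identity comparison, so two instances with identical fields are not equal. The sketch below uses two hypothetical classes, `MyPair` (no override, as in John's example) and `MyPairWithEquals` (value-based override), to show the difference; neither comes from the KIP.

```java
import java.util.Objects;

class EqualsPitfall {
    // No equals()/hashCode() override: inherits Object's identity-based equals().
    static final class MyPair {
        final String left;
        final int right;
        MyPair(String left, int right) { this.left = left; this.right = right; }
    }

    // Value-based equals()/hashCode(), as a careful user might write them.
    static final class MyPairWithEquals {
        final String left;
        final int right;
        MyPairWithEquals(String left, int right) { this.left = left; this.right = right; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MyPairWithEquals)) return false;
            MyPairWithEquals other = (MyPairWithEquals) o;
            return right == other.right && Objects.equals(left, other.left);
        }

        @Override
        public int hashCode() { return Objects.hash(left, right); }
    }

    public static void main(String[] args) {
        System.out.println(new MyPair("A", 1).equals(new MyPair("A", 1)));                     // false
        System.out.println(new MyPairWithEquals("A", 1).equals(new MyPairWithEquals("A", 1))); // true
    }
}
```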
I was thinking more of a belt-and-suspenders algorithm where we do the check for no-ops based on equals() and then _also_ check the serialized bytes for equality.

Thanks,
-John

On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:

Thanks for the comments, Matthias.

W.r.t. the requirement of an `equals()` implementation, each template type would have an equals() method. We can use the following code to know whether it is provided by the JVM or by the user:
    boolean customEquals = false;
    try {
        // `value` is the record value; find the class declaring the equals(Object) it uses.
        Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
        if (!Object.class.equals(cls)) {
            customEquals = true;
        }
    } catch (NoSuchMethodException nsme) {
        // equals is always defined, this wouldn't hit
    }

The next question is: what if the user doesn't provide an equals() method? Would we automatically fall back to emit-on-update?

Cheers

On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <mj...@apache.org> wrote:

First a high level comment:

Overall, I would like to take one step back, and make sure we are discussing this at the same level.
Originally, I understood this KIP as a proposed change of _semantics_; however, given the latest discussion, it seems it's actually not -- it's more an _optimization_ proposal. Hence, we only need to make sure that this optimization does not break existing semantics. Is this the right way to think about it?

If yes, then it might actually be OK to have different behavior depending on whether there is a materialized KTable or not. So far, we have never defined a public contract about our emit strategy, and it seems this KIP does not define one either.

Hence, I don't have as strong an opinion about sending oldValues, for example, any longer. I guess the question is really: what can we implement in a reasonable way?
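Ted's reflection snippet and his fallback question can be combined into one decision procedure, which is one possible answer to "what can we implement". Under one reading of John's belt-and-suspenders idea, an update counts as a no-op if a user-defined equals() says so, or if the serialized bytes match; if neither signal is available, the update is forwarded (emit-on-update). This is a hypothetical sketch: the names `hasCustomEquals` and `isNoOp`, the optional serializer argument, and the ordering of the checks are assumptions, not what the KIP specifies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

class NoOpDetector {
    // Ted's reflection check as a helper: true if the equals(Object) used by this value
    // is declared by some class other than java.lang.Object.
    static boolean hasCustomEquals(Object value) {
        try {
            Class<?> declaring = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
            return !Object.class.equals(declaring);
        } catch (NoSuchMethodException e) {
            return false; // unreachable in practice: every class inherits equals(Object)
        }
    }

    // Hypothetical rule: no-op if a user-defined equals() says so, or if the serialized
    // bytes match. serializer may be null; with neither signal we forward (emit-on-update).
    static <V> boolean isNoOp(V oldValue, V newValue, Function<V, byte[]> serializer) {
        if (oldValue == null || newValue == null) {
            return false;
        }
        if (hasCustomEquals(oldValue) && oldValue.equals(newValue)) {
            return true;
        }
        return serializer != null
                && Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue));
    }

    // A value type that does not override equals().
    static final class Opaque {
        final String payload;
        Opaque(String payload) { this.payload = payload; }
    }

    public static void main(String[] args) {
        Function<Opaque, byte[]> ser = o -> o.payload.getBytes(StandardCharsets.UTF_8);
        System.out.println(isNoOp("X", "X", null));                        // true: String overrides equals()
        System.out.println(isNoOp(new Opaque("X"), new Opaque("X"), ser));  // true: serialized bytes match
        System.out.println(isNoOp(new Opaque("X"), new Opaque("X"), null)); // false: no signal, so forward
    }
}
```

With this ordering nothing fails when neither `equals()` nor a serializer is available; the record is simply forwarded, as it is today.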
Other comments:

@Richard:

Can you please add the KIP to the KIP overview table? It's missing (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).

@Bruno:

You mentioned caching. I think it's irrelevant (orthogonal) and we can discuss this KIP without considering it.

@John:

> Even in the source table, we forward the updated record with the higher of the two timestamps. So the example is more like:

That is not correct.
Currently, we forward with the smaller out-of-order timestamp (changing the timestamp would corrupt the data -- we don't know, because we don't check, whether the value is the same or a different one; hence, we must emit the out-of-order record as-is).

If we start to do emit-on-change, we also need to emit a new record if the timestamp changes due to out-of-order data; hence, we would still need to emit <K,V,T1> because that gives us correct semantics: assume you have a filter() and afterward use the filtered KTable in a stream-table join -- the lower T1 timestamp must be propagated to the filtered KTable to ensure that the stream-table join computes the correct result.
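Matthias's point means a value comparison alone is not enough: an update whose value is unchanged but whose timestamp moves backwards (out-of-order data) must still be emitted, so that the lower timestamp reaches downstream operators such as a stream-table join. A minimal sketch of that rule, assuming a hypothetical `StampedBytes` holder (serialized value plus a long timestamp) and a hypothetical `mustForward` method. Whether an unchanged value with a *larger* timestamp may be dropped is a separate design choice; the sketch assumes it can.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class TimestampAwareCheck {
    // Hypothetical holder for a serialized value and its record timestamp.
    static final class StampedBytes {
        final byte[] value;
        final long timestamp;
        StampedBytes(byte[] value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // Returns true if the new record must be forwarded. An identical value is only dropped
    // when the timestamp did not move backwards; an out-of-order update with a smaller
    // timestamp is forwarded so that the lower timestamp propagates downstream.
    static boolean mustForward(StampedBytes previous, StampedBytes next) {
        if (previous == null) {
            return true; // first record for this key
        }
        boolean sameValue = Arrays.equals(previous.value, next.value);
        boolean timestampDecreased = next.timestamp < previous.timestamp;
        return !sameValue || timestampDecreased;
    }

    public static void main(String[] args) {
        byte[] v = "V".getBytes(StandardCharsets.UTF_8);
        StampedBytes atT2 = new StampedBytes(v, 20L);
        System.out.println(mustForward(atT2, new StampedBytes(v, 30L))); // false: same value, later timestamp
        System.out.println(mustForward(atT2, new StampedBytes(v, 10L))); // true: same value, out-of-order T1 < T2
        System.out.println(mustForward(atT2,
                new StampedBytes("W".getBytes(StandardCharsets.UTF_8), 30L))); // true: value changed
    }
}
```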

Your point about requiring an `equals()` implementation is actually quite an interesting one and boils down to my statement from above about "what can we actually implement". What I don't understand is:

> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it.

Your bullet point (2) says it uses `equals()`, hence it seems we actually do rely on it? Also, how can we detect if there is an `equals()` method to do the comparison? Would we fail if we have neither `equals()` nor corresponding serializers to do the comparison?

> Wow, really good catch! Yes, we absolutely need metrics and logs if we're going to drop any records.
> And, yes, we should propose metrics and logs that are similar to the existing ones when we drop records for other reasons.

I am not sure about this point. In fact, we already have some no-ops in Kafka Streams in our join operators and don't report any of those either. Emit-on-change is operator semantics, and I don't see why we would need a metric for it. It seems to be quite different compared to dropping late or malformed records.

-Matthias

On 2/4/20 7:13 AM, Thomas Becker wrote:

Thanks John for your thoughtful reply. Some comments inline.

On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
> [EXTERNAL EMAIL] Attention: This email was sent from outside TiVo.
> DO NOT CLICK any links or attachments unless you expected them.
>
> Hi Tommy,
>
> Thanks for the context. I can see the attraction of considering these use cases together.
>
> To answer your question, if a part of the record is not relevant to downstream consumers, I was thinking you could just use a mapValues to remove it.
>
> E.g., suppose you wanted to do a join between two tables:
>
>     employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
>
> We only care about one attribute from the Info table (name) and one from the Payroll table (salary), and these attributes change rarely.
> On the other hand, there might be many other attributes of these tables that change frequently. We can avoid triggering the join unnecessarily by mapping the input tables to drop the unnecessary information before the join:
>
>     names = employeeInfo.mapValues(info -> info.name())
>     salaries = employeePayroll.mapValues(payroll -> payroll.salary())
>
>     names.join(salaries, (name, salary) -> new Result(name, salary))

Ahh yes I see. This works, but in the case where you're using schemas as we are (e.g. Avro), it seems like this approach could lead to a proliferation of "skinny" record types that just drop various fields.
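The projection John describes works because, once the irrelevant attributes are mapped away, an update that only touches those attributes turns into an identical value. The plain-Java sketch below illustrates this, assuming no-op detection is applied to the projected values; it has no Kafka dependency, and `EmployeeInfo`, its `lastLogin` field, and `ProjectionDemo` are hypothetical stand-ins for the tables in the example.

```java
import java.util.Objects;

// Hypothetical full record: only `name` matters downstream, `lastLogin` changes often.
final class EmployeeInfo {
    final String name;
    final long lastLogin;

    EmployeeInfo(String name, long lastLogin) {
        this.name = name;
        this.lastLogin = lastLogin;
    }
}

final class ProjectionDemo {
    // Stand-in for mapValues(info -> info.name()).
    static String project(EmployeeInfo info) {
        return info.name;
    }

    // Without projection: EmployeeInfo does not override equals(), so two distinct
    // instances always compare unequal and every update would reach the join.
    static boolean triggersJoinUnprojected(EmployeeInfo oldInfo, EmployeeInfo newInfo) {
        return !Objects.equals(oldInfo, newInfo);
    }

    // With projection: only a change in the projected value reaches the join.
    static boolean triggersJoinProjected(EmployeeInfo oldInfo, EmployeeInfo newInfo) {
        return !Objects.equals(project(oldInfo), project(newInfo));
    }
}
```

An update that only bumps `lastLogin` is then filtered before the join, while a renamed employee still propagates.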

> Especially if we take Matthias's idea to drop non-changes even for stateless operations, this would be quite efficient and is also a very straightforward optimization to understand once you know that Streams provides emit-on-change.
>
> From the context that you provided, it seems like a slightly different situation, though. Reading between the lines a little, it sounds like: in contrast to the example above, in which we are filtering out extra _data_, you have some extra _metadata_ that you still wish to pass down with the data when there is a "real" update, but you don't want the metadata itself to cause an update.
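The metadata case can be sketched the way Tommy describes his existing deduplicator later in this thread: a per-key store of hashes computed only over the semantically meaningful fields. This is a hypothetical illustration, not a Kafka Streams API; `Post`, `content`, `postedAt`, and `MetadataAwareDeduplicator` are invented names. Note the trade-off: comparing hashes instead of full values means a hash collision could silently drop a real change.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical input record: `content` is data, `postedAt` is metadata.
final class Post {
    final String content;
    final long postedAt;

    Post(String content, long postedAt) {
        this.content = content;
        this.postedAt = postedAt;
    }
}

// Toy deduplicator keyed by record key, storing a hash of the meaningful fields only.
final class MetadataAwareDeduplicator {
    private final Map<String, Integer> hashes = new HashMap<>();

    // Returns true if the record should be forwarded, false if it is a no-op.
    boolean shouldForward(String key, Post post) {
        // postedAt is deliberately excluded, so metadata-only updates are dropped.
        int hash = post.content.hashCode();
        Integer previous = hashes.put(key, hash);
        // Caveat: a hash collision could drop a real change; comparing full values avoids this.
        return previous == null || previous != hash;
    }
}
```

When a record is forwarded, it still carries its metadata, which matches the "pass it down only on a real update" behavior discussed above.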

Despite my lack of clarity, yes you've got it right ;) This particular processor is the first stop for this data after coming in from external users, who often simply post the same content each time, and we're trying to shield downstream consumers from unnecessary churn.

> It does seem handy to be able to plug in a custom ChangeDetector for this purpose, but I worry about the API complexity. Maybe you can help think through how to provide the same benefit while limiting user-facing complexity.
>
> Here's some extra context to consider:
>
> We currently don't make any extra requirements about the nature of data that you can use in Streams.
> For example, you don't have to implement hashCode and equals, or compareTo, etc. With the current proposal, we can do an airtight comparison based only on the serialized form of the values, and we actually don't have to deserialize the "prior" value at all for a large number of operations. Admittedly, if we extend the proposal to include no-op detection for stateless operations, we'd probably need to rely on equals() for no-op checking; otherwise we'd wind up requiring serdes for stateless operations as well. Actually, I'd probably argue for doing exactly that:
>
> 1. In stateful operations, drop if the serialized byte[]s are the same. After deserializing, also drop if the objects are equal according to Object#equals().
>
> 2. In stateless operations, compare the "new" and "old" values (if "old" is available) based on Object#equals().
>
> 3. As a final optimization, after serializing and before sending repartition records, compare the serialized data and drop no-ops.
>
> This way, we still don't have to rely on the existence of an equals() method, but if it is there, we can benefit from it. Also, we don't require a serde in any new situations, but we can still leverage it when it is available.
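One way to read the three checks above, written as a sketch: the `NoOpCheck` class and its methods are hypothetical and not part of Kafka Streams. It compares serialized bytes where they exist and otherwise falls back to `equals()`. This also bears on Matthias's question about detecting `equals()`: if a type does not override it, `Object#equals` reduces to reference identity, so (for values that are not mutated in place) the check can only miss a no-op and forward it, never drop a real change.

```java
import java.util.Arrays;

// Hypothetical helper illustrating the three proposed checks; not Kafka Streams code.
final class NoOpCheck {
    // 1. Stateful operations: compare serialized bytes first; if they differ,
    //    fall back to equals() on the deserialized objects.
    static boolean isNoOpStateful(byte[] oldBytes, byte[] newBytes, Object oldValue, Object newValue) {
        if (oldBytes != null && Arrays.equals(oldBytes, newBytes)) {
            return true;
        }
        return oldValue != null && oldValue.equals(newValue);
    }

    // 2. Stateless operations: only objects are available, and only if an "old" value exists.
    static boolean isNoOpStateless(Object oldValue, Object newValue) {
        return oldValue != null && oldValue.equals(newValue);
    }

    // 3. Before sending repartition records: compare the already-serialized data.
    static boolean isNoOpBeforeRepartition(byte[] oldBytes, byte[] newBytes) {
        return oldBytes != null && Arrays.equals(oldBytes, newBytes);
    }
}
```

No new serde requirement appears here: step 2 works on objects alone, and steps 1 and 3 only use bytes that the operation already has.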
>
> For clarity, in my example above, even if the employeeInfo and employeePayroll and Result records all have serdes, we need the "name" field (presumably a String) and the "salary" field (presumably a Double) to have serdes as well in the naive implementation. But if we can leverage equals(), then the "right thing" happens automatically.

I still don't totally follow why the individual components (name, salary) would have to have serdes here. If Result has one, we compare bytes, and if Result additionally has an equals() method (which presumably includes equals comparisons on the constituent fields), have we not covered our bases?

> This dovetails with my primary UX concern: where would the ChangeDetector actually be registered? None of the operators in my example have names or topics or any other identifiable characteristic that could be passed to a ChangeDetector class registered via config. You could say that we make ChangeDetector an optional parameter to every operation in Streams, but this seems to carry quite a bit of mental burden with it. People will wonder what it's for and whether or not they should be using it. There would almost certainly be a misconception that it's preferable to implement it always, which would be unfortunate. Plus, to actually implement metadata flowing through the topology as in your use case, you'd have to do two things:
>
> 1. Make sure that all operations actually preserve the metadata alongside the data (e.g., don't accidentally add a mapValues like I did, or you drop the metadata).
> 2. Implement a ChangeDetector for every single operation in the topology, or you don't get the benefit of dropping non-changes internally.
> 2b. Alternatively, you could just add the ChangeDetector to one operation toward the end of the topology. This would not drop redundant computation internally, but only drop redundant _outputs_. But this is just about the same as your current solution.

I definitely see your point regarding configuration.
I was originally thinking about this when the deduplication was going to be opt-in, and it seemed very natural to say something like:

    employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
        .suppress(duplicatesAccordingTo(someChangeDetector))

Alternatively, you can imagine a similar method being on Materialized, though obviously this makes less sense if we don't want to require materialization. If we're now talking about changing the default behavior and not having any configuration options, it's harder to find a place for this.

> A final thought: if it really is a metadata question, can we just plan to finish up the support for headers in Streams?
> I.e., give you a way to control the way that headers flow through the topology? Then, we could treat headers the same way we treat timestamps in the no-op checking: we completely ignore them for the sake of comparison. Thus, neither the timestamp nor the headers would get updated in internal state or in downstream views as long as the value itself doesn't change. This seems to give us a way to support your use case without adding to the mental overhead of using Streams for simple things.

Agree headers could be a decent fit for this particular case because it's mostly metadata, though to be honest we haven't looked at headers much (mostly because, and to your point, support seems to be lacking).
I feel like there would be other cases where this feature could be valuable, but I admit I can't come up with anything right this second. Perhaps yuzhihong had an example in mind?

> I.e., simple things should be easy, and complex things should be possible.
>
> What are your thoughts?
>
> Thanks,
> -John
>
> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
>
> Hi John, can you describe how you'd use filtering/mapping to deduplicate records?
> To give some background on my suggestion, we currently have a small stream processor that exists solely to deduplicate, which we do using a process that I assume would be similar to what would be done here (with a store of keys and hash values). But the records we are deduplicating have some metadata fields (such as timestamps of when the record was posted) that we don't consider semantically meaningful for downstream consumers, and therefore we also suppress updates that only touch those fields.
>
> -Tommy
>
> On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
>
> Hi Thomas and yuzhihong,
>
> That’s an interesting idea. Can you help think of a use case that isn’t also served by filtering or mapping beforehand? Thanks for helping to design this feature!
>
> -John
>
> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
>
> I think this is a good idea.
>
> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
>
> How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable? Meaning, use something like a hash by default, but you could optionally provide an implementation of something to use instead, like a ChangeDetector.
> This could be useful, for example, to ignore changes to certain fields, which may not be relevant to the operation being performed.
>
> From: John Roesler <vvcep...@apache.org>
> Sent: Friday, January 31, 2020 4:51 PM
> To: dev@kafka.apache.org
> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
>
> Hello all,
>
> Sorry for my silence. It seems like we are getting close to consensus. Hopefully, we could move to a vote soon!
> All of the reasoning from Matthias and Bruno around timestamps is compelling. I would be strongly in favor of stating a few things very clearly in the KIP:
>
> 1. Streams will drop no-op updates only for KTable operations. That is, we won't make any changes to KStream aggregations at the moment. It does seem like we can potentially revisit the time semantics of that operation in the future, but we don't need to do it now. On the other hand, the proposed semantics for KTable timestamps (marking the beginning of the validity of that record) make sense to me.
>
> 2. Streams will only drop no-op updates for _stateful_ KTable operations.
>    We don't want to add a hard guarantee that Streams will _never_ emit a no-op table update, because it would require adding state to otherwise stateless operations. If someone is really concerned about a particular stateless operation producing a lot of no-op results, all they have to do is materialize it, and Streams would automatically drop the no-ops.
>
> Additionally, I'm +1 on not adding an opt-out at this time.
>
> Regarding the KIP itself, I would clean it up a bit before calling for a vote. There is a lot of "discussion"-type language there, which is very natural to read, but makes it a bit hard to see what _exactly_ the KIP is proposing.
Richard, would you mind just making the "proposed behavior change" a simple and succinct list of bullet points? I.e., please drop glue phrases like "there has been some discussion" or "possibly we could do X". For the final version of the KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to add an elaboration section to explain more about what X and Y mean, but we don't need to talk about possibilities or alternatives except in the "rejected alternatives" section. Accordingly, can you also move the options you presented in the intro to the "rejected alternatives" section and only mention the final proposal itself?
This just really helps reviewers to know what they are voting for, and it helps everyone after the fact when they are trying to get clarity on what exactly the proposal is, versus all the things it could have been.
Thanks,
-John

On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
Hello to all,
I've finished making some initial modifications to the KIP. I have decided to keep the implementation section in the KIP for record-keeping purposes. For now, we should focus on only the proposed behavior changes instead. See if you have any comments!
Cheers,
Richard

On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com> wrote:
Hi all,
Thanks for all the discussion!
@John and @Bruno, I will survey other possible systems and see what I can do. Just a question: by systems, I suppose you mean the pros and cons of different reporting strategies? I'm not completely certain on this point, so it would be great if you could clarify that.

So here's what I got from all the discussion so far:
- Since both Matthias and John seem to have come to a consensus on this, we will go for an all-round behavioral change for KTables. After some thought, I decided that, for now, an opt-out config will not be added. As John has pointed out, no-op changes tend to explode further down the topology as they are forwarded to more and more processor nodes downstream.
- About using hash codes: after some explanation from John, it looks like hash codes might not be as ideal (for implementation). For now, we will omit that detail and save it for the PR.
- @Bruno, you do have valid concerns. Though, I am not completely certain if we want to do emit-on-change only for materialized KTables. I will put it down in the KIP regardless.

I will do my best to address all points raised so far in the discussion. Hope we can keep this going!
Best,
Richard

On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
Thank you Matthias for the use cases! Looking at both use cases, I think you need to elaborate on them in the KIP, Richard.
Emit from plain KTable: I agree with Matthias that the lower timestamp makes sense because it marks the start of the validity of the record. Idempotent records with a higher timestamp can be safely ignored. A corner case that I discussed with Matthias offline is when we do not materialize a KTable due to optimization. Then we cannot avoid the idempotent records, because we do not keep the first record with the lower timestamp to compare to.

Emit from KTable with aggregations: If we specify that an aggregation result should have the highest timestamp of the records that participated in the aggregation, we cannot ignore any idempotent records.
Admittedly, the result of an aggregation usually changes, but there are aggregations where the result may not change, like min and max, or sum when the incoming records have a value of zero. In those cases, we could benefit from emit-on-change, but only if we define the semantics of the aggregations to not use the highest timestamp of the participating records for the result. In Kafka Streams, we do not have min, max, and sum as explicit aggregations, but we would need to provide an API to define what timestamp should be used for the result of an aggregation if we want to go down this path. None of this blocks this KIP; I just wanted to put these aspects up for discussion. The KIP can limit itself to emit from materialized KTables.
However, the limits should be explicitly stated in the KIP.
Best,
Bruno

On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:
IMHO, the question about semantics depends on the use case, in particular on the origin of a KTable. If there is a changelog topic that one reads directly into a KTable, emit-on-change does actually make sense, because the timestamp indicates _when_ the update was _effective_.
For this case, it is semantically sound to _not_ update the timestamp in the store, because the second update is actually idempotent, and advancing the timestamp is not ideal (one could even consider it wrong to advance the timestamp), because the "valid time" of the record pair did not change. This reasoning also applies to KTable-KTable joins. However, if the KTable is the result of an aggregation, I think emit-on-update is more natural, because the timestamp reflects the _last_ time (i.e., the highest timestamp) of all input records that contributed to the result. Hence, updating the timestamp and emitting a new record actually sounds correct to me. This applies to windowed and non-windowed aggregations, IMHO.
However, considering the argument that, in the first case, the timestamp in the store should not be updated to begin with, both cases are actually the same, and both can be modeled as emit-on-change: if a `table()` operator does not update the timestamp when the value does not change, there is _no_ change and thus nothing is emitted. At the same time, if an aggregation operator does update the timestamp (even if the value does not change), there _is_ a change and we emit. Note that handling out-of-order data for aggregations would also work seamlessly with this approach: for out-of-order records, the timestamp never changes, and thus we only emit if the result itself changes.
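Matthias's observation that both cases reduce to emit-on-change, once the aggregation's timestamp is treated as part of its result, can be sketched as follows. This is a hypothetical Java model, not Kafka Streams code: it assumes a per-key `max` aggregation whose result timestamp is the highest timestamp of all contributing records (the semantics Bruno describes), and it emits only when the (value, timestamp) pair changes. `MaxResult` and `MaxAggregation` are illustrative names, and a `HashMap` stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Aggregation result: the current maximum plus the highest input timestamp.
final class MaxResult {
    final long max;
    final long timestamp;

    MaxResult(long max, long timestamp) {
        this.max = max;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MaxResult)) return false;
        MaxResult other = (MaxResult) o;
        return max == other.max && timestamp == other.timestamp;
    }

    @Override
    public int hashCode() {
        return Objects.hash(max, timestamp);
    }

    @Override
    public String toString() {
        return max + "@" + timestamp;
    }
}

// Toy per-key max() aggregation with emit-on-change on (value, timestamp).
// Hypothetical names; a HashMap stands in for the state store.
final class MaxAggregation {
    private final Map<String, MaxResult> store = new HashMap<>();

    // Returns the result to forward, or Optional.empty() if nothing changed.
    Optional<MaxResult> process(String key, long value, long timestamp) {
        MaxResult previous = store.get(key);
        MaxResult updated = (previous == null)
                ? new MaxResult(value, timestamp)
                : new MaxResult(Math.max(previous.max, value),
                                Math.max(previous.timestamp, timestamp));
        if (updated.equals(previous)) {
            return Optional.empty(); // neither the value nor the timestamp moved
        }
        store.put(key, updated);
        return Optional.of(updated);
    }
}
```

Feeding `(k, 5, t=10)`, `(k, 3, t=20)`, `(k, 4, t=15)`, `(k, 7, t=12)` emits `5@10`, then `5@20` (the value is unchanged but the timestamp advanced, so under the highest-timestamp rule this is a change), drops the out-of-order `(k, 4, t=15)` because neither value nor timestamp moves, and emits `7@20` because that out-of-order record does change the value.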
Therefore, I would argue that we might not even need any config, because the emit-on-change behavior is just correct and reduces the downstream load, while our current behavior is not ideal (even if it's also correct). Thoughts?
-Matthias

On 1/24/20 9:37 AM, John Roesler wrote:
Hi Bruno,
Thanks for that idea. I hadn't considered that option before, and it does seem like that would be the right place to put it if we think it might be semantically important to control on a table-by-table basis. I had been thinking of it less semantically and more practically.
In the context of a large topology, or, more generally, a large software system that contains many topologies and other event-driven systems, each no-op result becomes an input that is destined to itself become a no-op result, and so on, all the way through the system. Thus, a single pointless processing result becomes amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations. If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but in proportion to the size of the system times the average fan-out (to the power of the number of fan-out operations on the path(s) through the system).
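As a rough illustration of the fan-out argument, the following Java sketch counts how many records a single input triggers along one linear path of operators, where `fanOut[i]` is an assumed average number of outputs per input at operator `i`. It is a back-of-the-envelope model with a hypothetical class name and made-up numbers, not a measurement: with k fan-out operators of factor f on the path, the last stage alone sees f^k records, which is the multiplicative effect John describes.

```java
// Back-of-the-envelope model (an illustrative assumption, not a measurement)
// of how one pointless update multiplies along a linear path of operators.
final class NoOpAmplification {
    // fanOut[i] = assumed average number of output records operator i
    // produces per input record. Returns the total number of records
    // processed along the path as a consequence of one input record
    // entering operator 0.
    static long recordsProcessed(double[] fanOut) {
        double inFlight = 1.0; // records arriving at the current operator
        double total = 0.0;
        for (double f : fanOut) {
            total += inFlight; // each arriving record costs one processing step
            inFlight *= f;     // and produces f records for the next operator
        }
        return Math.round(total);
    }
}
```

For a path of five operators where two of them fan out by an average of 3 (`{1, 3, 1, 3, 1}`), one pointless update costs 17 processing steps instead of the 5 it would cost on a purely one-to-one path, whereas an emit-on-change operator that drops it at the first stage stops the cascade there.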
In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole. This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind. It seems like Streams should "do the right thing" for the majority use case. My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do it with Stream operations (where this feature is inapplicable anyway)?
In keeping with the "practical" perspective, I suggested the opt-out config only in the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that (because of the timestamp semantics work already underway) we're already pre-fetching the prior result most of the time, so there would actually be very little extra I/O involved in implementing emit-on-change. However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?
Thanks,
-John

On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
Hi Richard,
Thank you for the KIP. I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later. I am also +1 for the survey. I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you basically propose is to suppress updates if they do not change the result. Considering emit-on-change as a flavour of suppress would be more flexible, because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.
Best,
Bruno

On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
Hi Richard,
Thanks for picking this up! I know of at least one large community member for which this feature is absolutely essential. If I understand your two options, it seems like the proposal is to implement it as a behavior change regardless, and the question is whether to provide an opt-out config or not. Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose to add an opt-out config.
Can you update the KIP to mention the exact config key and value(s) you'd propose? Just to move the discussion forward, maybe something like:

emit.on := change|update

with the new default being "change".

Thanks for pointing out the timestamp issue in particular. I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted). I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP.
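For reference, John's suggested key and values could be parsed as below. This is a hypothetical sketch only: `emit.on` was a discussion proposal (subsequent messages in this thread settle on not adding an opt-out config), it is not an existing Kafka Streams config, and the `EmitOn` enum and `fromConfig` method are illustrative names. It shows the proposed default of `change` applying when the key is absent, and unknown values being rejected.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical parser for the proposed "emit.on := change|update" config.
// "emit.on" is NOT an existing Kafka Streams config; illustration only.
enum EmitOn {
    CHANGE, UPDATE;

    static final String CONFIG_KEY = "emit.on";

    // Reads the policy from the given properties, defaulting to CHANGE.
    static EmitOn fromConfig(Properties props) {
        String raw = props.getProperty(CONFIG_KEY, "change");
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "change":
                return CHANGE;
            case "update":
                return UPDATE;
            default:
                throw new IllegalArgumentException(
                        "Invalid value for " + CONFIG_KEY + ": " + raw
                                + " (expected change|update)");
        }
    }
}
```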
Maybe it would be less controversial if you just drop the implementation section for now, so that the KIP discussion can focus on the behavior change and config. Just for reference, there is some research into this domain. For example, see the "Report" section (3.2.3) of the SECRET paper:
http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf

It might help to round out the proposal if you take a brief survey of the behaviors of other systems, along with pros and cons if any are reported.
Thanks,
-John

On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
Hi everybody!
I'd like to propose a change that we probably should have added a long time ago. The key benefit of this KIP would be reduced traffic in Kafka Streams, since a lot of no-op results would no longer be sent downstream. Here is the KIP for reference:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams

Currently, I seek to formalize our approach for this KIP first, before we determine concrete API additions / configurations. Some configs might warrant adding, while others are not necessary, since adding them would only increase the complexity of Kafka Streams.
Cheers,
Richard

________________________________
This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited.
If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo is authorized to conclude any binding agreement on behalf of TiVo by email. Binding agreements with TiVo may only be made by a signed written agreement.

--
*Tommy Becker*
*Principal Engineer*

*Personalized Content Discovery*

*O* +1 919.460.4747
*tivo.com* <http://www.tivo.com/>