Hi John,

I agree with you. It is better to measure the metric at the processor-node
level. Users can roll it up to the task level themselves.
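
A minimal sketch of such a rollup, assuming the node-level totals have already been read out of whatever metrics reporter is in use. The `NodeReading` holder, its field names, and the node names below are hypothetical and only illustrate the idea:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical holder for one processor-node-level reading of the
// dropped-idempotent-updates total; the field names are illustrative.
final class NodeReading {
    final String taskId;
    final String processorNodeId;
    final double droppedTotal;

    NodeReading(String taskId, String processorNodeId, double droppedTotal) {
        this.taskId = taskId;
        this.processorNodeId = processorNodeId;
        this.droppedTotal = droppedTotal;
    }
}

final class NodeMetricRollup {
    // Sum the node-level totals per task id. Only totals are summed here;
    // a rate expressed as a ratio cannot simply be added across nodes.
    static Map<String, Double> rollupToTask(List<NodeReading> readings) {
        Map<String, Double> perTask = new TreeMap<>();
        for (NodeReading r : readings) {
            perTask.merge(r.taskId, r.droppedTotal, Double::sum);
        }
        return perTask;
    }

    public static void main(String[] args) {
        // Made-up task ids and node names, for illustration only.
        List<NodeReading> readings = Arrays.asList(
            new NodeReading("0_0", "aggregate-node", 3.0),
            new NodeReading("0_0", "map-values-node", 2.0),
            new NodeReading("0_1", "aggregate-node", 4.0));
        System.out.println(rollupToTask(readings));
    }
}
```

The opposite direction is not possible: a task-level count cannot be split back into per-node counts.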

Best,
Bruno

On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org> wrote:
>
> Hi Richard,
>
> I've been making a final pass over the KIP.
>
> Re: Proposed Behavior Change:
>
> I think this point is controversial and probably doesn't need to be there
> at all:
> > 2.b. In certain situations where there is a high volume of idempotent
> > updates throughout the Streams DAG, it will be recommended practice
> > to materialize all operations to reduce traffic overall across the entire
> >  network of nodes.
>
> Re-reading all the points, it seems like we can sum them up in a way that's
> a little more straight to the point, and gives us the right amount of 
> flexibility:
>
> > Proposed Behavior Changes
> >
> > Definition: an "idempotent update" is one in which the new result and prior
> > result, when serialized, are identical byte arrays.
> >
> > Note: an "update" is a concept that only applies to Table operations, so
> > the concept of an "idempotent update" also only applies to Table operations.
> > See 
> > https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> > for more information.
> >
> > Given that definition, we propose for Streams to drop idempotent updates
> > in any situation where it's possible and convenient to do so. For example,
> > any time we already have both the prior and new results serialized, we
> > may compare them, and drop the update if it is idempotent.
> >
> > Note that under this proposal, we can implement idempotence checking
> > in the following situations:
> > 1. Any aggregation (for example, KGroupedStream, KGroupedTable,
> >     TimeWindowedKStream, and SessionWindowedKStream operations)
> > 2. Any Materialized KTable operation
> > 3. Repartition operations, when we need to send both prior and new results
>
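
A minimal sketch of the comparison in the definition above, assuming both results are already available as serialized byte arrays. The class and method names are hypothetical, and treating a missing (null) result as "not idempotent" is an assumption of this sketch, not something the thread settles:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class IdempotentUpdateCheck {
    // True when the prior and new serialized results are identical byte arrays.
    // Assumption: a null (absent) result on either side is never treated as
    // idempotent, so such updates are always forwarded.
    static boolean isIdempotent(byte[] priorSerialized, byte[] newSerialized) {
        if (priorSerialized == null || newSerialized == null) {
            return false;
        }
        return Arrays.equals(priorSerialized, newSerialized);
    }

    public static void main(String[] args) {
        byte[] prior = "count=5".getBytes(StandardCharsets.UTF_8);
        byte[] same = "count=5".getBytes(StandardCharsets.UTF_8);
        byte[] changed = "count=6".getBytes(StandardCharsets.UTF_8);

        System.out.println(isIdempotent(prior, same));    // update could be dropped
        System.out.println(isIdempotent(prior, changed)); // update must be forwarded
    }
}
```

The check costs one pass over the two arrays and needs no extra lookup, which is why it only applies where both serialized results are already at hand.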
> Notice that in my proposed wording, we neither limit ourselves to just the
> situations enumerated, nor promise to implement the optimization in every
> possible situation. IMHO, this is the best way to propose such a feature.
> That way, we have the flexibility to implement it in stages, and also to add
> on to the implementation in the future.
>
>
> Re: Metrics
>
> I agree with Bruno, although, I think it might just be a confusing statement.
> It might be clearer to drop all the "discussion", and just say: "We will add a
> metric to count the number of idempotent updates that we have dropped".
>
> Also, with respect to the metric, I'm wondering if the metric should be
> task-level or processor-node-level. Since the interesting action takes place
> inside individual processor nodes, I _think_ it would be higher leverage to
> just measure it at the node level. WDYT?
>
> Re: Design Reasoning
>
> This section seems to be a little bit outdated. I also just noticed a
> "surprise" configuration "timestamp.aggregation.selection.policy" hidden in
> point 1.a.
> Is that part of the proposal? We haven't discussed it, and I think we were
> talking about this KIP being "configuration free".
>
> There is also some discussion of discarded alternatives in the Design
> Reasoning section, which is confusing. Finally, there was a point there I
> didn't understand at all, about stateless operators not being intended to
> load prior results. This statement doesn't seem to be true, but it also
> doesn't seem to be relevant, so maybe we can just drop it.
>
> Overall, it might help if you make a pass on this section, and just discuss as
> briefly as possible the justification for the proposed behavior change, and
> for not adding a configuration. Try to avoid talking about things that we are
> not proposing, since that will just lead to confusion.
>
> Similarly, I'd just completely remove the "Implementation [discarded]" section.
> It was good to have this as part of the discussion initially, but as we move
> toward a vote, it's better to just streamline the KIP document as much as
> possible. Keeping a "discarded" section in the document will just make it
> harder for new people to understand the proposal. We did the same thing
> with KIP-441, where there were two prior drafts included at the end of the
> document, and we just deleted them for clarity.
>
> I liked the "Compatibility" and "Rejected Alternatives" sections. Very clear
> and to the point.
>
> Thanks again for the contribution! I think once the KIP document is cleaned
> up, we'll be in good shape to finalize the discussion.
> -John
>
>
> On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:
> > Hi Richard,
> >
> > 1. Could you change "idempotent update operations will only be dropped
> > from KTables, not from other classes." -> "idempotent update operations
> > will only be dropped from materialized KTables"? For non-materialized
> > KTables -- as they can occur after optimization of the topology -- we
> > cannot drop idempotent updates.
> >
> > 2. I cannot completely follow the metrics section. Do you want to
> > record all idempotent updates or only the dropped ones? In particular,
> > I do not understand the following sentences:
> > "For that matter, even if we don't drop idempotent updates, we should
> > at the very least record the number of idempotent updates that has
> > been seen go through a particular processor."
> > "Therefore, we should add some metrics which will count the number of
> > idempotent updates that each node has seen."
> > I do not see how we can record idempotent updates that we do not drop.
> > If we see them, we should drop them. If we do not see them, we cannot
> > drop them and we cannot record them.
> >
> > Best,
> > Bruno
> >
> > On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <yohan.richard...@gmail.com> 
> > wrote:
> > >
> > > Hi John,
> > >
> > > > Sounds good. It looks like we are close to wrapping things up. If there
> > > > aren't any other revisions that need to be made (if there are, please
> > > > comment in the thread), I will start the voting process this Thursday
> > > > (Pacific Standard Time).
> > >
> > > Cheers,
> > > Richard
> > >
> > > On Tue, Feb 25, 2020 at 11:59 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Hi Richard,
> > > >
> > > > Sorry for the slow reply. I actually think we should avoid checking
> > > > equals() for now. Your reasoning is good, but the truth is that
> > > > depending on the implementation of equals() is non-trivial,
> > > > semantically, and (though I proposed it before), I'm not convinced
> > > > it's worth the risk. Much better to start with exactly one kind of
> > > > "idempotence detection".
> > > >
> > > > Even if someone does update their serdes, we know that the new
> > > > serde would still be able to _de_serialize the old format, or the whole
> > > > app would break. The situation is that the new result gets encoded
> > > > in the new binary format, which means we don't detect an idempotent
> > > > update for what it is. In this case, we'd write the new binary format to
> > > > disk and the changelog, and forward it downstream. However, we only
> > > > > do this once. Now that the binary format for that record has been
> > > > > updated, we would correctly detect idempotence of any subsequent updates.
> > > >
> > > > > Plus, we would still be able to filter out idempotent updates in
> > > > > repartition sinks, since for those, we use the new serde to serialize
> > > > > both the "old" and "new" result.
> > > >
> > > > > It's certainly a good observation, but I think we can just make a note of
> > > > > it in "rejected alternatives" for now, and plan to refine it later, if it
> > > > > does pose a big performance problem.
> > > >
> > > > Thanks!
> > > > -John
> > > >
> > > > On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
> > > > > Hi all,
> > > > >
> > > > > Updated the KIP.
> > > > >
> > > > > > Just a question: do you think it would be a good idea if we check for
> > > > > > both Object#equals() and binary equality?
> > > > > > Because there might be some subtle changes in the serialization (for
> > > > > > example, if the user decides to upgrade their serialization procedure
> > > > > > to a new one), but the underlying values of the result might be the
> > > > > > same (therefore equals() might return true).
> > > > >
> > > > > Do you think this would be plausible?
> > > > >
> > > > > Cheers,
> > > > > Richard
> > > > >
> > > > > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu 
> > > > > <yohan.richard...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > > Just to make some updates. I changed the name of the metric so that
> > > > > > > it was more in line with usual Kafka naming conventions for metrics /
> > > > > > > sensors.
> > > > > > > Below is the updated description of the metric:
> > > > > >
> > > > > > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)
> > > > > >
> > > > > > > Description: This metric will record the number of updates that have
> > > > > > > been dropped since they are essentially re-performing an earlier
> > > > > > > operation.
> > > > > >
> > > > > > Note:
> > > > > >
> > > > > > >    - The rate option indicates the ratio of records dropped to actual
> > > > > > >    volume of records passing through the task.
> > > > > > >    - The total option will just give a raw count of the number of
> > > > > > >    records dropped.
> > > > > >
> > > > > >
> > > > > > I hope that this is more on point.
> > > > > >
> > > > > > Best,
> > > > > > Richard
> > > > > >
> > > > > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu 
> > > > > > <yohan.richard...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Hi all,
> > > > > >>
> > > > > > >> Thanks for the clarification. I was just confused a little on what was
> > > > > > >> going on.
> > > > > > >>
> > > > > > >> So I guess, then, that for the actual proposal we have the following:
> > > > > >>
> > > > > >> 1. We check for binary equality, and perform no extra look ups.
> > > > > >> 2. Emphasize that this applies only to materialized tables.
> > > > > > >> 3. We drop aggregation updates if key, value and timestamp are the same.
> > > > > > >>
> > > > > > >> Then that settles the behavior changes. So it looks like the metric
> > > > > > >> is the only thing that is left. In this case, I think the metric
> > > > > > >> would be named the following: IdempotentUpdateMetric. This is mostly
> > > > > > >> off the top of my head, so if you think that we should change it,
> > > > > > >> feel free to say so.
> > > > > > >> The metric will report the number of dropped operations inherently.
> > > > > >>
> > > > > > >> It will probably be added as a Sensor, similar to the dropped records
> > > > > > >> sensor we already have.
> > > > > > >>
> > > > > > >> If there isn't anything else, I will probably start the voting process
> > > > > > >> next week!
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Richard
> > > > > >>
> > > > > >>
> > > > > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <vvcep...@apache.org>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Hi Bruno,
> > > > > >>>
> > > > > >>> Thanks for the clarification. Indeed, I was thinking two things:
> > > > > > >>> 1. For the initial implementation, we can just avoid adding any extra
> > > > > > >>> lookups, but only do the comparison when we already happen to have
> > > > > > >>> the prior value.
> > > > > > >>> 2. I think, as a result of the timestamp semantics, we actually _do_
> > > > > > >>> look up the prior value approximately all the time, so the
> > > > > > >>> idempotence check should be quite effective.
> > > > > >>>
> > > > > > >>> I think that second point is the same thing you're referring to
> > > > > > >>> potentially being unnecessary. It does mean that we do fetch the
> > > > > > >>> whole value in a lot of cases where we really only need the
> > > > > > >>> timestamp, so it could certainly be optimized in the future. In that
> > > > > > >>> future, we would need to weigh that optimization against losing the
> > > > > > >>> idempotence check. But, that's a problem for tomorrow :)
> > > > > >>>
> > > > > > >>> I'm 100% on board with scrutinizing the performance as we implement
> > > > > > >>> this feature.
> > > > > >>>
> > > > > >>> Thanks again,
> > > > > >>> -John
> > > > > >>>
> > > > > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
> > > > > >>> > Hi John,
> > > > > >>> >
> > > > > > >>> > I am glad to help you with your imagination. With overhead, I
> > > > > > >>> > mainly meant the additional lookup into the state store to get the
> > > > > > >>> > current value, but I see now in the code that we do that lookup
> > > > > > >>> > anyways (although I think we could avoid that in the cases where
> > > > > > >>> > we do not need the old value). With or without config, we need to
> > > > > > >>> > evaluate the performance benefits of this change, in any case.
> > > > > >>> >
> > > > > >>> > Best,
> > > > > >>> > Bruno
> > > > > >>> >
> > > > > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler 
> > > > > >>> > <vvcep...@apache.org>
> > > > > >>> wrote:
> > > > > >>> > >
> > > > > >>> > > Thanks for your remarks, Bruno!
> > > > > >>> > >
> > > > > > >>> > > I'm in favor of standardizing on terminology like "not forwarding
> > > > > > >>> > > idempotent updates" or "dropping idempotent updates". Maybe we
> > > > > > >>> > > should make a pass on the KIP and just convert everything to this
> > > > > > >>> > > phrasing. In retrospect, even the term "emit-on-change" has too
> > > > > > >>> > > much semantic baggage, since it implies the semantics from the
> > > > > > >>> > > SECRET paper, which we don't really want to imply here.
> > > > > >>> > >
> > > > > >>> > > I'm also in favor of the metric as you propose.
> > > > > >>> > >
> > > > > > >>> > > Likewise with stream aggregations, I was also under the
> > > > > > >>> > > impression that we agreed on dropping idempotent updates to the
> > > > > > >>> > > aggregation result, any time we find that our "new" (key, value,
> > > > > > >>> > > timestamp) result is identical to the prior one.
> > > > > > >>> > >
> > > > > > >>> > > Also, I'm +1 on all your recommendations for updating the KIP
> > > > > > >>> > > document for clarity.
> > > > > >>> > >
> > > > > > >>> > > Regarding the opt-out config. Perhaps I'm suffering from a
> > > > > > >>> > > failure of imagination, but I don't see how the current proposal
> > > > > > >>> > > could really have a measurable impact on latency. If all we do is
> > > > > > >>> > > make a single extra pass to compare two byte arrays for equality,
> > > > > > >>> > > only in the cases where we already have the byte arrays
> > > > > > >>> > > available, it seems unlikely to measurably affect the processing
> > > > > > >>> > > of non-idempotent updates. It seems guaranteed to _decrease_ the
> > > > > > >>> > > latency of processing idempotent updates, since we get to skip a
> > > > > > >>> > > store#put, at least one producer#send, and also all downstream
> > > > > > >>> > > processing, including all the disk and network operations
> > > > > > >>> > > associated with downstream operations.
> > > > > >>> > >
> > > > > > >>> > > It seems like if we're pretty sure this change would only _help_,
> > > > > > >>> > > we shouldn't introduce the operational burden of an extra
> > > > > > >>> > > configuration. If we want to be more aggressive about dropping
> > > > > > >>> > > idempotent operations in the future, such as depending on
> > > > > > >>> > > equals() or adding a ChangeDetector interface, then we should
> > > > > > >>> > > consider adding a configuration as part of that future work. In
> > > > > > >>> > > fact, if we add a simple "opt-in/opt-out" switch right now, we
> > > > > > >>> > > might find that it's actually insufficient for whatever future
> > > > > > >>> > > feature we might propose, and then we have a mess of deprecating
> > > > > > >>> > > the opt-out config and replacing it.
> > > > > >>> > >
> > > > > >>> > > What do you think?
> > > > > >>> > > -John
> > > > > >>> > >
> > > > > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
> > > > > >>> > > > Hi all,
> > > > > >>> > > >
> > > > > >>> > > > Sorry for the late reply!
> > > > > >>> > > >
> > > > > >>> > > > I am also in favour of baby steps.
> > > > > >>> > > >
> > > > > > >>> > > > I am undecided whether the KIP should contain an opt-out config
> > > > > > >>> > > > or not. The overhead of emit-on-change might affect latency.
> > > > > > >>> > > > For applications where low latency is crucial and there are not
> > > > > > >>> > > > too many idempotent updates, it would be better to fall back to
> > > > > > >>> > > > emit-on-update. However, we do not know how much emit-on-change
> > > > > > >>> > > > impacts latency. We would first need to benchmark that before
> > > > > > >>> > > > we can decide about the opt-out config.
> > > > > >>> > > >
> > > > > > >>> > > > A metric of dropped idempotent updates seems useful to me, to
> > > > > > >>> > > > be informed about potential upstream applications or upstream
> > > > > > >>> > > > operators that produce too many idempotent updates. The KIP
> > > > > > >>> > > > should state the name of the metric, its group, its tags, and
> > > > > > >>> > > > its recording level (see KIP-444 or KIP-471 for examples). I
> > > > > > >>> > > > propose DEBUG as the recording level.
> > > > > >>> > > >
> > > > > > >>> > > > Richard, what competing proposals for emit-on-change for
> > > > > > >>> > > > aggregations do you mean? I have the feeling that we agreed to
> > > > > > >>> > > > get rid of idempotent updates if the aggregate is updated with
> > > > > > >>> > > > the same key, value, AND timestamp. I am also fine if we do not
> > > > > > >>> > > > include this in this KIP (remember: baby steps).
> > > > > >>> > > >
> > > > > > >>> > > > You write that "emit-on-change is more correct". Since we
> > > > > > >>> > > > agreed that this is an optimization, IMO you cannot argue this
> > > > > > >>> > > > way.
> > > > > >>> > > >
> > > > > > >>> > > > Please put "Alternative Approaches" under "Rejected
> > > > > > >>> > > > Alternatives", so that it becomes clear that we are not going
> > > > > > >>> > > > to implement them. In general, I think the KIP needs a bit of
> > > > > > >>> > > > clean-up (probably, you already planned for it). "Design
> > > > > > >>> > > > Reasoning" is a mix of behavior changes and rejected
> > > > > > >>> > > > alternatives, and partly duplicates the content in those
> > > > > > >>> > > > sections.
> > > > > >>> > > >
> > > > > > >>> > > > I do not like the name "no-op operations" or "no-ops", because
> > > > > > >>> > > > they are rather generic. I prefer "idempotent updates".
> > > > > >>> > > >
> > > > > >>> > > > Best,
> > > > > >>> > > > Bruno
> > > > > >>> > > >
> > > > > >>> > > >
> > > > > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <
> > > > > >>> yohan.richard...@gmail.com> wrote:
> > > > > >>> > > > >
> > > > > >>> > > > > Hi all,
> > > > > >>> > > > >
> > > > > >>> > > > > We are definitely making progress!
> > > > > >>> > > > >
> > > > > > >>> > > > > @John should I emphasize in the proposed behavior changes
> > > > > > >>> > > > > that we are only doing binary equality checks for stateful
> > > > > > >>> > > > > operators?
> > > > > > >>> > > > > It looks like we have come close to finalizing this part of
> > > > > > >>> > > > > the KIP. (I will note in the KIP that this proposal is
> > > > > > >>> > > > > intended for optimization, not semantic correctness.)
> > > > > >>> > > > >
> > > > > > >>> > > > > I do think maybe we still have one other detail we need to
> > > > > > >>> > > > > discuss. So far, there has been quite a bit of back and forth
> > > > > > >>> > > > > about what the behavior of aggregations should look like in
> > > > > > >>> > > > > emit-on-change. I have seen multiple competing proposals, so
> > > > > > >>> > > > > I am not completely certain which one we should go with, or
> > > > > > >>> > > > > how we will be able to compromise between them.
> > > > > >>> > > > >
> > > > > > >>> > > > > Let me know what your thoughts are on this matter, since we
> > > > > > >>> > > > > are probably close to wrapping up most other stuff.
> > > > > > >>> > > > > @Matthias J. Sax <matth...@confluent.io> and @Bruno, see what
> > > > > > >>> > > > > you think about this.
> > > > > >>> > > > >
> > > > > >>> > > > > Best,
> > > > > >>> > > > > Richard
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <
> > > > > >>> vvcep...@apache.org> wrote:
> > > > > >>> > > > >
> > > > > >>> > > > > > Thanks, Matthias!
> > > > > >>> > > > > >
> > > > > > >>> > > > > > Regarding numbers, it would be hard to know how many
> > > > > > >>> > > > > > applications would benefit, since we don't know how many
> > > > > > >>> > > > > > applications there are, or anything about their data sets
> > > > > > >>> > > > > > or topologies. We could do a survey, but it seems overkill
> > > > > > >>> > > > > > if we take the conservative approach.
> > > > > >>> > > > > >
> > > > > > >>> > > > > > I have my own practical stream processing experience that
> > > > > > >>> > > > > > tells me this is absolutely critical for any
> > > > > > >>> > > > > > moderate-to-large relational stream processing use cases.
> > > > > > >>> > > > > > I'll leave it to you to decide if you find that convincing,
> > > > > > >>> > > > > > but it's definitely not an _assumption_. I've also heard
> > > > > > >>> > > > > > from a few Streams users who have already had to implement
> > > > > > >>> > > > > > their own noop-suppression transformers in order to get to
> > > > > > >>> > > > > > production scale.
> > > > > >>> > > > > >
> > > > > > >>> > > > > > Regardless, it sounds like we can agree on taking an
> > > > > > >>> > > > > > opportunistic approach and targeting the optimization just
> > > > > > >>> > > > > > to use a binary-equality check at stateful operators. (I'd
> > > > > > >>> > > > > > also suggest in sink nodes, when we are about to send old
> > > > > > >>> > > > > > and new values, since they are also already present and
> > > > > > >>> > > > > > serialized at that point.) We could make the KIP even more
> > > > > > >>> > > > > > vague, and just say that we'll drop no-op updates "when
> > > > > > >>> > > > > > possible".
> > > > > >>> > > > > >
> > > > > > >>> > > > > > I'm curious what Bruno and the others think about this. If
> > > > > > >>> > > > > > it seems like a good starting point, perhaps we could move
> > > > > > >>> > > > > > to a vote soon and get to work on the implementation!
> > > > > >>> > > > > >
> > > > > >>> > > > > > Thanks,
> > > > > >>> > > > > > -John
> > > > > >>> > > > > >
> > > > > >>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:
> > > > > >>> > > > > > > Talking about optimizations and reducing downstream 
> > > > > >>> > > > > > > load:
> > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Do we actually have any numbers? I have the impression
> > > > > > >>> > > > > > > that this KIP is more or less built on the _assumption_
> > > > > > >>> > > > > > > that there is a problem. Yes, there are some use cases
> > > > > > >>> > > > > > > that would benefit from this; but how many applications
> > > > > > >>> > > > > > > would actually benefit? And how much load reduction would
> > > > > > >>> > > > > > > they get?
> > > > > >>> > > > > > >
> > > > > > >>> > > > > > > The simplest approach (following John's idea to make baby
> > > > > > >>> > > > > > > steps) would be to apply the emit-on-change pattern only
> > > > > > >>> > > > > > > if there is a store. For this case we need to serialize
> > > > > > >>> > > > > > > old and new result anyway and thus a simple byte-array
> > > > > > >>> > > > > > > comparison is no overhead.
> > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Sending `oldValues` by default would become expensive
> > > > > > >>> > > > > > > because we would need to serialize the recomputed old
> > > > > > >>> > > > > > > result, as well as the new result, to make the comparison
> > > > > > >>> > > > > > > (and we know the serialization is not cheap). We are
> > > > > > >>> > > > > > > facing a trade-off between CPU overhead and downstream
> > > > > > >>> > > > > > > load and I am not sure if we should hard code this. My
> > > > > > >>> > > > > > > original argument for sending `oldValues` was about
> > > > > > >>> > > > > > > semantics; but for an optimization, I am not sure if this
> > > > > > >>> > > > > > > would be the right choice.
> > > > > >>> > > > > > >
> > > > > > >>> > > > > > > For now, users who want to opt-in can force a
> > > > > > >>> > > > > > > materialization. A materialization may be expensive and if
> > > > > > >>> > > > > > > we see future demand, we could still add an option to send
> > > > > > >>> > > > > > > `oldValues` instead of materialization (this would at
> > > > > > >>> > > > > > > least save the store overhead). As we consider the KIP an
> > > > > > >>> > > > > > > optimization, a "config" seems to make sense.
> > > > > >>> > > > > > >
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > -Matthias
> > > > > >>> > > > > > >
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
> > > > > >>> > > > > > > > Hi John!
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Thanks for the reply.
> > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > About the changes we have discussed so far: I think, upon
> > > > > > >>> > > > > > > > further consideration, we have been mostly talking about
> > > > > > >>> > > > > > > > this from the perspective that no stop-gap effort is
> > > > > > >>> > > > > > > > acceptable. However, in recent discussion, if we consider
> > > > > > >>> > > > > > > > optimization, then it appears that the perspective I
> > > > > > >>> > > > > > > > mentioned no longer applies. After all, we are no longer
> > > > > > >>> > > > > > > > concerned so much about semantic correctness as about
> > > > > > >>> > > > > > > > reducing traffic as much as possible without performance
> > > > > > >>> > > > > > > > tradeoffs.
> > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > In this case, I think a cache would be a good idea for
> > > > > > >>> > > > > > > > stateless operations. This cache will not be backed by a
> > > > > > >>> > > > > > > > store obviously. We can probably use Kafka's ThreadCache.
> > > > > > >>> > > > > > > > We should be able to catch a large portion of the no-ops
> > > > > > >>> > > > > > > > if we at least store some results in the cache. Not all
> > > > > > >>> > > > > > > > will be caught, but I think the impact will be
> > > > > > >>> > > > > > > > significant.
> > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > On another note, I think that we should implement
> > > > > > >>> > > > > > > > competing proposals, i.e., one where we forward both old
> > > > > > >>> > > > > > > > and new values with a reasonable proportion of artificial
> > > > > > >>> > > > > > > > no-ops (we do not necessarily have to rely on equals so
> > > > > > >>> > > > > > > > much as comparing the serialized binary data after the
> > > > > > >>> > > > > > > > operation), and in another scenario, the cache for
> > > > > > >>> > > > > > > > stateless ops. It would be unreasonable if we completely
> > > > > > >>> > > > > > > > disregard either approach, since they both have merit.
> > > > > > >>> > > > > > > > The reason for implementing both is to perform benchmark
> > > > > > >>> > > > > > > > tests on them, and compare them with the original. This
> > > > > > >>> > > > > > > > way, we can more clearly see what the drawbacks and the
> > > > > > >>> > > > > > > > gains are. So far, we have been discussing only
> > > > > > >>> > > > > > > > hypotheticals, and if we continue to do so, I think it is
> > > > > > >>> > > > > > > > likely no ground will be gained.
> > > > > >>> > > > > > > >
> > > > > > >>> > > > > > > > After all, what we seek is optimization, and performance
> > > > > > >>> > > > > > > > benchmarks will be mandatory for a KIP of this nature.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Hope this helps,
> > > > > >>> > > > > > > > Richard
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler <
> > > > > >>> vvcep...@apache.org>
> > > > > >>> > > > > > wrote:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >> Hi again, all,
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> Sorry on my part for my silence.
> > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> I've just taken another look over the recent history of
> > > > > > >>> > > > > > > >> this discussion. It seems like the #1 point to clarify
> > > > > > >>> > > > > > > >> (because it affects everything else) is that, yes, I was
> > > > > > >>> > > > > > > >> 100% envisioning this as an _optimization_.
> > > > > >>> > > > > > > >>
> > > > > > >>> > > > > > > >> As a consequence, I don't think it's critical to make any
> > > > > > >>> > > > > > > >> hard guarantees about what results get forwarded and what
> > > > > > >>> > > > > > > >> (no-op updates) get dropped. I'd initially just been
> > > > > > >>> > > > > > > >> thinking about doing this opportunistically in cases
> > > > > > >>> > > > > > > >> where we already had the "old" and "new" result in
> > > > > > >>> > > > > > > >> memory, thanks to a request to "emit old values", or to
> > > > > > >>> > > > > > > >> the implementation of timestamp semantics.
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> However, whether or not it's semantically 
> > > > > >>> > > > > > > >> critical, I
> > > > do
> > > > > >>> think that
> > > > > >>> > > > > > > >> Matthias's
> > > > > >>> > > > > > > >> idea to use the change-forwarding mechanism to 
> > > > > >>> > > > > > > >> check
> > > > for
> > > > > >>> no-ops even
> > > > > >>> > > > > > on
> > > > > >>> > > > > > > >> stateless operations is pretty interesting.
> > > > Specifically,
> > > > > >>> this would
> > > > > >>> > > > > > > >> _really_
> > > > > >>> > > > > > > >> let you pare down useless updates by using 
> > > > > >>> > > > > > > >> mapValues
> > > > to
> > > > > >>> strip down
> > > > > >>> > > > > > > records
> > > > > >>> > > > > > > >> only
> > > > > >>> > > > > > > >> to what you really need. However, the dependence on
> > > > the
> > > > > >>> > > > > > implementation of
> > > > > >>> > > > > > > >> equals() is troubling.
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> It might make sense to table this idea, as well as 
> > > > > >>> > > > > > > >> my
> > > > > >>> complicated
> > > > > >>> > > > > > no-op
> > > > > >>> > > > > > > >> detection algorithm, and initially propose just a
> > > > > >>> nonconfigurable
> > > > > >>> > > > > > feature
> > > > > >>> > > > > > > >> to
> > > > > >>> > > > > > > >> check "old" and "new" results for binary equality
> > > > before
> > > > > >>> forwarding.
> > > > > >>> > > > > > > I.e.,
> > > > > >>> > > > > > > >> if
> > > > > >>> > > > > > > >> any operation determines that the old and new 
> > > > > >>> > > > > > > >> results
> > > > are
> > > > > >>> > > > > > > >> binary-identical, we
> > > > > >>> > > > > > > >> would not forward.
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> I'll admit that this doesn't serve Tommy's use case
> > > > very
> > > > > >>> well, but it
> > > > > >>> > > > > > > >> might be
> > > > > >>> > > > > > > >> better to take baby steps with an optimization like
> > > > this
> > > > > >>> and not risk
> > > > > >>> > > > > > > >> over-reaching in a way that actually harms
> > > > performance or
> > > > > >>> > > > > > correctness. We
> > > > > >>> > > > > > > >> could
> > > > > >>> > > > > > > >> always expand the feature to use equals() or some
> > > > kind of
> > > > > >>> > > > > > ChangeDetector
> > > > > >>> > > > > > > >> later
> > > > > >>> > > > > > > >> on, in a more focused discussion.
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> Regarding metrics or debug logs, I guess I don't 
> > > > > >>> > > > > > > >> feel
> > > > > >>> strongly, but it
> > > > > >>> > > > > > > >> feels
> > > > > >>> > > > > > > >> like two things will happen that make it nicer to 
> > > > > >>> > > > > > > >> add
> > > > > >>> them:
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> 1. This feature is going to surprise/annoy 
> > > > > >>> > > > > > > >> _somebody_,
> > > > > >>> and it would be
> > > > > >>> > > > > > > >> nice to
> > > > > >>> > > > > > > >> be able to definitively say the reason that updates
> > > > are
> > > > > >>> dropped is
> > > > > >>> > > > > > that
> > > > > >>> > > > > > > >> they
> > > > > >>> > > > > > > >> were no-ops. The easiest smoking gun is if there 
> > > > > >>> > > > > > > >> are
> > > > > >>> debug-logs that
> > > > > >>> > > > > > > can be
> > > > > >>> > > > > > > >> enabled. This person might just be looking at the
> > > > > >>> dashboards,
> > > > > >>> > > > > > > wondering why
> > > > > >>> > > > > > > >> there are 100K updates per second going into their
> > > > app,
> > > > > >>> but only 1K
> > > > > >>> > > > > > > >> results per
> > > > > >>> > > > > > > >> second coming out. Having the metric there makes 
> > > > > >>> > > > > > > >> the
> > > > > >>> accounting
> > > > > >>> > > > > > easier.
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> 2. Somebody is going to struggle with high-volume
> > > > > >>> updates, and it
> > > > > >>> > > > > > > would be
> > > > > >>> > > > > > > >> nice
> > > > > >>> > > > > > > >> for them to know that this feature is saving them
> > > > > >>> X-thousand updates
> > > > > >>> > > > > > per
> > > > > >>> > > > > > > >> second,
> > > > > >>> > > > > > > >> etc.
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> What does everyone think about this? Note, as I 
> > > > > >>> > > > > > > >> read
> > > > it,
> > > > > >>> what I've
> > > > > >>> > > > > > said
> > > > > >>> > > > > > > >> above is
> > > > > >>> > > > > > > >> already reflected in the text of the KIP.
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> Thanks,
> > > > > >>> > > > > > > >> -John
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >>
> > > > > >>> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:
> > > > > >>> > > > > > > >>> Hi all,
> > > > > >>> > > > > > > >>>
> > > > > >>> > > > > > > >>> Bumping this. If you feel that this KIP is not too urgent,
> > > > > >>> > > > > > > >>> then let me know. :)
> > > > > >>> > > > > > > >>>
> > > > > >>> > > > > > > >>> Cheers,
> > > > > >>> > > > > > > >>> Richard
> > > > > >>> > > > > > > >>>
> > > > > >>> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <
> > > > > >>> > > > > > yohan.richard...@gmail.com>
> > > > > >>> > > > > > > >>> wrote:
> > > > > >>> > > > > > > >>>
> > > > > >>> > > > > > > >>>> Hi all,
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>> I've had just a few thoughts regarding the
> > > > forwarding
> > > > > >>> of <key,
> > > > > >>> > > > > > > >>>> change<old_value, new_value>>. As Matthias 
> > > > > >>> > > > > > > >>>> already
> > > > > >>> mentioned, there
> > > > > >>> > > > > > > >> are two
> > > > > >>> > > > > > > >>>> separate priorities by which we can judge this 
> > > > > >>> > > > > > > >>>> KIP:
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>> 1. An optimization perspective: In this case, the user
> > > > > >>> > > > > > > >>>> would prefer the impact of this KIP to be as minimal as
> > > > > >>> > > > > > > >>>> possible. By such logic, if stateless operations are
> > > > > >>> > > > > > > >>>> performed twice, that could prove unacceptable for them
> > > > > >>> > > > > > > >>>> (since operations can prove expensive).
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>> 2. Semantics correctness perspective: Unlike the
> > > > > >>> optimization
> > > > > >>> > > > > > > >> approach, we
> > > > > >>> > > > > > > >>>> are more concerned with all KTable operations
> > > > obeying
> > > > > >>> the same
> > > > > >>> > > > > > emission
> > > > > >>> > > > > > > >>>> policy. i.e. emit on change. In this case, a
> > > > > >>> discrepancy would not
> > > > > >>> > > > > > be
> > > > > >>> > > > > > > >>>> tolerated, even though an extra performance cost
> > > > will
> > > > > >>> be incurred.
> > > > > >>> > > > > > > >>>> Therefore, we will follow Matthias's approach, 
> > > > > >>> > > > > > > >>>> and
> > > > then
> > > > > >>> perform the
> > > > > >>> > > > > > > >>>> operation once on the old value, and once on the
> > > > new.
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>> The issue here I think is more black and white 
> > > > > >>> > > > > > > >>>> than
> > > > in
> > > > > >>> between. The
> > > > > >>> > > > > > > >> second
> > > > > >>> > > > > > > >>>> option in particular would be favorable for users
> > > > with
> > > > > >>> inexpensive
> > > > > >>> > > > > > > >>>> stateless operations, while for the former 
> > > > > >>> > > > > > > >>>> option,
> > > > we
> > > > > >>> are probably
> > > > > >>> > > > > > > >> dealing
> > > > > >>> > > > > > > >>>> with more expensive ones. So the simplest 
> > > > > >>> > > > > > > >>>> solution
> > > > is
> > > > > >>> probably to
> > > > > >>> > > > > > > >> allow the
> > > > > >>> > > > > > > >>>> user to choose one of the behaviors, and have a
> > > > config
> > > > > >>> which can
> > > > > >>> > > > > > > >> switch in
> > > > > >>> > > > > > > >>>> between them.
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>> It's the simplest compromise I can come up with at the
> > > > > >>> > > > > > > >>>> moment, but if you think you have a better plan which
> > > > > >>> > > > > > > >>>> could better balance tradeoffs, then please let us
> > > > > >>> > > > > > > >>>> know. :)
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>> Best,
> > > > > >>> > > > > > > >>>> Richard
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John Roesler <
> > > > > >>> vvcep...@apache.org>
> > > > > >>> > > > > > > >> wrote:
> > > > > >>> > > > > > > >>>>
> > > > > >>> > > > > > > >>>>> Hi all,
> > > > > >>> > > > > > > >>>>>
> > > > > >>> > > > > > > >>>>> Thanks for the thoughtful comments!
> > > > > >>> > > > > > > >>>>>
> > > > > >>> > > > > > > >>>>> I need more time to reflect on your thoughts, 
> > > > > >>> > > > > > > >>>>> but
> > > > just
> > > > > >>> wanted to
> > > > > >>> > > > > > offer
> > > > > >>> > > > > > > >>>>> a quick clarification about equals().
> > > > > >>> > > > > > > >>>>>
> > > > > >>> > > > > > > >>>>> I only meant that we can't be sure if a class's
> > > > > >>> equals()
> > > > > >>> > > > > > > >> implementation
> > > > > >>> > > > > > > >>>>> returns true for two semantically identical
> > > > instances.
> > > > > >>> I.e., if a
> > > > > >>> > > > > > > >> class
> > > > > >>> > > > > > > >>>>> doesn't
> > > > > >>> > > > > > > >>>>> override the default equals() implementation, 
> > > > > >>> > > > > > > >>>>> then
> > > > we
> > > > > >>> would see
> > > > > >>> > > > > > > >> behavior
> > > > > >>> > > > > > > >>>>> like:
> > > > > >>> > > > > > > >>>>>
> > > > > >>> > > > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A", 1))
> > > > returns
> > > > > >>> false
> > > > > >>> > > > > > > >>>>>
> > > > > >>> > > > > > > >>>>> In that case, I would still like to catch no-op
> > > > > >>> updates by
> > > > > >>> > > > > > comparing
> > > > > >>> > > > > > > >> the
> > > > > >>> > > > > > > >>>>> serialized form of the records when we happen to
> > > > have
> > > > > >>> it serialized
> > > > > >>> > > > > > > >> anyway
> > > > > >>> > > > > > > >>>>> (such as when the operation is stateful, or when
> > > > we're
> > > > > >>> sending to a
> > > > > >>> > > > > > > >>>>> repartition topic and we have both the "new" and
> > > > "old"
> > > > > >>> value from
> > > > > >>> > > > > > > >>>>> upstream).
> > > > > >>> > > > > > > >>>>>
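The difference between the default equals() and a comparison of serialized bytes, as described in the message above, can be illustrated with a minimal sketch. MyPair here is a hypothetical value type modeled on the example in the message, and serialize() is a stand-in written for this sketch only; it is an assumption, not the Kafka Serde API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical value type that does NOT override equals(), like MyPair above.
final class MyPair {
    final String key;
    final int value;

    MyPair(String key, int value) {
        this.key = key;
        this.value = value;
    }
}

final class NoOpCheckSketch {
    // Stand-in serializer (an assumption for this sketch, not a Kafka API).
    static byte[] serialize(MyPair p) {
        return (p.key + ":" + p.value).getBytes(StandardCharsets.UTF_8);
    }

    // An update is treated as a no-op when the old and new results
    // serialize to identical byte arrays.
    static boolean isNoOp(MyPair oldValue, MyPair newValue) {
        return Arrays.equals(serialize(oldValue), serialize(newValue));
    }

    public static void main(String[] args) {
        MyPair a = new MyPair("A", 1);
        MyPair b = new MyPair("A", 1);
        System.out.println(a.equals(b));   // false: Object#equals compares identity
        System.out.println(isNoOp(a, b));  // true: the serialized forms match
    }
}
```

The byte comparison catches the no-op even though the class never defined equals(), which is why it is attractive wherever the serialized form already exists.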
> > > > > >>> > > > > > > >>>>> I didn't mean to suggest we'd try to use
> > > > reflection to
> > > > > >>> detect
> > > > > >>> > > > > > whether
> > > > > >>> > > > > > > >>>>> equals
> > > > > >>> > > > > > > >>>>> is implemented, although that is a neat trick. I
> > > > was
> > > > > >>> thinking more
> > > > > >>> > > > > > of
> > > > > >>> > > > > > > >> a
> > > > > >>> > > > > > > >>>>> belt-and-suspenders algorithm where we do the 
> > > > > >>> > > > > > > >>>>> check
> > > > > >>> for no-ops
> > > > > >>> > > > > > based
> > > > > >>> > > > > > > >> on
> > > > > >>> > > > > > > >>>>> equals() and then _also_ check the serialized 
> > > > > >>> > > > > > > >>>>> bytes
> > > > > >>> for equality.
> > > > > >>> > > > > > > >>>>>
> > > > > >>> > > > > > > >>>>> Thanks,
> > > > > >>> > > > > > > >>>>> -John
> > > > > >>> > > > > > > >>>>>
> > > > > >>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
> > > > > >>> > > > > > > >>>>>> Thanks for the comments, Matthias.
> > > > > >>> > > > > > > >>>>>>
> > > > > >>> > > > > > > >>>>>> w.r.t. requirement of an `equals()`
> > > > implementation,
> > > > > >>> each template
> > > > > >>> > > > > > > >> type
> > > > > >>> > > > > > > >>>>>> would have an equals() method. We can use the
> > > > > >>> following code to
> > > > > >>> > > > > > know
> > > > > >>> > > > > > > >>>>>> whether it is provided by JVM or provided by 
> > > > > >>> > > > > > > >>>>>> user.
> > > > > >>> > > > > > > >>>>>>
> > > > > >>> > > > > > > >>>>>> boolean customEquals = false;
> > > > > >>> > > > > > > >>>>>> try {
> > > > > >>> > > > > > > >>>>>>     Class<?> cls = value.getClass()
> > > > > >>> > > > > > > >>>>>>         .getMethod("equals", Object.class)
> > > > > >>> > > > > > > >>>>>>         .getDeclaringClass();
> > > > > >>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
> > > > > >>> > > > > > > >>>>>>         customEquals = true;
> > > > > >>> > > > > > > >>>>>>     }
> > > > > >>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
> > > > > >>> > > > > > > >>>>>>     // equals is always defined, this wouldn't hit
> > > > > >>> > > > > > > >>>>>> }
> > > > > >>> > > > > > > >>>>>>
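For readers who want to try the reflection check quoted above, here is a self-contained sketch of the same idea. The class and method names (EqualsDetectionSketch, hasCustomEquals) are invented for illustration and are not part of Kafka Streams.

```java
final class EqualsDetectionSketch {
    // Returns true when the runtime class of `value` (or one of its
    // superclasses other than Object) declares its own equals(Object).
    static boolean hasCustomEquals(Object value) {
        try {
            Class<?> declaring = value.getClass()
                    .getMethod("equals", Object.class)
                    .getDeclaringClass();
            return !Object.class.equals(declaring);
        } catch (NoSuchMethodException e) {
            // equals(Object) is public on Object, so this is not expected.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasCustomEquals("text"));        // String overrides equals
        System.out.println(hasCustomEquals(new Object()));  // plain Object does not
    }
}
```

Note that this only detects whether equals() is overridden somewhere in the hierarchy; it cannot tell whether the override is semantically meaningful.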
> > > > > >>> > > > > > > >>>>>> The next question is: what if the user doesn't
> > > > > >>> provide equals()
> > > > > >>> > > > > > > >> method ?
> > > > > >>> > > > > > > >>>>>> Would we automatically fall back to
> > > > emit-on-update ?
> > > > > >>> > > > > > > >>>>>>
> > > > > >>> > > > > > > >>>>>> Cheers
> > > > > >>> > > > > > > >>>>>>
> > > > > >>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax 
> > > > > >>> > > > > > > >>>>>> <
> > > > > >>> mj...@apache.org>
> > > > > >>> > > > > > > >>>>> wrote:
> > > > > >>> > > > > > > >>>>>>
> > > > > >>> > > > > > > > First a high level comment:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Overall, I would like to take one step back, and make sure
> > > > > >>> > > > > > > > we are discussing on the same level. Originally, I
> > > > > >>> > > > > > > > understood this KIP as a proposed change of _semantics_,
> > > > > >>> > > > > > > > however, given the latest discussion it seems it's actually
> > > > > >>> > > > > > > > not -- it's more an _optimization_ proposal. Hence, we only
> > > > > >>> > > > > > > > need to make sure that this optimization does not break
> > > > > >>> > > > > > > > existing semantics. Is this the right way to think about
> > > > > >>> > > > > > > > it?
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > If yes, then it might actually be ok to have different
> > > > > >>> > > > > > > > behavior depending on whether there is a materialized
> > > > > >>> > > > > > > > KTable or not. So far, we never defined a public contract
> > > > > >>> > > > > > > > about our emit strategy and it seems this KIP does not
> > > > > >>> > > > > > > > define one either.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Hence, I don't have as strong of an opinion about
> > > > sending
> > > > > >>> > > > > > > >>> oldValues
> > > > > >>> > > > > > > > for example any longer. I guess the question is 
> > > > > >>> > > > > > > > really,
> > > > > >>> what can
> > > > > >>> > > > > > > >>> we
> > > > > >>> > > > > > > > implement in a reasonable way.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Other comments:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > @Richard:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Can you please add the KIP to the KIP overview table? It's
> > > > > >>> > > > > > > > missing
> > > > > >>> > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > @Bruno:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > You mentioned caching. I think it's irrelevant
> > > > > >>> (orthogonal) and
> > > > > >>> > > > > > > >>> we can
> > > > > >>> > > > > > > > discuss this KIP without considering it.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > @John:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >>>>>>>>> Even in the source table, we forward the
> > > > updated
> > > > > >>> record with
> > > > > >>> > > > > > the
> > > > > >>> > > > > > > >>>>>>>>> higher of the two timestamps. So the 
> > > > > >>> > > > > > > >>>>>>>>> example is
> > > > > >>> more like:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > That is not correct. Currently, we forward with the
> > > > smaller
> > > > > >>> > > > > > > > out-of-order timestamp (changing the timestamp would
> > > > > >>> corrupt the
> > > > > >>> > > > > > > >>> data
> > > > > >>> > > > > > > > -- we don't know, because we don't check, if the 
> > > > > >>> > > > > > > > value
> > > > is
> > > > > >>> the
> > > > > >>> > > > > > > >>> same
> > > > > >>> > > > > > > >>>>>> or
> > > > > >>> > > > > > > > a different one, hence, we must emit the 
> > > > > >>> > > > > > > > out-of-order
> > > > > >>> record
> > > > > >>> > > > > > > >>> as-is).
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > If we start to do emit-on-change, we also need to emit a
> > > > > >>> > > > > > > > new record if the timestamp changes due to out-of-order
> > > > > >>> > > > > > > > data, hence, we would still need to emit <K,V,T1> because
> > > > > >>> > > > > > > > that gives us correct semantics: assume you have a
> > > > > >>> > > > > > > > filter() and afterward use the filtered KTable in a
> > > > > >>> > > > > > > > stream-table join -- the lower T1 timestamp must be
> > > > > >>> > > > > > > > propagated to the filtered KTable to ensure that the
> > > > > >>> > > > > > > > stream-table join computes the correct result.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Your point about requiring an `equals()`
> > > > implementation is
> > > > > >>> > > > > > > >>> actually a
> > > > > >>> > > > > > > > quite interesting one and boils down to my statement
> > > > from
> > > > > >>> above
> > > > > >>> > > > > > > >>> about
> > > > > >>> > > > > > > > "what can we actually implement". What I don't
> > > > understand
> > > > > >>> is:
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >>>>>>>>> This way, we still don't have to rely on the
> > > > > >>> existence of an
> > > > > >>> > > > > > > >>>>>>>>> equals() method, but if it is there, we can
> > > > > >>> benefit from it.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > Your bullet point (2) says it uses `equals()` -- hence, it
> > > > > >>> > > > > > > > seems we actually do rely on it? Also, how can we detect if
> > > > > >>> > > > > > > > there is an `equals()` method to do the comparison? Would
> > > > > >>> > > > > > > > we fail if we have neither `equals()` nor corresponding
> > > > > >>> > > > > > > > serializers to do the comparison?
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we absolutely 
> > > > > >>> > > > > > > >>>>>>>>> need
> > > > > >>> metrics and
> > > > > >>> > > > > > > >>> logs if
> > > > > >>> > > > > > > >>>>>>>>> we're going to drop any records. And, yes, 
> > > > > >>> > > > > > > >>>>>>>>> we
> > > > > >>> should propose
> > > > > >>> > > > > > > >>>>>>>>> metrics and logs that are similar to the
> > > > existing
> > > > > >>> ones when we
> > > > > >>> > > > > > > >>> drop
> > > > > >>> > > > > > > >>>>>>>>> records for other reasons.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > I am not sure about this point. In fact, we have
> > > > already
> > > > > >>> some
> > > > > >>> > > > > > > >>> no-ops
> > > > > >>> > > > > > > > in Kafka Streams in our join-operators and don't 
> > > > > >>> > > > > > > > report
> > > > > >>> any of
> > > > > >>> > > > > > > >>> those
> > > > > >>> > > > > > > > either. Emit-on-change is operator semantics and I
> > > > don't
> > > > > >>> see why
> > > > > >>> > > > > > > >>> we
> > > > > >>> > > > > > > > would need to have a metric for it? It seems to be
> > > > quite
> > > > > >>> different
> > > > > >>> > > > > > > > compared to dropping late or malformed records.
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > -Matthias
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > >
> > > > > >>> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
> > > > > >>> > > > > > > >>>>>>>>> Thanks John for your thoughtful reply. Some
> > > > > >>> comments inline.
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600, John 
> > > > > >>> > > > > > > >>>>>>>>> Roesler
> > > > > >>> wrote:
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> Hi Tommy,
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> Thanks for the context. I can see the
> > > > attraction
> > > > > >>> of
> > > > > >>> > > > > > considering
> > > > > >>> > > > > > > >>>>>>>>>> these use cases together.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> To answer your question, if a part of the record is not
> > > > > >>> > > > > > > >>>>>>>>>> relevant to downstream consumers, I was thinking you
> > > > > >>> > > > > > > >>>>>>>>>> could just use mapValues to remove it.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a join 
> > > > > >>> > > > > > > >>>>>>>>>> between
> > > > two
> > > > > >>> tables.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> employeeInfo.join( employeePayroll,
> > > > > >>> > > > > > > >>>>>>>>>>   (info, payroll) -> new Result(info.name(), payroll.salary()) )
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> We only care about one attribute from the 
> > > > > >>> > > > > > > >>>>>>>>>> Info
> > > > > >>> table (name),
> > > > > >>> > > > > > > >>> and
> > > > > >>> > > > > > > >>>>>>>>>> one from the Payroll table (salary), and 
> > > > > >>> > > > > > > >>>>>>>>>> these
> > > > > >>> attributes
> > > > > >>> > > > > > > >>> change
> > > > > >>> > > > > > > >>>>>>>>>> rarely. On the other hand, there might be 
> > > > > >>> > > > > > > >>>>>>>>>> many
> > > > > >>> other
> > > > > >>> > > > > > attributes
> > > > > >>> > > > > > > >>>>>>>>>> that change frequently of these tables. We 
> > > > > >>> > > > > > > >>>>>>>>>> can
> > > > > >>> avoid
> > > > > >>> > > > > > triggering
> > > > > >>> > > > > > > >>>>>>>>>> the join unnecessarily by mapping the input
> > > > > >>> tables to drop the
> > > > > >>> > > > > > > >>>>>>>>>> unnecessary information before the join:
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> names = employeeInfo.mapValues(info -> info.name())
> > > > > >>> > > > > > > >>>>>>>>>> salaries = employeePayroll.mapValues(payroll -> payroll.salary())
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> names.join( salaries, (name, salary) -> new Result(name, salary) )
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>> Ahh yes I see. This works, but in the case
> > > > where
> > > > > >>> you're using
> > > > > >>> > > > > > > >>>>>>>>> schemas as we are (e.g. Avro), it seems like
> > > > this
> > > > > >>> approach
> > > > > >>> > > > > > could
> > > > > >>> > > > > > > >>>>>>>>> lead to a proliferation of "skinny" record
> > > > types
> > > > > >>> that just drop
> > > > > >>> > > > > > > >>>>>>>>> various fields.
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> Especially if we take Matthias's idea to 
> > > > > >>> > > > > > > >>>>>>>>>> drop
> > > > > >>> non-changes even
> > > > > >>> > > > > > > >>>>>>>>>> for stateless operations, this would be 
> > > > > >>> > > > > > > >>>>>>>>>> quite
> > > > > >>> efficient and is
> > > > > >>> > > > > > > >>>>>>>>>> also a very straightforward optimization to
> > > > > >>> understand once
> > > > > >>> > > > > > you
> > > > > >>> > > > > > > >>>>>>>>>> know that Streams provides emit-on-change.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> From the context that you provided, it 
> > > > > >>> > > > > > > >>>>>>>>>> seems
> > > > like
> > > > > >>> a slightly
> > > > > >>> > > > > > > >>>>>>>>>> different situation, though. Reading 
> > > > > >>> > > > > > > >>>>>>>>>> between
> > > > the
> > > > > >>> lines a
> > > > > >>> > > > > > > >>> little,
> > > > > >>> > > > > > > >>>>>>>>>> it sounds like: in contrast to the example
> > > > above,
> > > > > >>> in which we
> > > > > >>> > > > > > > >>> are
> > > > > >>> > > > > > > >>>>>>>>>> filtering out extra _data_, you have some
> > > > extra
> > > > > >>> _metadata_
> > > > > >>> > > > > > that
> > > > > >>> > > > > > > >>>>>>>>>> you still wish to pass down with the data 
> > > > > >>> > > > > > > >>>>>>>>>> when
> > > > > >>> there is a
> > > > > >>> > > > > > > >>> "real"
> > > > > >>> > > > > > > >>>>>>>>>> update, but you don't want the metadata
> > > > itself to
> > > > > >>> cause an
> > > > > >>> > > > > > > >>>>>>>>>> update.
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes you've got 
> > > > > >>> > > > > > > >>>>>>>>> it
> > > > > >>> right ;) This
> > > > > >>> > > > > > > >>>>>>>>> particular processor is the first stop for 
> > > > > >>> > > > > > > >>>>>>>>> this
> > > > > >>> data after
> > > > > >>> > > > > > > >>> coming
> > > > > >>> > > > > > > >>>>>>>>> in from external users, who often simply 
> > > > > >>> > > > > > > >>>>>>>>> post
> > > > the
> > > > > >>> same content
> > > > > >>> > > > > > > >>> each
> > > > > >>> > > > > > > >>>>>>>>> time and we're trying to shield downstream
> > > > > >>> consumers from
> > > > > >>> > > > > > > >>>>>>>>> unnecessary churn.
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> It does seem handy to be able to plug in a custom
> > > > > >>> > > > > > > >>>>>>>>>> ChangeDetector for this purpose, but I worry about the
> > > > > >>> > > > > > > >>>>>>>>>> API complexity. Maybe you can help think through how to
> > > > > >>> > > > > > > >>>>>>>>>> provide the same benefit while limiting user-facing
> > > > > >>> > > > > > > >>>>>>>>>> complexity.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> Here's some extra context to consider:
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> We currently don't make any extra requirements about the
> > > > > >>> > > > > > > >>>>>>>>>> nature of data that you can use in Streams. For example,
> > > > > >>> > > > > > > >>>>>>>>>> you don't have to implement hashCode and equals, or
> > > > > >>> > > > > > > >>>>>>>>>> compareTo, etc. With the current proposal, we can do an
> > > > > >>> > > > > > > >>>>>>>>>> airtight comparison based only on the serialized form of
> > > > > >>> > > > > > > >>>>>>>>>> the values, and we actually don't have to deserialize
> > > > > >>> > > > > > > >>>>>>>>>> the "prior" value at all for a large number of
> > > > > >>> > > > > > > >>>>>>>>>> operations. Admittedly, if we extend the proposal to
> > > > > >>> > > > > > > >>>>>>>>>> include no-op detection for stateless operations, we'd
> > > > > >>> > > > > > > >>>>>>>>>> probably need to rely on equals() for no-op checking,
> > > > > >>> > > > > > > >>>>>>>>>> otherwise we'd wind up requiring serdes for stateless
> > > > > >>> > > > > > > >>>>>>>>>> operations as well. Actually, I'd probably argue for
> > > > > >>> > > > > > > >>>>>>>>>> doing exactly that:
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop if the
> > > > serialized
> > > > > >>> byte[]s are
> > > > > >>> > > > > > > >>> the
> > > > > >>> > > > > > > >>>>>>>>>> same. After deserializing, also drop if the
> > > > > >>> objects are equal
> > > > > >>> > > > > > > >>>>>>>>>> according to Object#equals().
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> 2. In stateless operations, compare the 
> > > > > >>> > > > > > > >>>>>>>>>> "new"
> > > > and
> > > > > >>> "old" values
> > > > > >>> > > > > > > >>>>>>>>>> (if "old" is available) based on
> > > > Object#equals().
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> 3. As a final optimization, after 
> > > > > >>> > > > > > > >>>>>>>>>> serializing
> > > > and
> > > > > >>> before
> > > > > >>> > > > > > > >>> sending
> > > > > >>> > > > > > > >>>>>>>>>> repartition records, compare the serialized
> > > > data
> > > > > >>> and drop
> > > > > >>> > > > > > > >>>>>>>>>> no-ops.
> > > > > >>> > > > > > > >>>>>>>>>>
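The three steps listed above can be sketched roughly as follows. This is only an illustration under stated assumptions: the names (LayeredNoOpSketch, isNoOpByEquals, isNoOp) are hypothetical, the serializer is passed as a plain java.util.function.Function rather than a Kafka Serde, and a missing old or new value is simply treated as a real change.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

final class LayeredNoOpSketch {
    // Stateless path (step 2): no serde is required, so only equals() is used.
    static <V> boolean isNoOpByEquals(V oldValue, V newValue) {
        return oldValue != null && Objects.equals(oldValue, newValue);
    }

    // Stateful or repartition path (steps 1 and 3): compare the serialized
    // bytes first, then fall back to equals() on the objects.
    static <V> boolean isNoOp(V oldValue, V newValue, Function<V, byte[]> serializer) {
        if (oldValue == null || newValue == null) {
            return false; // sketch assumption: a missing value counts as a change
        }
        if (Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue))) {
            return true;
        }
        return oldValue.equals(newValue);
    }

    public static void main(String[] args) {
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        System.out.println(isNoOp("salary=10", "salary=10", utf8)); // same bytes: drop
        System.out.println(isNoOp("salary=10", "salary=11", utf8)); // changed: forward
    }
}
```

With this layering, a type without a meaningful equals() still benefits from the byte comparison, and a type without a serde still benefits from equals().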
> > > > > >>> > > > > > > >>>>>>>>>> This way, we still don't have to rely on 
> > > > > >>> > > > > > > >>>>>>>>>> the
> > > > > >>> existence of an
> > > > > >>> > > > > > > >>>>>>>>>> equals() method, but if it is there, we can
> > > > > >>> benefit from it.
> > > > > >>> > > > > > > >>>>>>>>>> Also, we don't require a serde in any new
> > > > > >>> situations, but we
> > > > > >>> > > > > > > >>> can
> > > > > >>> > > > > > > >>>>>>>>>> still leverage it when it is available.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> For clarity, in my example above, even if 
> > > > > >>> > > > > > > >>>>>>>>>> the
> > > > > >>> employeeInfo and
> > > > > >>> > > > > > > >>>>>>>>>> employeePayroll and Result records all have
> > > > > >>> serdes, we need
> > > > > >>> > > > > > the
> > > > > >>> > > > > > > >>>>>>>>>> "name" field (presumably String) and the
> > > > "salary"
> > > > > >>> field
> > > > > > >>> > > > > > > >>>>>>>>>> (presumably a Double) to have serdes as 
> > > > > >>> > > > > > > >>>>>>>>>> well
> > > > in
> > > > > >>> the naive
> > > > > >>> > > > > > > >>>>>>>>>> implementation. But if we can leverage
> > > > equals(),
> > > > > >>> then the
> > > > > >>> > > > > > > >>> "right
> > > > > >>> > > > > > > >>>>>>>>>> thing" happens automatically.
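
The three-step check proposed above can be sketched in plain Java. This is only an illustration under stated assumptions: the class NoOpCheck and its method names are hypothetical and are not part of Kafka Streams, and a missing "old" value is treated as "cannot compare", so the update is forwarded.

```java
import java.util.Arrays;

// Hypothetical helper, not a Kafka Streams class.
final class NoOpCheck {

    // Steps 1 and 3: both serialized forms are already at hand, so compare bytes.
    static boolean sameBytes(byte[] oldBytes, byte[] newBytes) {
        return oldBytes != null && Arrays.equals(oldBytes, newBytes);
    }

    // Step 2: no serde is required; rely on Object#equals() if "old" is available.
    static boolean sameObject(Object oldValue, Object newValue) {
        return oldValue != null && oldValue.equals(newValue);
    }

    // An update is a no-op if either comparison says nothing changed.
    static boolean isNoOp(byte[] oldBytes, byte[] newBytes,
                          Object oldValue, Object newValue) {
        return sameBytes(oldBytes, newBytes) || sameObject(oldValue, newValue);
    }
}
```

If a value type does not override equals(), the object comparison falls back to reference identity, so the byte comparison remains the only check that does not depend on user code.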
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>> I still don't totally follow why the 
> > > > > >>> > > > > > > >>>>>>>>> individual
> > > > > >>> components
> > > > > >>> > > > > > > >>> (name,
> > > > > >>> > > > > > > >>>>>>>>> salary) would have to have serdes here. If
> > > > Result
> > > > > >>> has one, we
> > > > > >>> > > > > > > >>>>>>>>> compare bytes, and if Result additionally 
> > > > > >>> > > > > > > >>>>>>>>> has
> > > > an
> > > > > >>> equals()
> > > > > >>> > > > > > method
> > > > > >>> > > > > > > >>>>>>>>> (which presumably includes equals 
> > > > > >>> > > > > > > >>>>>>>>> comparisons
> > > > on
> > > > > >>> the
> > > > > >>> > > > > > constituent
> > > > > >>> > > > > > > >>>>>>>>> fields), have we not covered our bases?
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> This dovetails in with my primary UX 
> > > > > >>> > > > > > > >>>>>>>>>> concern;
> > > > > >>> where would the
> > > > > >>> > > > > > > >>>>>>>>>> ChangeDetector actually be registered? 
> > > > > >>> > > > > > > >>>>>>>>>> None of
> > > > > >>> the operators
> > > > > >>> > > > > > in
> > > > > >>> > > > > > > >>>>>>>>>> my example have names or topics or any 
> > > > > >>> > > > > > > >>>>>>>>>> other
> > > > > >>> identifiable
> > > > > >>> > > > > > > >>>>>>>>>> characteristic that could be passed to a
> > > > > >>> ChangeDetector class
> > > > > >>> > > > > > > >>>>>>>>>> registered via config. You could say that 
> > > > > >>> > > > > > > >>>>>>>>>> we
> > > > make
> > > > > >>> > > > > > > >>> ChangeDetector
> > > > > >>> > > > > > > >>>>>>>>>> an optional parameter to every operation in
> > > > > >>> Streams, but this
> > > > > >>> > > > > > > >>>>>>>>>> seems to carry quite a bit of mental burden
> > > > with
> > > > > >>> it. People
> > > > > >>> > > > > > > >>> will
> > > > > >>> > > > > > > >>>>>>>>>> wonder what it's for and whether or not 
> > > > > >>> > > > > > > >>>>>>>>>> they
> > > > > >>> should be using
> > > > > >>> > > > > > > >>> it.
> > > > > >>> > > > > > > >>>>>>>>>> There would almost certainly be a
> > > > misconception
> > > > > >>> that it's
> > > > > >>> > > > > > > >>>>>>>>>> preferable to implement it always, which
> > > > would be
> > > > > >>> unfortunate.
> > > > > > >>> > > > > > > >>>>>>>>>> Plus, to actually implement metadata flowing
> > > > > >>> through the
> > > > > >>> > > > > > > >>> topology
> > > > > >>> > > > > > > >>>>>>>>>> as in your use case, you'd have to do two
> > > > things:
> > > > > >>> 1. make sure
> > > > > >>> > > > > > > >>>>>>>>>> that all operations actually preserve the
> > > > > >>> metadata alongside
> > > > > >>> > > > > > > >>> the
> > > > > >>> > > > > > > >>>>>>>>>> data (e.g., don't accidentally add a 
> > > > > >>> > > > > > > >>>>>>>>>> mapValues
> > > > > >>> like I did, or
> > > > > >>> > > > > > > >>> you
> > > > > >>> > > > > > > >>>>>>>>>> drop the metadata). 2. implement a
> > > > ChangeDetector
> > > > > >>> for every
> > > > > >>> > > > > > > >>>>>>>>>> single operation in the topology, or you 
> > > > > >>> > > > > > > >>>>>>>>>> don't
> > > > > >>> get the benefit
> > > > > >>> > > > > > > >>> of
> > > > > > >>> > > > > > > >>>>>>>>>> dropping non-changes internally. 2b.
> > > > > >>> Alternatively, you could
> > > > > >>> > > > > > > >>> just
> > > > > >>> > > > > > > >>>>>>>>>> add the ChangeDetector to one operation 
> > > > > >>> > > > > > > >>>>>>>>>> toward
> > > > > >>> the end of the
> > > > > >>> > > > > > > >>>>>>>>>> topology. This would not drop redundant
> > > > > >>> computation
> > > > > >>> > > > > > internally,
> > > > > >>> > > > > > > >>>>>>>>>> but only drop redundant _outputs_. But 
> > > > > >>> > > > > > > >>>>>>>>>> this is
> > > > > >>> just about the
> > > > > >>> > > > > > > >>>>>>>>>> same as your current solution.
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>> I definitely see your point regarding
> > > > > >>> configuration. I was
> > > > > >>> > > > > > > >>>>>>>>> originally thinking about this when the
> > > > > >>> deduplication was going
> > > > > >>> > > > > > > >>> to
> > > > > >>> > > > > > > >>>>>>>>> be opt-in, and it seemed very natural to say
> > > > > >>> something like:
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>> employeeInfo
> > > > > > >>> > > > > > > >>>>>>>>>     .join(employeePayroll,
> > > > > > >>> > > > > > > >>>>>>>>>           (info, payroll) -> new Result(info.name(), payroll.salary()))
> > > > > > >>> > > > > > > >>>>>>>>>     .suppress(duplicatesAccordingTo(someChangeDetector))
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>> Alternatively you can imagine a similar 
> > > > > >>> > > > > > > >>>>>>>>> method
> > > > > >>> being on
> > > > > >>> > > > > > > >>>>>>>>> Materialized, though obviously this makes 
> > > > > >>> > > > > > > >>>>>>>>> less
> > > > > >>> sense if we
> > > > > >>> > > > > > don't
> > > > > >>> > > > > > > >>>>>>>>> want to require materialization. If we're 
> > > > > >>> > > > > > > >>>>>>>>> now
> > > > > >>> talking about
> > > > > >>> > > > > > > >>>>>>>>> changing the default behavior and not having
> > > > any
> > > > > >>> configuration
> > > > > >>> > > > > > > >>>>>>>>> options, it's harder to find a place for 
> > > > > >>> > > > > > > >>>>>>>>> this.
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> A final thought; if it really is a metadata
> > > > > >>> question, can we
> > > > > >>> > > > > > > >>> just
> > > > > >>> > > > > > > >>>>>>>>>> plan to finish up the support for headers 
> > > > > >>> > > > > > > >>>>>>>>>> in
> > > > > >>> Streams? I.e.,
> > > > > >>> > > > > > > >>> give
> > > > > >>> > > > > > > >>>>>>>>>> you a way to control the way that headers 
> > > > > >>> > > > > > > >>>>>>>>>> flow
> > > > > >>> through the
> > > > > >>> > > > > > > >>>>>>>>>> topology? Then, we could treat headers the
> > > > same
> > > > > >>> way we treat
> > > > > >>> > > > > > > >>>>>>>>>> timestamps in the no-op checking... We
> > > > completely
> > > > > >>> ignore them
> > > > > >>> > > > > > > >>>>>>>>>> for the sake of comparison. Thus, neither 
> > > > > >>> > > > > > > >>>>>>>>>> the
> > > > > >>> timestamp nor
> > > > > >>> > > > > > the
> > > > > >>> > > > > > > >>>>>>>>>> headers would get updated in internal state
> > > > or in
> > > > > >>> downstream
> > > > > >>> > > > > > > >>>>>>>>>> views as long as the value itself doesn't
> > > > change.
> > > > > >>> This seems
> > > > > >>> > > > > > to
> > > > > >>> > > > > > > >>>>>>>>>> give us a way to support your use case 
> > > > > >>> > > > > > > >>>>>>>>>> without
> > > > > >>> adding to the
> > > > > >>> > > > > > > >>>>>>>>>> mental overhead of using Streams for simple
> > > > > >>> things.
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>> Agree headers could be a decent fit for this
> > > > > >>> particular case
> > > > > >>> > > > > > > >>>>>>>>> because it's mostly metadata, though to be
> > > > honest
> > > > > >>> we haven't
> > > > > >>> > > > > > > >>> looked
> > > > > >>> > > > > > > >>>>>>>>> at headers much (mostly because, and to your
> > > > > >>> point, support
> > > > > >>> > > > > > > >>> seems
> > > > > >>> > > > > > > >>>>>>>>> to be lacking). I feel like there would be
> > > > other
> > > > > >>> cases where
> > > > > >>> > > > > > > >>> this
> > > > > >>> > > > > > > >>>>>>>>> feature could be valuable, but I admit I 
> > > > > >>> > > > > > > >>>>>>>>> can't
> > > > > >>> come up with
> > > > > >>> > > > > > > >>>>>>>>> anything right this second. Perhaps 
> > > > > >>> > > > > > > >>>>>>>>> yuzhihong
> > > > had
> > > > > >>> an example in
> > > > > >>> > > > > > > >>>>>>>>> mind?
> > > > > >>> > > > > > > >>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> I.e., simple things should be easy, and
> > > > complex
> > > > > >>> things should
> > > > > >>> > > > > > > >>> be
> > > > > >>> > > > > > > >>>>>>>>>> possible.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks, -John
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19, Thomas 
> > > > > >>> > > > > > > >>>>>>>>>> Becker
> > > > > >>> wrote:
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> Hi John, Can you describe how you'd use
> > > > > >>> filtering/mapping to
> > > > > >>> > > > > > > >>>>>>>>>> deduplicate records? To give some 
> > > > > >>> > > > > > > >>>>>>>>>> background
> > > > on
> > > > > >>> my suggestion
> > > > > >>> > > > > > > >>> we
> > > > > >>> > > > > > > >>>>>>>>>> currently have a small stream processor 
> > > > > >>> > > > > > > >>>>>>>>>> that
> > > > > >>> exists solely to
> > > > > >>> > > > > > > >>>>>>>>>> deduplicate, which we do using a process 
> > > > > >>> > > > > > > >>>>>>>>>> that
> > > > I
> > > > > >>> assume would
> > > > > >>> > > > > > be
> > > > > >>> > > > > > > >>>>>>>>>> similar to what would be done here (with a
> > > > store
> > > > > >>> of keys and
> > > > > >>> > > > > > > >>> hash
> > > > > >>> > > > > > > >>>>>>>>>> values). But the records we are 
> > > > > >>> > > > > > > >>>>>>>>>> deduplicating
> > > > > >>> have some
> > > > > >>> > > > > > > >>> metadata
> > > > > >>> > > > > > > >>>>>>>>>> fields (such as timestamps of when the 
> > > > > >>> > > > > > > >>>>>>>>>> record
> > > > was
> > > > > >>> posted) that
> > > > > >>> > > > > > > >>> we
> > > > > >>> > > > > > > >>>>>>>>>> don't consider semantically meaningful for
> > > > > >>> downstream
> > > > > >>> > > > > > > >>> consumers,
> > > > > >>> > > > > > > >>>>>>>>>> and therefore we also suppress updates that
> > > > only
> > > > > >>> touch those
> > > > > >>> > > > > > > >>>>>>>>>> fields.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> -Tommy
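
The deduplication approach described above (a store of keys and hash values that ignores metadata fields) might look roughly like the following. This is a hedged sketch, not the actual processor: the class HashDeduplicator, the field names, and the in-memory HashMap standing in for a state store are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch; a real processor would use a persistent state store.
final class HashDeduplicator {

    // Last seen hash of the semantically meaningful fields, per record key.
    private final Map<String, Integer> lastHashByKey = new HashMap<>();

    // Returns true if the record should be forwarded, false if it is a duplicate.
    // postedAtMillis is metadata and is deliberately left out of the hash.
    boolean shouldForward(String key, String name, double salary, long postedAtMillis) {
        int hash = Objects.hash(name, salary);
        Integer previous = lastHashByKey.put(key, hash);
        return previous == null || previous != hash;
    }
}
```

Because only a hash is kept, two different values that collide would be treated as duplicates; comparing the full serialized bytes avoids that at the cost of storing the value.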
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30 -0600, John
> > > > Roesler
> > > > > >>> wrote:
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong,
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> That’s an interesting idea. Can you help think of a use case
> > > > > > >>> > > > > > > >>>>>>>>>> that isn’t also served by filtering or mapping beforehand?
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for helping to design this feature!
> > > > > > >>> > > > > > > >>>>>>>>>> -John
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> I think this is a good idea.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Jan 31, 2020, at 4:49 PM, Thomas Becker
> > > > > > >>> > > > > > > >>>>>>>>>> <thomas.bec...@tivo.com> wrote:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> How do folks feel about allowing the mechanism by which
> > > > > > >>> > > > > > > >>>>>>>>>> no-ops are detected to be pluggable? Meaning use something
> > > > > > >>> > > > > > > >>>>>>>>>> like a hash by default, but you could optionally provide an
> > > > > > >>> > > > > > > >>>>>>>>>> implementation of something to use instead, like a
> > > > > > >>> > > > > > > >>>>>>>>>> ChangeDetector. This could be useful for example to ignore
> > > > > > >>> > > > > > > >>>>>>>>>> changes to certain fields, which may not be relevant to the
> > > > > > >>> > > > > > > >>>>>>>>>> operation being performed.
> > > > > >>> > > > > > ________________________________
> > > > > > >>> > > > > > > >>>>>>>>>> From: John Roesler <vvcep...@apache.org>
> > > > > > >>> > > > > > > >>>>>>>>>> Sent: Friday, January 31, 2020 4:51 PM
> > > > > > >>> > > > > > > >>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > > >>> > > > > > > >>>>>>>>>> Subject: Re: [KAFKA-557] Add emit on change support for Kafka
> > > > > > >>> > > > > > > >>>>>>>>>> Streams
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hello all,
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Sorry for my silence. It seems like we are getting close to
> > > > > > >>> > > > > > > >>>>>>>>>> consensus. Hopefully, we could move to a vote soon!
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> All of the reasoning from Matthias and Bruno around timestamp
> > > > > > >>> > > > > > > >>>>>>>>>> is compelling. I would be strongly in favor of stating a few
> > > > > > >>> > > > > > > >>>>>>>>>> things very clearly in the KIP:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> 1. Streams will drop no-op updates only for KTable
> > > > > > >>> > > > > > > >>>>>>>>>>    operations. That is, we won't make any changes to KStream
> > > > > > >>> > > > > > > >>>>>>>>>>    aggregations at the moment. It does seem like we can
> > > > > > >>> > > > > > > >>>>>>>>>>    potentially revisit the time semantics of that operation
> > > > > > >>> > > > > > > >>>>>>>>>>    in the future, but we don't need to do it now. On the
> > > > > > >>> > > > > > > >>>>>>>>>>    other hand, the proposed semantics for KTable timestamps
> > > > > > >>> > > > > > > >>>>>>>>>>    (marking the beginning of the validity of that record)
> > > > > > >>> > > > > > > >>>>>>>>>>    make sense to me.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> 2. Streams will only drop no-op updates for _stateful_ KTable
> > > > > > >>> > > > > > > >>>>>>>>>>    operations. We don't want to add a hard guarantee that
> > > > > > >>> > > > > > > >>>>>>>>>>    Streams will _never_ emit a no-op table update because it
> > > > > > >>> > > > > > > >>>>>>>>>>    would require adding state to otherwise stateless
> > > > > > >>> > > > > > > >>>>>>>>>>    operations. If someone is really concerned about a
> > > > > > >>> > > > > > > >>>>>>>>>>    particular stateless operation producing a lot of no-op
> > > > > > >>> > > > > > > >>>>>>>>>>    results, all they have to do is materialize it, and
> > > > > > >>> > > > > > > >>>>>>>>>>    Streams would automatically drop the no-ops.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Additionally, I'm +1 on not adding an opt-out at this time.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Regarding the KIP itself, I would clean it up a bit before
> > > > > > >>> > > > > > > >>>>>>>>>> calling for a vote. There is a lot of "discussion"-type
> > > > > > >>> > > > > > > >>>>>>>>>> language there, which is very natural to read, but makes it a
> > > > > > >>> > > > > > > >>>>>>>>>> bit hard to see what _exactly_ the KIP is proposing.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Richard, would you mind just making the "proposed behavior
> > > > > > >>> > > > > > > >>>>>>>>>> change" a simple and succinct list of bullet points? I.e.,
> > > > > > >>> > > > > > > >>>>>>>>>> please drop glue phrases like "there has been some
> > > > > > >>> > > > > > > >>>>>>>>>> discussion" or "possibly we could do X". For the final
> > > > > > >>> > > > > > > >>>>>>>>>> version of the KIP, it should just say, "Streams will do X,
> > > > > > >>> > > > > > > >>>>>>>>>> Streams will do Y". Feel free to add an elaboration section
> > > > > > >>> > > > > > > >>>>>>>>>> to explain more about what X and Y mean, but we don't need to
> > > > > > >>> > > > > > > >>>>>>>>>> talk about possibilities or alternatives except in the
> > > > > > >>> > > > > > > >>>>>>>>>> "rejected alternatives" section.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Accordingly, can you also move the options you presented in
> > > > > > >>> > > > > > > >>>>>>>>>> the intro to the "rejected alternatives" section and only
> > > > > > >>> > > > > > > >>>>>>>>>> mention the final proposal itself?
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> This just really helps reviewers to know what they are voting
> > > > > > >>> > > > > > > >>>>>>>>>> for, and it helps everyone after the fact when they are
> > > > > > >>> > > > > > > >>>>>>>>>> trying to get clarity on what exactly the proposal is, versus
> > > > > > >>> > > > > > > >>>>>>>>>> all the things it could have been.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Thanks,
> > > > > > >>> > > > > > > >>>>>>>>>> -John
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hello to all,
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> I've finished making some initial modifications to the KIP.
> > > > > > >>> > > > > > > >>>>>>>>>> I have decided to keep the implementation section in the KIP
> > > > > > >>> > > > > > > >>>>>>>>>> for record-keeping purposes. For now, we should focus on only
> > > > > > >>> > > > > > > >>>>>>>>>> the proposed behavior changes instead.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> See if you have any comments!
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Cheers,
> > > > > > >>> > > > > > > >>>>>>>>>> Richard
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu
> > > > > > >>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com> wrote:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Hi all,
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for all the discussion!
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> @John and @Bruno I will survey other possible systems and see
> > > > > > >>> > > > > > > >>>>>>>>>> what I can do. Just a question, by systems, I suppose you
> > > > > > >>> > > > > > > >>>>>>>>>> would mean the pros and cons of different reporting
> > > > > > >>> > > > > > > >>>>>>>>>> strategies? I'm not completely certain on this point, so it
> > > > > > >>> > > > > > > >>>>>>>>>> would be great if you can clarify on that.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> So here's what I got from all the discussion so far:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> - Since both Matthias and John seem to have come to a
> > > > > > >>> > > > > > > >>>>>>>>>>   consensus on this, we will go for an all-round behavioral
> > > > > > >>> > > > > > > >>>>>>>>>>   change for KTables. After some thought, I decided that for
> > > > > > >>> > > > > > > >>>>>>>>>>   now, an opt-out config will not be added. As John has
> > > > > > >>> > > > > > > >>>>>>>>>>   pointed out, no-op changes tend to explode further down
> > > > > > >>> > > > > > > >>>>>>>>>>   the topology as they are forwarded to more and more
> > > > > > >>> > > > > > > >>>>>>>>>>   processor nodes downstream.
> > > > > > >>> > > > > > > >>>>>>>>>> - About using hash codes, after some explanation from John,
> > > > > > >>> > > > > > > >>>>>>>>>>   it looks like hash codes might not be as ideal (for
> > > > > > >>> > > > > > > >>>>>>>>>>   implementation). For now, we will omit that detail, and
> > > > > > >>> > > > > > > >>>>>>>>>>   save it for the PR.
> > > > > > >>> > > > > > > >>>>>>>>>> - @Bruno You do have valid concerns. Though, I am not
> > > > > > >>> > > > > > > >>>>>>>>>>   completely certain if we want to do emit-on-change only
> > > > > > >>> > > > > > > >>>>>>>>>>   for materialized KTables. I will put it down in the KIP
> > > > > > >>> > > > > > > >>>>>>>>>>   regardless.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> I will do my best to address all points raised so far on the
> > > > > > >>> > > > > > > >>>>>>>>>> discussion. Hope we could keep this going!
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Best,
> > > > > > >>> > > > > > > >>>>>>>>>> Richard
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna
> > > > > > >>> > > > > > > >>>>>>>>>> <br...@confluent.io> wrote:
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Thank you Matthias for the use cases!
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Looking at both use cases, I think you need to elaborate on
> > > > > > >>> > > > > > > >>>>>>>>>> them in the KIP, Richard.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Emit from plain KTable: I agree with Matthias that the lower
> > > > > > >>> > > > > > > >>>>>>>>>> timestamp makes sense because it marks the start of the
> > > > > > >>> > > > > > > >>>>>>>>>> validity of the record. Idempotent records with a higher
> > > > > > >>> > > > > > > >>>>>>>>>> timestamp can be safely ignored. A corner case that I
> > > > > > >>> > > > > > > >>>>>>>>>> discussed with Matthias offline is when we do not materialize
> > > > > > >>> > > > > > > >>>>>>>>>> a KTable due to optimization. Then we cannot avoid the
> > > > > > >>> > > > > > > >>>>>>>>>> idempotent records because we do not keep the first record
> > > > > > >>> > > > > > > >>>>>>>>>> with the lower timestamp to compare to.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Emit from KTable with aggregations: If we specify that an
> > > > > > >>> > > > > > > >>>>>>>>>> aggregation result should have the highest timestamp of the
> > > > > > >>> > > > > > > >>>>>>>>>> records that participated in the aggregation, we cannot
> > > > > > >>> > > > > > > >>>>>>>>>> ignore any idempotent records. Admittedly, the result of an
> > > > > > >>> > > > > > > >>>>>>>>>> aggregation usually changes, but there are aggregations where
> > > > > > >>> > > > > > > >>>>>>>>>> the result may not change like min and max, or sum when the
> > > > > > >>> > > > > > > >>>>>>>>>> incoming records have a value of zero. In those cases, we
> > > > > > >>> > > > > > > >>>>>>>>>> could benefit from emit-on-change, but only if we define the
> > > > > > >>> > > > > > > >>>>>>>>>> semantics of the aggregations to not use the highest
> > > > > > >>> > > > > > > >>>>>>>>>> timestamp of the participating records for the result. In
> > > > > > >>> > > > > > > >>>>>>>>>> Kafka Streams, we do not have min, max, and sum as explicit
> > > > > > >>> > > > > > > >>>>>>>>>> aggregations, but we need to provide an API to define what
> > > > > > >>> > > > > > > >>>>>>>>>> timestamp should be used for the result of an aggregation if
> > > > > > >>> > > > > > > >>>>>>>>>> we want to go down this path.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> All of this does not block this KIP and I just wanted to put
> > > > > > >>> > > > > > > >>>>>>>>>> these aspects up for discussion. The KIP can limit itself to
> > > > > > >>> > > > > > > >>>>>>>>>> emit from materialized KTables. However, the limits should be
> > > > > > >>> > > > > > > >>>>>>>>>> explicitly stated in the KIP.
> > > > > > >>> > > > > > > >>>>>>>>>>
> > > > > > >>> > > > > > > >>>>>>>>>> Best,
> > > > > > >>> > > > > > > >>>>>>>>>> Bruno
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias 
> > > > > >>> > > > > > > >>>>>>>>>> J.
> > > > Sax
> > > > > >>> > > > > > > >>>>>>>>>> <matth...@confluent.io <mailto:
> > > > > >>> matth...@confluent.io>> wrote:
> > > > > >>> > > > > > > >>>>>>>>>> IMHO, the question about semantics depends 
> > > > > >>> > > > > > > >>>>>>>>>> on
> > > > the
> > > > > >>> use case, in
> > > > > >>> > > > > > > >>>>>>>>>> particular on the origin of a KTable. If
> > > > there is
> > > > > >>> a changlog
> > > > > >>> > > > > > > >>>>>>>>>> topic that one reads directly into a 
> > > > > >>> > > > > > > >>>>>>>>>> KTable,
> > > > > >>> emit-on-change
> > > > > >>> > > > > > > >>> does
> > > > > >>> > > > > > > >>>>>>>>>> actually make sense, because the timestamp
> > > > > >>> indicates _when_
> > > > > >>> > > > > > the
> > > > > >>> > > > > > > >>>>>>>>>> update was _effective_. For this case, it 
> > > > > >>> > > > > > > >>>>>>>>>> is
> > > > > >>> semantically
> > > > > >>> > > > > > sound
> > > > > >>> > > > > > > >>>>>>>>>> to _not_ update the timestamp in the store,
> > > > > >>> because the second
> > > > > >>> > > > > > > >>>>>>>>>> update is actually idempotent and advancing
> > > > the
> > > > > >>> timestamp is
> > > > > >>> > > > > > > >>> not
> > > > > >>> > > > > > > >>>>>>>>>> ideal (one could even consider it to be 
> > > > > >>> > > > > > > >>>>>>>>>> wrong
> > > > to
> > > > > >>> advance the
> > > > > >>> > > > > > > >>>>>>>>>> timestamp) because the "valid time" of the
> > > > record
> > > > > >>> pair did not
> > > > > >>> > > > > > > >>>>>>>>>> change. This reasoning also applies to
> > > > > >>> KTable-KTable joins.
> > > > > >>> > > > > > > >>>>>>>>>> However, if the KTable is the result of an
> > > > > >>> aggregation, I
> > > > > >>> > > > > > think
> > > > > >>> > > > > > > >>>>>>>>>> emit-on-update is more natural, because the
> > > > > >>> timestamp reflects
> > > > > >>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest timestamp) of 
> > > > > >>> > > > > > > >>>>>>>>>> all
> > > > > >>> input records
> > > > > >>> > > > > > > >>> the
> > > > > >>> > > > > > > >>>>>>>>>> contributed to the result. Hence, updating 
> > > > > >>> > > > > > > >>>>>>>>>> the
> > > > > >>> timestamp and
> > > > > >>> > > > > > > >>>>>>>>>> emitting a new record actually sounds 
> > > > > >>> > > > > > > >>>>>>>>>> correct
> > > > to
> > > > > >>> me. This
> > > > > >>> > > > > > > >>> applies
> > > > > >>> > > > > > > >>>>>>>>>> to windowed and non-windowed aggregations
> > > > IMHO.
> > > > > >>> However,
> > > > > >>> > > > > > > >>>>>>>>>> considering the argument that the timestamp
> > > > > >>> should not be
> > > > > >>> > > > > > > >>> update
> > > > > >>> > > > > > > >>>>>>>>>> in the first case in the store to begin 
> > > > > >>> > > > > > > >>>>>>>>>> with,
> > > > > >>> both cases are
> > > > > >>> > > > > > > >>>>>>>>>> actually the same, and both can be modeled 
> > > > > >>> > > > > > > >>>>>>>>>> as
> > > > > >>> emit-on-change:
> > > > > >>> > > > > > > >>> if
> > > > > >>> > > > > > > >>>>>>>>>> a `table()` operator does not update the
> > > > > >>> timestamp if the
> > > > > >>> > > > > > value
> > > > > >>> > > > > > > >>>>>>>>>> does not change, there is _no_ change and 
> > > > > >>> > > > > > > >>>>>>>>>> thus
> > > > > >>> nothing is
> > > > > >>> > > > > > > >>>>>>>>>> emitted. At the same time, if an 
> > > > > >>> > > > > > > >>>>>>>>>> aggregation
> > > > > >>> operator does
> > > > > >>> > > > > > > >>> update
> > > > > >>> > > > > > > >>>>>>>>>> the timestamp (even if the value does not
> > > > change)
> > > > > >>> there _is_ a
> > > > > >>> > > > > > > >>>>>>>>>> change and we emit. Note that handling
> > > > > >>> out-of-order data for
> > > > > >>> > > > > > > >>>>>>>>>> aggregations would also work seamlessly 
> > > > > >>> > > > > > > >>>>>>>>>> with
> > > > this
> > > > > >>> approach --
> > > > > >>> > > > > > > >>> for
> > > > > >>> > > > > > > >>>>>>>>>> out-of-order records, the timestamp
> > > > > >>> > > > > > > >>>>>>>>>> never
> > > > > >>> changes, and
> > > > > >>> > > > > > > >>> thus,
> > > > > >>> > > > > > > >>>>>>>>>> we only emit if the result itself changes.
> > > > > >>> Therefore, I would
> > > > > >>> > > > > > > >>>>>>>>>> argue that we might not even need any 
> > > > > >>> > > > > > > >>>>>>>>>> config,
> > > > > >>> because the
> > > > > >>> > > > > > > >>>>>>>>>> emit-on-change behavior is just correct and
> > > > > >>> reduces the
> > > > > >>> > > > > > > >>>>>>>>>> downstream load, while our current 
> > > > > >>> > > > > > > >>>>>>>>>> behavior is
> > > > > >>> not ideal (even
> > > > > >>> > > > > > > >>> if
> > > > > >>> > > > > > > >>>>>>>>>> it's also correct). Thoughts? -Matthias
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> On 1/24/20 9:37 AM, John Roesler wrote:
> > > > > >>> > > > > > > >>>>>>>>>> Hi Bruno, Thanks for that
> > > > idea. I
> > > > > >>> hadn't
> > > > > >>> > > > > > > >>>>>>>>>> considered that option before, and it does
> > > > seem
> > > > > >>> like that
> > > > > >>> > > > > > would
> > > > > >>> > > > > > > >>>>>>>>>> be the right place to put it if we think it
> > > > might
> > > > > >>> be
> > > > > >>> > > > > > > >>> semantically
> > > > > >>> > > > > > > >>>>>>>>>> important to control on a table-by-table
> > > > basis. I
> > > > > >>> had been
> > > > > >>> > > > > > > >>>>>>>>>> thinking of it less semantically and more
> > > > > >>> practically. In the
> > > > > >>> > > > > > > >>>>>>>>>> context of a large topology, or more
> > > > generally, a
> > > > > >>> large
> > > > > >>> > > > > > > >>> software
> > > > > >>> > > > > > > >>>>>>>>>> system that contains many topologies and 
> > > > > >>> > > > > > > >>>>>>>>>> other
> > > > > >>> event-driven
> > > > > >>> > > > > > > >>>>>>>>>> systems, each no-op result becomes an input
> > > > that
> > > > > >>> is destined
> > > > > >>> > > > > > to
> > > > > >>> > > > > > > >>>>>>>>>> itself become a no-op result, and so on, 
> > > > > >>> > > > > > > >>>>>>>>>> all
> > > > the
> > > > > >>> way through
> > > > > >>> > > > > > > >>> the
> > > > > >>> > > > > > > >>>>>>>>>> system. Thus, a single pointless processing
> > > > > >>> result becomes
> > > > > >>> > > > > > > >>>>>>>>>> amplified into a large number of pointless
> > > > > >>> computations, cache
> > > > > >>> > > > > > > >>>>>>>>>> perturbations, and network and disk I/O
> > > > > >>> operations. If you
> > > > > >>> > > > > > also
> > > > > >>> > > > > > > >>>>>>>>>> consider operations with fan-out 
> > > > > >>> > > > > > > >>>>>>>>>> implications,
> > > > > >>> like branching
> > > > > >>> > > > > > > >>> or
> > > > > >>> > > > > > > >>>>>>>>>> foreign-key joins, the wasted resources are
> > > > > >>> amplified not just
> > > > > >>> > > > > > > >>> in
> > > > > >>> > > > > > > >>>>>>>>>> proportion to the size of the system, but 
> > > > > >>> > > > > > > >>>>>>>>>> the
> > > > > >>> size of the
> > > > > >>> > > > > > > >>> system
> > > > > >>> > > > > > > >>>>>>>>>> times the average fan-out (to the power of 
> > > > > >>> > > > > > > >>>>>>>>>> the
> > > > > >>> number of
> > > > > >>> > > > > > > >>> fan-out
> > > > > >>> > > > > > > >>>>>>>>>> operations on the path(s) through the
> > > > system). In
> > > > > >>> my time
> > > > > >>> > > > > > > >>>>>>>>>> operating such systems, I've observed these
> > > > > >>> effects to be very
> > > > > >>> > > > > > > >>>>>>>>>> real, and actually, the system and use case
> > > > > >>> don't have to be
> > > > > >>> > > > > > > >>>>>>>>>> very large before the amplification poses 
> > > > > >>> > > > > > > >>>>>>>>>> an
> > > > > >>> existential
> > > > > >>> > > > > > threat
> > > > > >>> > > > > > > >>>>>>>>>> to the system as a whole. This is the basis
> > > > of my
> > > > > >>> advocating
> > > > > >>> > > > > > > >>> for
> > > > > >>> > > > > > > >>>>>>>>>> a simple behavior change, rather than an
> > > > opt-in
> > > > > >>> config of any
> > > > > >>> > > > > > > >>>>>>>>>> kind. It seems like Streams should "do the
> > > > right
> > > > > >>> thing" for
> > > > > >>> > > > > > the
> > > > > >>> > > > > > > >>>>>>>>>> majority use case. My theory (which may be
> > > > wrong)
> > > > > >>> is that the
> > > > > >>> > > > > > > >>>>>>>>>> majority use case is more like "relational
> > > > > >>> queries" than "CEP
> > > > > >>> > > > > > > >>>>>>>>>> queries". Even if you were doing some
> > > > > >>> event-sensitive
> > > > > >>> > > > > > > >>>>>>>>>> computation, wouldn't you do them as Stream
> > > > > >>> operations (where
> > > > > >>> > > > > > > >>>>>>>>>> this feature is inapplicable anyway)? In
> > > > keeping
> > > > > >>> with the
> > > > > >>> > > > > > > >>>>>>>>>> "practical" perspective, I suggested the
> > > > opt-out
> > > > > >>> config only
> > > > > >>> > > > > > in
> > > > > >>> > > > > > > >>>>>>>>>> the (I think unlikely) event that filtering
> > > > out
> > > > > >>> pointless
> > > > > >>> > > > > > > >>> updates
> > > > > >>> > > > > > > >>>>>>>>>> actually harms performance. I'd also be
> > > > perfectly
> > > > > >>> fine without
> > > > > >>> > > > > > > >>>>>>>>>> the opt-out config. I really think that
> > > > (because
> > > > > >>> of the
> > > > > >>> > > > > > > >>> timestamp
> > > > > >>> > > > > > > >>>>>>>>>> semantics work already underway), we're
> > > > already
> > > > > >>> pre-fetching
> > > > > >>> > > > > > > >>> the
> > > > > >>> > > > > > > >>>>>>>>>> prior result most of the time, so there 
> > > > > >>> > > > > > > >>>>>>>>>> would
> > > > > >>> actually be very
> > > > > >>> > > > > > > >>>>>>>>>> little extra I/O involved in implementing
> > > > > >>> emit-on-change.
> > > > > >>> > > > > > > >>>>>>>>>> However, we should consider whether my
> > > > experience
> > > > > >>> is likely to
> > > > > >>> > > > > > > >>>>>>>>>> be general. Do you have some use case in 
> > > > > >>> > > > > > > >>>>>>>>>> mind
> > > > for
> > > > > >>> which you'd
> > > > > >>> > > > > > > >>>>>>>>>> actually want some KTable results to be
> > > > > >>> emit-on-update for
> > > > > >>> > > > > > > >>>>>>>>>> semantic reasons? Thanks, -John
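
To make the emit-on-change rule discussed above concrete, here is a minimal Java sketch. It is not Kafka Streams code: the class `EmitOnChangeSketch`, its `update` and `timestampOf` methods, and the in-memory map standing in for a state store are all hypothetical names chosen for illustration. It assumes values are already serialized, compares the prior and new byte arrays, and leaves the stored timestamp untouched when an update is dropped, so the store stays consistent with what was emitted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of emit-on-change; not part of Kafka Streams. */
public class EmitOnChangeSketch {

    /** Serialized value plus the timestamp of the record that last changed it. */
    static final class Stored {
        final byte[] value;
        final long timestamp;

        Stored(byte[] value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for a state store (assumption: keyed by a plain String).
    private final Map<String, Stored> store = new HashMap<>();

    /**
     * Applies an update and returns true if it should be forwarded downstream.
     * An update whose serialized bytes equal the stored bytes is dropped, and
     * the stored timestamp is deliberately not advanced in that case.
     */
    public boolean update(String key, byte[] newValue, long timestamp) {
        Stored prior = store.get(key);
        if (prior != null && Arrays.equals(prior.value, newValue)) {
            return false; // idempotent update: emit nothing, keep old timestamp
        }
        store.put(key, new Stored(newValue, timestamp));
        return true;
    }

    /** Timestamp currently stored for a key that has been written before. */
    public long timestampOf(String key) {
        return store.get(key).timestamp;
    }

    public static void main(String[] args) {
        EmitOnChangeSketch table = new EmitOnChangeSketch();
        byte[] a = "a".getBytes(StandardCharsets.UTF_8);
        byte[] b = "b".getBytes(StandardCharsets.UTF_8);

        System.out.println(table.update("k", a, 10L));         // true: first value
        System.out.println(table.update("k", a.clone(), 20L)); // false: same bytes
        System.out.println(table.timestampOf("k"));            // 10: not advanced
        System.out.println(table.update("k", b, 30L));         // true: value changed
    }
}
```

In this sketch an aggregation that chose to advance the timestamp on every input would instead count that as a change and emit, which is the distinction drawn earlier in the thread.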
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno 
> > > > > >>> > > > > > > >>>>>>>>>> Cadonna
> > > > > >>> wrote: Hi
> > > > > >>> > > > > > > >>> Richard,
> > > > > >>> > > > > > > >>>>>>>>>> Thank you for the KIP. I agree with John 
> > > > > >>> > > > > > > >>>>>>>>>> that
> > > > we
> > > > > >>> should focus
> > > > > >>> > > > > > > >>> on
> > > > > >>> > > > > > > >>>>>>>>>> the interface and behavior change in a 
> > > > > >>> > > > > > > >>>>>>>>>> KIP. We
> > > > > >>> can discuss the
> > > > > >>> > > > > > > >>>>>>>>>> implementation later. I am also +1 for the
> > > > > >>> survey. I had a
> > > > > >>> > > > > > > >>>>>>>>>> thought about this. Couldn't we consider
> > > > > >>> emit-on-change to be
> > > > > >>> > > > > > > >>> one
> > > > > >>> > > > > > > >>>>>>>>>> config of suppress (like 
> > > > > >>> > > > > > > >>>>>>>>>> `untilWindowCloses`)?
> > > > > >>> What you
> > > > > >>> > > > > > > >>>>>>>>>> basically propose is to suppress updates if
> > > > they
> > > > > >>> do not change
> > > > > >>> > > > > > > >>>>>>>>>> the result. Considering emit on change as a
> > > > > >>> flavour of
> > > > > >>> > > > > > suppress
> > > > > >>> > > > > > > >>>>>>>>>> would be more flexible because it would
> > > > specify
> > > > > >>> the behavior
> > > > > >>> > > > > > > >>>>>>>>>> locally for a KTable instead of globally 
> > > > > >>> > > > > > > >>>>>>>>>> for
> > > > all
> > > > > >>> KTables.
> > > > > >>> > > > > > > >>>>>>>>>> Additionally, specifying the behavior in 
> > > > > >>> > > > > > > >>>>>>>>>> one
> > > > > >>> place instead of
> > > > > >>> > > > > > > >>>>>>>>>> multiple places feels more intuitive and
> > > > > >>> consistent to me.
> > > > > >>> > > > > > > >>> Best,
> > > > > >>> > > > > > > >>>>>>>>>> Bruno
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler
> > > > > >>> > > > > > > >>>>>>>>>> <vvcep...@apache.org> wrote: Hi
> > > > > >>> > > > > > > >>>>>>>>>> Richard, Thanks for picking this up! I know
> > > > of at
> > > > > >>> least one
> > > > > >>> > > > > > > >>> large
> > > > > >>> > > > > > > >>>>>>>>>> community member for which this feature is
> > > > > >>> absolutely
> > > > > >>> > > > > > > >>> essential.
> > > > > >>> > > > > > > >>>>>>>>>> If I understand your two options, it seems
> > > > like
> > > > > >>> the proposal
> > > > > >>> > > > > > is
> > > > > >>> > > > > > > >>>>>>>>>> to implement it as a behavior change
> > > > regardless,
> > > > > >>> and the
> > > > > >>> > > > > > > >>> question
> > > > > >>> > > > > > > >>>>>>>>>> is whether to provide an opt-out config or
> > > > not.
> > > > > >>> Given that any
> > > > > >>> > > > > > > >>>>>>>>>> implementation of this feature would have 
> > > > > >>> > > > > > > >>>>>>>>>> some
> > > > > >>> performance
> > > > > >>> > > > > > > >>> impact
> > > > > >>> > > > > > > >>>>>>>>>> under some workloads, and also that we 
> > > > > >>> > > > > > > >>>>>>>>>> don't
> > > > know
> > > > > >>> if anyone
> > > > > >>> > > > > > > >>>>>>>>>> really depends on emit-on-update time
> > > > semantics,
> > > > > >>> it seems like
> > > > > >>> > > > > > > >>> we
> > > > > >>> > > > > > > >>>>>>>>>> should propose to add an opt-out config. 
> > > > > >>> > > > > > > >>>>>>>>>> Can
> > > > you
> > > > > >>> update the
> > > > > >>> > > > > > KIP
> > > > > >>> > > > > > > >>>>>>>>>> to mention the exact config key and 
> > > > > >>> > > > > > > >>>>>>>>>> value(s)
> > > > > >>> you'd propose?
> > > > > >>> > > > > > > >>> Just
> > > > > >>> > > > > > > >>>>>>>>>> to move the discussion forward, maybe
> > > > something
> > > > > >>> like: emit.on
> > > > > >>> > > > > > > >>> :=
> > > > > >>> > > > > > > >>>>>>>>>> change|update with the new default being
> > > > "change"
> > > > > >>> Thanks for
> > > > > >>> > > > > > > >>>>>>>>>> pointing out the timestamp issue in
> > > > particular. I
> > > > > >>> agree that
> > > > > >>> > > > > > if
> > > > > >>> > > > > > > >>>>>>>>>> we discard the latter update as a no-op, 
> > > > > >>> > > > > > > >>>>>>>>>> then
> > > > we
> > > > > >>> also have to
> > > > > >>> > > > > > > >>>>>>>>>> discard its timestamp (obviously, we don't
> > > > > >>> forward the
> > > > > >>> > > > > > > >>> timestamp
> > > > > >>> > > > > > > >>>>>>>>>> update, as that's the whole point, but we 
> > > > > >>> > > > > > > >>>>>>>>>> also
> > > > > >>> can't update
> > > > > >>> > > > > > the
> > > > > >>> > > > > > > >>>>>>>>>> timestamp in the store, as the store must
> > > > remain
> > > > > >>> consistent
> > > > > >>> > > > > > > >>> with
> > > > > >>> > > > > > > >>>>>>>>>> what has been emitted). I have to confess
> > > > that I
> > > > > >>> disagree with
> > > > > >>> > > > > > > >>>>>>>>>> your implementation proposal, but it's also
> > > > not
> > > > > >>> necessary to
> > > > > >>> > > > > > > >>>>>>>>>> discuss implementation in the KIP. Maybe it
> > > > would
> > > > > >>> be less
> > > > > >>> > > > > > > >>>>>>>>>> controversial if you just drop that section
> > > > for
> > > > > >>> now, so that
> > > > > >>> > > > > > > >>> the
> > > > > >>> > > > > > > >>>>>>>>>> KIP discussion can focus on the behavior
> > > > change
> > > > > >>> and config.
> > > > > >>> > > > > > > >>> Just
> > > > > >>> > > > > > > >>>>>>>>>> for reference, there is some research into
> > > > this
> > > > > >>> domain. For
> > > > > >>> > > > > > > >>>>>>>>>> example, see the "Report" section (3.2.3) 
> > > > > >>> > > > > > > >>>>>>>>>> of
> > > > the
> > > > > >>> SECRET paper:
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > > It might help to round out the proposal if you take 
> > > > > >>> > > > > > > > a
> > > > brief
> > > > > >>> > > > > > > >>> survey of
> > > > > >>> > > > > > > >>>>>>>>>> the behaviors of other systems, along with
> > > > pros
> > > > > >>> and cons if
> > > > > >>> > > > > > any
> > > > > >>> > > > > > > >>>>>>>>>> are reported. Thanks, -John
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu
> > > > wrote:
> > > > > >>> Hi
> > > > > >>> > > > > > everybody!
> > > > > >>> > > > > > > >>>>>>>>>> I'd like to propose a change that we 
> > > > > >>> > > > > > > >>>>>>>>>> probably
> > > > > >>> should've added
> > > > > >>> > > > > > > >>> a
> > > > > >>> > > > > > > >>>>>>>>>> long time ago. The key benefit of this 
> > > > > >>> > > > > > > >>>>>>>>>> KIP
> > > > > >>> would be reduced
> > > > > >>> > > > > > > >>>>>>>>>> traffic in Kafka Streams since a lot of 
> > > > > >>> > > > > > > >>>>>>>>>> no-op
> > > > > >>> results would no
> > > > > >>> > > > > > > >>>>>>>>>> longer be sent downstream. Here is the KIP 
> > > > > >>> > > > > > > >>>>>>>>>> for
> > > > > >>> reference.
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > > Currently, I seek to formalize our approach for this
> > > > KIP
> > > > > >>> first
> > > > > >>> > > > > > > >>> before
> > > > > >>> > > > > > > >>>>>>>>>> we determine concrete API additions /
> > > > > >>> configurations. Some
> > > > > >>> > > > > > > >>>>>>>>>> configs might warrant adding, while others
> > > > are
> > > > > >>> not necessary
> > > > > >>> > > > > > > >>>>>>>>>> since adding them would only increase
> > > > complexity
> > > > > >>> of Kafka
> > > > > >>> > > > > > > >>>>>>>>>> Streams. Cheers, Richard
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> ________________________________ This email
> > > > and
> > > > > >>> any
> > > > > >>> > > > > > attachments
> > > > > >>> > > > > > > >>>>>>>>>> may contain confidential and privileged
> > > > material
> > > > > >>> for the sole
> > > > > >>> > > > > > > >>> use
> > > > > >>> > > > > > > >>>>>>>>>> of the intended recipient. Any review,
> > > > copying, or
> > > > > >>> > > > > > distribution
> > > > > >>> > > > > > > >>>>>>>>>> of this email (or any attachments) by 
> > > > > >>> > > > > > > >>>>>>>>>> others
> > > > is
> > > > > >>> prohibited. If
> > > > > >>> > > > > > > >>>>>>>>>> you are not the intended recipient, please
> > > > > >>> contact the sender
> > > > > >>> > > > > > > >>>>>>>>>> immediately and permanently delete this 
> > > > > >>> > > > > > > >>>>>>>>>> email
> > > > and
> > > > > >>> any
> > > > > >>> > > > > > > >>>>>>>>>> attachments. No employee or agent of TiVo 
> > > > > >>> > > > > > > >>>>>>>>>> is
> > > > > >>> authorized to
> > > > > >>> > > > > > > >>>>>>>>>> conclude any binding agreement on behalf of
> > > > TiVo
> > > > > >>> by email.
> > > > > >>> > > > > > > >>>>>>>>>> Binding agreements with TiVo may only be 
> > > > > >>> > > > > > > >>>>>>>>>> made
> > > > by
> > > > > >>> a signed
> > > > > >>> > > > > > > >>> written
> > > > > >>> > > > > > > >>>>>>>>>> agreement. -- *Tommy Becker* *Principal
> > > > Engineer *
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> *Personalized Content Discovery*
> > > > > >>> > > > > > > >>>>>>>>>>
> > > > > >>> > > > > > > >>>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
> > > > > >>> http://www.tivo.com/>
