Hi all,

@John, I will add some notes accordingly.

To all: Thanks for all your input!
It looks like we can wrap up this discussion thread then.

I've started a vote thread, so please feel free to cast your vote there!

We should be pretty close. :)

Cheers,
Richard

On Thu, Feb 27, 2020 at 2:34 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Richard,
>
> Thanks for the update!
>
> I read it over, and overall it looks good!
>
> I have only a minor concern about the rate metric definition:
> > The rate option indicates the ratio of records dropped to actual volume
> of records passing through the task
> That's not the definition of a "rate". It should be something more like
> "the average number of dropped idempotent updates per second".
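To make the distinction concrete, here is a minimal sketch contrasting a ratio with a rate. The class and method names are hypothetical and are not part of Kafka Streams or its metrics library; the numbers are made up for illustration.

```java
import java.time.Duration;

// Hypothetical helper (not a Kafka Streams API): contrasts a ratio with a rate.
final class DroppedUpdateStats {
    // Ratio: dropped records divided by all records seen (unitless).
    static double ratio(long dropped, long seen) {
        return seen == 0 ? 0.0 : (double) dropped / seen;
    }

    // Rate: average number of dropped records per second over a time window.
    static double ratePerSecond(long dropped, Duration window) {
        if (window.isZero() || window.isNegative()) {
            throw new IllegalArgumentException("window must be positive");
        }
        return dropped / (window.toMillis() / 1000.0);
    }

    public static void main(String[] args) {
        // 30 dropped out of 120 seen, measured over 60 seconds.
        System.out.println("ratio = " + ratio(30, 120));
        System.out.println("rate  = " + ratePerSecond(30, Duration.ofSeconds(60)));
    }
}
```

The ratio depends on the total traffic, while the rate depends only on the dropped count and the elapsed time, which is what the suggested wording describes.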
>
> Incidentally, I mentioned this KIP to Guozhang, and he brought up an
> interesting concern I'd like to share with the list. He noted that if we
> filter
> out idempotent table updates, stream time will not advance with every
> input event anymore. He asked if this would have a negative impact on
> operations that depend on stream time.
>
> I think this is a valid concern. For example, you can use Suppress to
> buffer KTable updates until a certain amount of stream time passes.
> Specifically,
> the Suppress processor itself advances stream time as it receives new
> records
> to its `process` method. In the pathological case, all updates are
> idempotent,
> get dropped, and the Suppress operator never emits anything, even though to
> an outside observer, stream time should have advanced.
>
> Example scenario:
> > inputTable = builder.table(input)
> > suppressed = inputTable.suppress(untilTimeLimit(10))
> >
> > input: key=A, timestamp=0, value=X
> > inputTable: key=A, timestamp=0, value=X
> > suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
> > output: (nothing)
> >
> > input: key=A, timestamp=11, value=X
> > // update is idempotent, so it gets dropped
> > inputTable: key=A, timestamp=0, value=X
> > suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
> > output: (nothing)
>
> Note that even though stream time has technically advanced beyond the
> suppression config of 10, the suppression buffer doesn't see it because the
> KTable source already dropped the idempotent update.
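The scenario above can be traced with a small stand-alone model. This is only a sketch under stated assumptions: SuppressSketch is a hypothetical class, not the real Suppress processor; string equality stands in for the byte-array comparison; and the buffer is assumed to keep the timestamp at which a key was first buffered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a table source followed by a time-limit suppression buffer.
final class SuppressSketch {
    private final long limit;
    private final boolean dropIdempotent;
    private final Map<String, String> table = new HashMap<>();
    private final Map<String, Long> bufferTime = new LinkedHashMap<>();
    private final Map<String, String> bufferValue = new HashMap<>();
    private long observedStreamTime = Long.MIN_VALUE;

    SuppressSketch(long limit, boolean dropIdempotent) {
        this.limit = limit;
        this.dropIdempotent = dropIdempotent;
    }

    // Returns the records emitted by the suppression buffer for this input.
    List<String> process(String key, long timestamp, String value) {
        String prior = table.put(key, value);
        if (dropIdempotent && value.equals(prior)) {
            // Dropped at the table source: the buffer never sees the timestamp.
            return new ArrayList<>();
        }
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        bufferTime.putIfAbsent(key, timestamp);
        bufferValue.put(key, value);

        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = bufferTime.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + limit <= observedStreamTime) {
                emitted.add(entry.getKey() + "=" + bufferValue.remove(entry.getKey()));
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressSketch dropping = new SuppressSketch(10, true);
        dropping.process("A", 0, "X");
        System.out.println("with dropping:    " + dropping.process("A", 11, "X"));

        SuppressSketch forwarding = new SuppressSketch(10, false);
        forwarding.process("A", 0, "X");
        System.out.println("without dropping: " + forwarding.process("A", 11, "X"));
    }
}
```

In this model the second record is emitted only when idempotent updates are forwarded, which mirrors the observed stream time staying at 0 in the scenario.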
>
> I'm thinking that this situation is indeed concerning, and we should be
> aware
> and make a note of it. However, I don't think that it should change our
> proposal.
> To understand why, I'll enumerate all the usages of stream time in Kafka
> Streams:
>
> windowed operations
> -----------------------------
> KStreamSessionWindowAggregate & KStreamWindowAggregate:
>   - used to determine if an incoming record belongs to a window that is
> already closed
> Suppressed.untilWindowCloses():
>   - used to flush out results for windows, once they are closed
> AbstractRocksDBSegmentedBytesStore & InMemorySessionStore &
> InMemoryWindowStore:
>   - used to create new segments and drop old ones that are out of retention
>
> non-windowed operations
> -----------------------------------
> Suppressed.untilTimeLimit
>   - used to flush out prior updates that have passed the configured age,
> in stream time
>
> Note that most of the usages are in windowed operations. One interesting
> thing
> about this context is that records with higher timestamps belong to
> different windows.
> In order to advance stream time far enough to close a window or push it
> out of retention, the new records must have timestamps that are in later
> windows, which means that they are updates to _new_ keys, which means
> they would not be suppressed as idempotent.
>
> Updates within a window could still be suppressed, though:
> For example, if the window size is 10, and the grace period is 5, and we
> get updates all
> for the same key with timestamps 0, 11, and 16, today, we would emit the
> record for
> the [0,10) window as soon as we got the update at time 16 (since stream
> time is now
> past the window end + grace period time of 15). But if we drop the time=16
> update, we
> wouldn't emit the [0,10) window results until we get a record with time >=
> 20.
>
> You can see that the maximum amount of (stream) time that dropping
> idempotent
> updates could delay updates is one window. This might be surprising, but
> it doesn't
> seem too bad, especially since Suppress explicitly does _not_ promise to
> emit the
> results at the earliest possible time, just at some time after the window
> closes.
>
> Even better, though, all the windowed aggregations are _stream_
> aggregations
> that _produce_ a KTable, and we already decided that (at least for now),
> we would
> include the timestamp in the idempotence check for stream aggregation
> results.
> With this strategy, we would actually not suppress _any_ update to a
> stream aggregation (windowed or otherwise) that advances stream time.
>
> So there's no problem at all with windowed operations.
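The window arithmetic above (size 10, grace 5, records at 0, 11, and 16) can be traced with a small model. This is a sketch only: the class and enum names are hypothetical, the aggregate value is assumed never to change, timestamps are assumed non-negative, and the result timestamp is assumed to be the largest record timestamp seen in the window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: when does the [0, size) window result get emitted downstream?
final class WindowCloseSketch {
    enum Check { NONE, VALUE_ONLY, VALUE_AND_TIMESTAMP }

    // Returns the timestamp of the record that triggers emission, or -1 if none does.
    static long firstWindowEmitTime(Check check, long size, long grace, long... timestamps) {
        Map<Long, Long> resultTimestampPerWindow = new HashMap<>();
        long downstreamStreamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long windowStart = ts - (ts % size);
            Long priorTs = resultTimestampPerWindow.get(windowStart);
            long newTs = priorTs == null ? ts : Math.max(priorTs, ts);
            resultTimestampPerWindow.put(windowStart, newTs);

            // The aggregate value is constant in this model, so any second
            // update to the same window has an identical value.
            boolean sameValue = priorTs != null;
            boolean sameTimestamp = priorTs != null && priorTs == newTs;
            boolean dropped =
                (check == Check.VALUE_ONLY && sameValue)
                    || (check == Check.VALUE_AND_TIMESTAMP && sameValue && sameTimestamp);
            if (dropped) {
                continue; // downstream never observes this timestamp
            }
            downstreamStreamTime = Math.max(downstreamStreamTime, ts);
            if (downstreamStreamTime >= size + grace) {
                return ts; // the first window closes at size + grace
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        for (Check check : Check.values()) {
            System.out.println(check + " -> " + firstWindowEmitTime(check, 10, 5, 0, 11, 16, 20));
        }
    }
}
```

Under these assumptions, comparing only the value delays emission until the record at time 20, while including the timestamp in the comparison behaves the same as not dropping anything.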
>
> This only leaves non-windowed operations, of which there's only one. I
> have to admit
> that I regret Suppressed.untilTimeLimit. It turns out that everyone I've
> heard of who
> used this API actually wanted it to be wall-clock based, not stream-time
> based. So,
> I'm not sure in practice if this sharp edge will actually cut anyone.
>
> I think a "proper" fix would be to introduce some kind of control message
> to advance
> stream time independently of records. We've already talked about this a
> little in
> KAFKA-8769, and it would also be necessary for global consistency markers.
> Basically,
> once a record is dropped as an idempotent update, we could still forward a
> time
> marker through the topology, which could trigger suppressions, as well as
> window
> boundaries, but wouldn't result in any computations.
>
> But practically speaking, I'm really not confident that there's anyone
> really using the untilTimeLimit suppression, or, even if they are, that
> they would see consecutive idempotent updates for long enough to have an
> observable impact on what gets emitted from the suppression buffer. In
> fact, if some hypothetical person did find themselves on the wrong end of
> my assumption, they could _remove_ the suppression, and rely instead on
> idempotence-dropping to get the same level of traffic reduction that they
> enjoy from the suppression.
>
> Anyway, long story short, I'm advocating to continue with the current
> proposal
> and just make a note of the situations I've outlined above.
>
> Thanks for your time,
> -John
>
>
> On Thu, Feb 27, 2020, at 14:33, Richard Yu wrote:
> > Hi all,
> >
> > I might've made a minor mistake. The processor node level is level 3, not
> > level 1.
> > I will correct the KIP accordingly.
> >
> > After looking over things, I decided to start the voting thread this
> > afternoon.
> >
> > Cheers,
> > Richard
> >
> > On Thu, Feb 27, 2020 at 12:29 PM Richard Yu <yohan.richard...@gmail.com>
> > wrote:
> >
> > > Hi Bruno, Hi John,
> > >
> > > Thanks for your comments! I updated the KIP accordingly. It looks like,
> > > for quite a few points, I was beating around the bush in ways that
> > > could've been avoided.
> > >
> > > Looks like we can reduce the metric to Level 1 (per processor node)
> then.
> > >
> > > I've cleaned up most of the unnecessary info, and we should be fairly
> > > close.
> > > I will start working on a PR soon for this KIP. (although we might
> split
> > > that up into stages)
> > >
> > > Cheers,
> > > Richard
> > >
> > > On Thu, Feb 27, 2020 at 6:06 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> > >
> > >> Hi John,
> > >>
> > >> I agree with you. It is better to measure the metric on processor node
> > >> level. The users can do the rollup to task-level by themselves.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org>
> > >> wrote:
> > >> >
> > >> > Hi Richard,
> > >> >
> > >> > I've been making a final pass over the KIP.
> > >> >
> > >> > Re: Proposed Behavior Change:
> > >> >
> > >> > I think this point is controversial and probably doesn't need to be
> > >> there at all:
> > >> > > 2.b. In certain situations where there is a high volume of
> idempotent
> > >> > > updates throughout the Streams DAG, it will be recommended
> practice
> > >> > > to materialize all operations to reduce traffic overall across the
> > >> entire
> > >> > >  network of nodes.
> > >> >
> > >> > Re-reading all the points, it seems like we can sum them up in a way
> > >> that's
> > >> > a little more straight to the point, and gives us the right amount
> of
> > >> flexibility:
> > >> >
> > >> > > Proposed Behavior Changes
> > >> > >
> > >> > > Definition: "idempotent update" is one in which the new result and
> > >> prior
> > >> > > result,  when serialized, are identical byte arrays
> > >> > >
> > >> > > Note: an "update" is a concept that only applies to Table
> operations,
> > >> so
> > >> > > the concept of an "idempotent update" also only applies to Table
> > >> operations.
> > >> > > See
> > >>
> https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> > >> > > for more information.
> > >> > >
> > >> > > Given that definition, we propose for Streams to drop idempotent
> > >> updates
> > >> > > in any situation where it's possible and convenient to do so. For
> > >> example,
> > >> > > any time we already have both the prior and new results
> serialized, we
> > >> > > may compare them, and drop the update if it is idempotent.
> > >> > >
> > >> > > Note that under this proposal, we can implement idempotence
> checking
> > >> > > in the following situations:
> > >> > > 1. Any aggregation (for example, KGroupedStream, KGroupedTable,
> > >> > >     TimeWindowedKStream, and SessionWindowedKStream operations)
> > >> > > 2. Any Materialized KTable operation
> > >> > > 3. Repartition operations, when we need to send both prior and new
> > >> results
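As an illustration of the definition, here is a minimal sketch of a byte-level comparison. The class and method names are hypothetical and do not show how Kafka Streams implements the check; treating a missing prior result as "not idempotent" is an assumption made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper illustrating the "identical byte arrays" definition.
final class IdempotenceCheck {
    // True only when both results exist and serialize to the same bytes.
    static boolean isIdempotentUpdate(byte[] priorSerialized, byte[] newSerialized) {
        return priorSerialized != null
            && newSerialized != null
            && Arrays.equals(priorSerialized, newSerialized);
    }

    public static void main(String[] args) {
        byte[] prior = "X".getBytes(StandardCharsets.UTF_8);
        byte[] same = "X".getBytes(StandardCharsets.UTF_8);
        byte[] changed = "Y".getBytes(StandardCharsets.UTF_8);
        System.out.println(isIdempotentUpdate(prior, same));
        System.out.println(isIdempotentUpdate(prior, changed));
        System.out.println(isIdempotentUpdate(null, same));
    }
}
```

Because the comparison runs on bytes that are already available, it needs no extra deserialization and no call to equals() on user types.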
> > >> >
> > >> > Notice that in my proposed wording, we neither limit ourselves to
> just
> > >> the
> > >> > situations enumerated, nor promise to implement the optimization in
> > >> every
> > >> > possible situation. IMHO, this is the best way to propose such a
> > >> feature.
> > >> > That way, we have the flexibility to implement it in stages, and
> also
> > >> to add
> > >> > on to the implementation in the future.
> > >> >
> > >> >
> > >> > Re: Metrics
> > >> >
> > >> > I agree with Bruno, although, I think it might just be a confusing
> > >> statement.
> > >> > It might be clearer to drop all the "discussion", and just say: "We
> > >> will add a
> > >> > metric to count the number of idempotent updates that we have
> dropped".
> > >> >
> > >> > Also, with respect to the metric, I'm wondering if the metric
> should be
> > >> task-
> > >> > level or processor-node-level. Since the interesting action takes
> place
> > >> inside
> > >> > individual processor nodes, I _think_ it would be higher leverage to
> > >> just
> > >> > measure it at the node level. WDYT?
> > >> >
> > >> > Re: Design Reasoning
> > >> >
> > >> > This section seems to be a little bit outdated. I also just noticed
> a
> > >> "surprise"
> > >> > configuration "timestamp.aggregation.selection.policy" hidden in
> point
> > >> 1.a.
> > >> > Is that part of the proposal? We haven't discussed it, and I think
> we
> > >> were
> > >> > talking about this KIP being "configuration free".
> > >> >
> > >> > There is also some discussion of discarded alternatives in the Design
> > >> Reasoning
> > >> > section, which is confusing. Finally, there was a point there I
> didn't
> > >> understand
> > >> > at all, about stateless operators not being intended to load prior
> > >> results.
> > >> > This statement doesn't seem to be true, but it also doesn't seem to
> be
> > >> relevant,
> > >> > so maybe we can just drop it.
> > >> >
> > >> > Overall, it might help if you make a pass on this section, and just
> > >> discuss as
> > >> > briefly as possible the justification for the proposed behavior
> change,
> > >> and
> > >> > not adding a configuration. Try to avoid talking about things that
> we
> > >> are not
> > >> > proposing, since that will just lead to confusion.
> > >> >
> > >> > Similarly, I'd just completely remove the "Implementation
> [discarded]"
> > >> section.
> > >> > It was good to have this as part of the discussion initially, but
> as we
> > >> move
> > >> > toward a vote, it's better to just streamline the KIP document as
> much
> > >> as
> > >> > possible. Keeping a "discarded" section in the document will just
> make
> > >> it
> > >> > harder for new people to understand the proposal. We did the same
> thing
> > >> > with KIP-441, where there were two prior drafts included at the end
> of
> > >> the
> > >> > document, and we just deleted them for clarity.
> > >> >
> > >> > I liked the "Compatibility" and "Rejected Alternatives" sections.
> Very
> > >> clear
> > >> > and to the point.
> > >> >
> > >> > Thanks again for the contribution! I think once the KIP document is
> > >> cleaned
> > >> > up, we'll be in good shape to finalize the discussion.
> > >> > -John
> > >> >
> > >> >
> > >> > On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:
> > >> > > Hi Richard,
> > >> > >
> > >> > > 1. Could you change "idempotent update operations will only be
> dropped
> > >> > > from KTables, not from other classes." -> idempotent update
> operations
> > >> > > will only be dropped from materialized KTables? For
> non-materialized
> > >> > > KTables -- as they can occur after optimization of the topology
> -- we
> > >> > > cannot drop idempotent updates.
> > >> > >
> > >> > > 2. I cannot completely follow the metrics section. Do you want to
> > >> > > record all idempotent updates or only the dropped ones? In
> particular,
> > >> > > I do not understand the following sentences:
> > >> > > "For that matter, even if we don't drop idempotent updates, we
> should
> > >> > > at the very least record the number of idempotent updates that has
> > >> > > been seen go through a particular processor."
> > >> > > "Therefore, we should add some metrics which will count the
> number of
> > >> > > idempotent updates that each node has seen."
> > >> > > I do not see how we can record idempotent updates that we do not
> drop.
> > >> > > If we see them, we should drop them. If we do not see them, we
> cannot
> > >> > > drop them and we cannot record them.
> > >> > >
> > >> > > Best,
> > >> > > Bruno
> > >> > >
> > >> > > On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <
> > >> yohan.richard...@gmail.com> wrote:
> > >> > > >
> > >> > > > Hi John,
> > >> > > >
> > >> > > > Sounds good. It looks like we are close to wrapping things up.
> > >> > > > If there aren't any other revisions that need to be made (if
> > >> > > > there are, please comment in the thread), I will start the
> > >> > > > voting process this Thursday (Pacific Standard Time).
> > >> > > >
> > >> > > > Cheers,
> > >> > > > Richard
> > >> > > >
> > >> > > > On Tue, Feb 25, 2020 at 11:59 AM John Roesler <
> vvcep...@apache.org>
> > >> wrote:
> > >> > > >
> > >> > > > > Hi Richard,
> > >> > > > >
> > >> > > > > Sorry for the slow reply. I actually think we should avoid
> > >> checking
> > >> > > > > equals() for now. Your reasoning is good, but the truth is
> that
> > >> > > > > depending on the implementation of equals() is non-trivial,
> > >> > > > > semantically, and (though I proposed it before), I'm not
> convinced
> > >> > > > > it's worth the risk. Much better to start with exactly one
> kind of
> > >> > > > > "idempotence detection".
> > >> > > > >
> > >> > > > > Even if someone does update their serdes, we know that the new
> > >> > > > > serde would still be able to _de_serialize the old format, or
> the
> > >> whole
> > >> > > > > app would break. The situation is that the new result gets
> encoded
> > >> > > > > in the new binary format, which means we don't detect an
> > >> idempotent
> > >> > > > > update for what it is. In this case, we'd write the new binary
> > >> format to
> > >> > > > > disk and the changelog, and forward it downstream. However, we
> > >> only
> > >> > > > > do this once. Now that the binary format for that record has
> been
> > >> updated,
> > >> > > > > we would correctly detect idempotence of any subsequent
> updates.
> > >> > > > >
> > >> > > > > Plus, we would still be able to filter out idempotent updates
> in
> > >> > > > > repartition
> > >> > > > > sinks, since for those, we use the new serde to serialize both
> > >> the "old"
> > >> > > > > and
> > >> > > > > "new" result.
> > >> > > > >
> > >> > > > > It's certainly a good observation, but I think we can just
> make a
> > >> note of
> > >> > > > > it
> > >> > > > > in "rejected alternatives" for now, and plan to refine it
> later,
> > >> if it does
> > >> > > > > pose a big performance problem.
> > >> > > > >
> > >> > > > > Thanks!
> > >> > > > > -John
> > >> > > > >
> > >> > > > > On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
> > >> > > > > > Hi all,
> > >> > > > > >
> > >> > > > > > Updated the KIP.
> > >> > > > > >
> > >> > > > > > Just a question: do you think it would be a good idea if we
> > >> check for
> > >> > > > > both
> > >> > > > > > Object#equals() and binary equality?
> > >> > > > > > Because there might be some subtle changes in the
> serialization
> > >> (for
> > >> > > > > > example, if the user decides to upgrade their serialization
> > >> procedure to
> > >> > > > > a
> > >> > > > > > new one), but the underlying values of the result might be
> the
> > >> same.
> > >> > > > > > (therefore equals() might return true)
> > >> > > > > >
> > >> > > > > > Do you think this would be plausible?
> > >> > > > > >
> > >> > > > > > Cheers,
> > >> > > > > > Richard
> > >> > > > > >
> > >> > > > > > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <
> > >> yohan.richard...@gmail.com>
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hello,
> > >> > > > > > >
> > >> > > > > > > Just to make some updates. I changed the name of the
> metric
> > >> so that it
> > >> > > > > was
> > >> > > > > > > more in line with usual Kafka naming conventions for
> metrics
> > >> / sensors.
> > >> > > > > > > Below is the updated description of the metric:
> > >> > > > > > >
> > >> > > > > > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG
> (rate
> > >> | total)
> > >> > > > > > >
> > >> > > > > > > Description: This metric will record the number of updates
> > >> that have
> > >> > > > > been
> > >> > > > > > > dropped since they are essentially re-performing an
> earlier
> > >> operation.
> > >> > > > > > >
> > >> > > > > > > Note:
> > >> > > > > > >
> > >> > > > > > >    - The rate option indicates the ratio of records
> dropped
> > >> to actual
> > >> > > > > > >    volume of records passing through the task.
> > >> > > > > > >    - The total option will just give a raw count of the
> > >> number of
> > >> > > > > records
> > >> > > > > > >    dropped.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > I hope that this is more on point.
> > >> > > > > > >
> > >> > > > > > > Best,
> > >> > > > > > > Richard
> > >> > > > > > >
> > >> > > > > > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <
> > >> yohan.richard...@gmail.com
> > >> > > > > >
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > >> Hi all,
> > >> > > > > > >>
> > >> > > > > > >> Thanks for the clarification. I was just confused a
> little
> > >> on what was
> > >> > > > > > >> going on.
> > >> > > > > > >>
> > >> > > > > > >> So I guess that, for the actual proposal, we have the
> > >> > > > > > >> following:
> > >> > > > > > >>
> > >> > > > > > >> 1. We check for binary equality, and perform no extra
> look
> > >> ups.
> > >> > > > > > >> 2. Emphasize that this applies only to materialized
> tables.
> > >> > > > > > >> 3. We drop aggregation updates if key, value and timestamp are
> > >> > > > > > >> the same.
> > >> > > > > > >>
> > >> > > > > > >> Then that settles the behavior changes. So it looks like the
> > >> > > > > > >> metric is the only thing that is left. In this case, I think
> > >> > > > > > >> the metric would be named the following:
> > >> > > > > > >> IdempotentUpdateMetric. This is mostly off the top of my head,
> > >> > > > > > >> so if you think that we should change it, feel free to say so.
> > >> > > > > > >> The metric will report the number of dropped operations.
> > >> > > > > > >>
> > >> > > > > > >> It will probably be added as a Sensor, similar to the
> > >> dropped records
> > >> > > > > > >> sensor we already have.
> > >> > > > > > >>
> > >> > > > > > >> If there isn't anything else, I will probably start the
> > >> voting process
> > >> > > > > > >> next week!
> > >> > > > > > >>
> > >> > > > > > >> Cheers,
> > >> > > > > > >> Richard
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <
> > >> vvcep...@apache.org>
> > >> > > > > > >> wrote:
> > >> > > > > > >>
> > >> > > > > > >>> Hi Bruno,
> > >> > > > > > >>>
> > >> > > > > > >>> Thanks for the clarification. Indeed, I was thinking two
> > >> things:
> > >> > > > > > >>> 1. For the initial implementation, we can just avoid
> adding
> > >> any extra
> > >> > > > > > >>> lookups, but only do the comparison when we already
> happen
> > >> to have
> > >> > > > > > >>> the prior value.
> > >> > > > > > >>> 2. I think, as a result of the timestamp semantics, we
> > >> actually _do_
> > >> > > > > look
> > >> > > > > > >>> up the prior value approximately all the time, so the
> > >> idempotence
> > >> > > > > check
> > >> > > > > > >>> should be quite effective.
> > >> > > > > > >>>
> > >> > > > > > >>> I think that second point is the same thing you're
> > >> referring to
> > >> > > > > > >>> potentially
> > >> > > > > > >>> being unnecessary. It does mean that we do fetch the
> whole
> > >> value in a
> > >> > > > > > >>> lot of cases where we really only need the timestamp,
> so it
> > >> could
> > >> > > > > > >>> certainly
> > >> > > > > > >>> be optimized in the future. In that future, we would
> need
> > >> to weigh
> > >> > > > > that
> > >> > > > > > >>> optimization against losing the idempotence check. But,
> > >> that's a
> > >> > > > > problem
> > >> > > > > > >>> for tomorrow :)
> > >> > > > > > >>>
> > >> > > > > > >>> I'm 100% on board with scrutinizing the performance as
> we
> > >> implement
> > >> > > > > > >>> this feature.
> > >> > > > > > >>>
> > >> > > > > > >>> Thanks again,
> > >> > > > > > >>> -John
> > >> > > > > > >>>
> > >> > > > > > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
> > >> > > > > > >>> > Hi John,
> > >> > > > > > >>> >
> > >> > > > > > >>> > I am glad to help you with your imagination. With
> > >> overhead, I
> > >> > > > > mainly
> > >> > > > > > >>> > meant the additional lookup into the state store to
> get
> > >> the current
> > >> > > > > > >>> > value, but I see now in the code that we do that
> lookup
> > >> anyways
> > >> > > > > > >>> > (although I think we could avoid that in the cases
> where
> > >> we do not
> > >> > > > > > >>> > need the old value). With or without config, we need
> to
> > >> evaluate
> > >> > > > > the
> > >> > > > > > >>> > performance benefits of this change, in any case.
> > >> > > > > > >>> >
> > >> > > > > > >>> > Best,
> > >> > > > > > >>> > Bruno
> > >> > > > > > >>> >
> > >> > > > > > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <
> > >> vvcep...@apache.org>
> > >> > > > > > >>> wrote:
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > Thanks for your remarks, Bruno!
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > I'm in favor of standardizing on terminology like
> "not
> > >> forwarding
> > >> > > > > > >>> > > idempotent updates" or "dropping idempotent
> updates".
> > >> Maybe we
> > >> > > > > > >>> > > should make a pass on the KIP and just convert
> > >> everything to this
> > >> > > > > > >>> > > phrasing. In retrospect, even the term
> "emit-on-change"
> > >> has too
> > >> > > > > much
> > >> > > > > > >>> > > semantic baggage, since it implies the semantics
> from
> > >> the SECRET
> > >> > > > > > >>> > > paper, which we don't really want to imply here.
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > I'm also in favor of the metric as you propose.
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > Likewise with stream aggregations, I was also under
> the
> > >> > > > > impression
> > >> > > > > > >>> > > that we agreed on dropping idempotent updates to the
> > >> aggregation
> > >> > > > > > >>> > > result, any time we find that our "new" (key, value,
> > >> timestamp)
> > >> > > > > > >>> result
> > >> > > > > > >>> > > is identical to the prior one.
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > Also, I'm +1 on all your recommendations for
> updating
> > >> the KIP
> > >> > > > > > >>> document
> > >> > > > > > >>> > > for clarity.
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > Regarding the opt-out config. Perhaps I'm suffering
> > >> from a
> > >> > > > > failure of
> > >> > > > > > >>> > > imagination, but I don't see how the current
> proposal
> > >> could
> > >> > > > > really
> > >> > > > > > >>> have
> > >> > > > > > >>> > > a measurable impact on latency. If all we do is
> make a
> > >> single
> > >> > > > > extra
> > >> > > > > > >>> pass
> > >> > > > > > >>> > > to compare two byte arrays for equality, only in the
> > >> cases where
> > >> > > > > we
> > >> > > > > > >>> already
> > >> > > > > > >>> > > have the byte arrays available, it seems unlikely to
> > >> measurably
> > >> > > > > > >>> affect the
> > >> > > > > > >>> > > processing of non-idempotent updates. It seems
> > >> guaranteed to
> > >> > > > > > >>> _decrease_
> > >> > > > > > >>> > > the latency of processing idempotent updates, since
> we
> > >> get to
> > >> > > > > skip a
> > >> > > > > > >>> > > store#put, at least one producer#send, and also all
> > >> downstream
> > >> > > > > > >>> processing,
> > >> > > > > > >>> > > including all the disk and network operations
> > >> associated with
> > >> > > > > > >>> downstream
> > >> > > > > > >>> > > operations.
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > It seems like if we're pretty sure this change would
> > >> only
> > >> > > > > _help_, we
> > >> > > > > > >>> shouldn't
> > >> > > > > > >>> > > introduce the operational burden of an extra
> > >> configuration. If we
> > >> > > > > > >>> want to
> > >> > > > > > >>> > > be more aggressive about dropping idempotent
> operations
> > >> in the
> > >> > > > > > >>> future,
> > >> > > > > > >>> > > such as depending on equals() or adding a
> ChangeDetector
> > >> > > > > interface,
> > >> > > > > > >>> then
> > >> > > > > > >>> > > we should consider adding a configuration as part of
> > >> that future
> > >> > > > > > >>> work. In
> > >> > > > > > >>> > > fact, if we add a simple "opt-in/opt-out" switch
> right
> > >> now, we
> > >> > > > > might
> > >> > > > > > >>> find
> > >> > > > > > >>> > > that it's actually insufficient for whatever future
> > >> feature we
> > >> > > > > might
> > >> > > > > > >>> propose,
> > >> > > > > > >>> > > then we have a mess of deprecating the opt-out
> config
> > >> and
> > >> > > > > replacing
> > >> > > > > > >>> it.
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > What do you think?
> > >> > > > > > >>> > > -John
> > >> > > > > > >>> > >
> > >> > > > > > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
> > >> > > > > > >>> > > > Hi all,
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > Sorry for the late reply!
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > I am also in favour of baby steps.
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > I am undecided whether the KIP should contain an opt-out
> > >> > > > > > >>> > > > config or not.
> > >> > > > > > >>> > > > The overhead of emit-on-change might affect
> latency.
> > >> For
> > >> > > > > > >>> applications
> > >> > > > > > >>> > > > where low latency is crucial and there are not too
> > >> many
> > >> > > > > idempotent
> > >> > > > > > >>> > > > updates, it would be better to fall back to
> > >> emit-on-update.
> > >> > > > > > >>> However,
> > >> > > > > > >>> > > > we do not know how much emit-on-change impacts
> > >> latency. We
> > >> > > > > would
> > >> > > > > > >>> first
> > >> > > > > > >>> > > > need to benchmark that before we can decide about
> the
> > >> > > > > > >>> opt-out-config.
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > A metric of dropped idempotent updates seems
> useful
> > >> to me to be
> > >> > > > > > >>> > > > informed about potential upstream applications or
> > >> upstream
> > >> > > > > > >>> operators
> > >> > > > > > >>> > > > that produce too many idempotent updates. The KIP
> > >> should state
> > >> > > > > the
> > >> > > > > > >>> > > > name of the metric, its group, its tags, and its
> > >> recording
> > >> > > > > level
> > >> > > > > > >>> (see
> > >> > > > > > >>> > > > KIP-444 or KIP-471 for examples). I propose DEBUG
> as
> > >> reporting
> > >> > > > > > >>> level.
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > Richard, what competing proposals for
> emit-on-change
> > >> for
> > >> > > > > > >>> aggregations
> > >> > > > > > >>> > > > do you mean? I have the feeling that we agreed to
> get
> > >> rid of
> > >> > > > > > >>> > > > idempotent updates if the aggregate is updated
> with
> > >> the same
> > >> > > > > key,
> > >> > > > > > >>> > > > value, AND timestamp. I am also fine if we do not
> > >> include this
> > >> > > > > into
> > >> > > > > > >>> > > > this KIP (remember: baby steps).
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > You write that "emit-on-change is more correct".
> > >> Since we
> > >> > > > > agreed
> > >> > > > > > >>> that
> > >> > > > > > >>> > > > this is an optimization, IMO you cannot argue this
> > >> way.
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > Please put "Alternative Approaches" under
> "Rejected
> > >> > > > > Alternatives",
> > >> > > > > > >>> so
> > >> > > > > > >>> > > > that it becomes clear that we are not going to implement
> > >> > > > > > >>> > > > them. In general, I think the KIP needs a bit of clean-up
> > >> > > > > > >>> > > > (probably, you already planned for it). "Design Reasoning"
> > >> > > > > > >>> > > > mixes behavior changes and rejected alternatives, and
> > >> > > > > > >>> > > > partly duplicates the content of those sections.
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > I do not like the name "no-op operations" or "no-ops",
> > >> > > > > > >>> > > > because they are rather generic. I prefer "idempotent
> > >> > > > > > >>> > > > updates".
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > Best,
> > >> > > > > > >>> > > > Bruno
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > >
> > >> > > > > > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <
> > >> > > > > > >>> yohan.richard...@gmail.com> wrote:
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > Hi all,
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > We are definitely making progress!
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > @John should I emphasize in the proposed
> behavior
> > >> changes
> > >> > > > > that
> > >> > > > > > >>> we are only
> > >> > > > > > >>> > > > > doing binary equality checks for stateful
> operators?
> > >> > > > > > >>> > > > > It looks like we have come close to finalizing
> this
> > >> part of
> > >> > > > > the
> > >> > > > > > >>> KIP. (I
> > >> > > > > > >>> > > > > will note in the KIP that this proposal is
> intended
> > >> for
> > >> > > > > > >>> optimization, not
> > >> > > > > > >>> > > > > semantics correctness)
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > I do think maybe we still have one other detail
> we
> > >> need to
> > >> > > > > > >>> discuss. So far,
> > >> > > > > > >>> > > > > there has been quite a bit of back and forth
> about
> > >> what the
> > >> > > > > > >>> behavior of
> > >> > > > > > >>> > > > > aggregations should look like in emit on
> change. I
> > >> have seen
> > >> > > > > > >>> > > > > multiple competing proposals, so I am not
> > >> completely certain
> > >> > > > > > >>> which one we
> > >> > > > > > >>> > > > > should go with, or how we will be able to
> > >> compromise in
> > >> > > > > between
> > >> > > > > > >>> them.
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > Let me know what your thoughts are on this
> matter,
> > >> since we
> > >> > > > > are
> > >> > > > > > >>> probably
> > >> > > > > > >>> > > > > close to wrapping up most other stuff.
> > >> > > > > > >>> > > > > @Matthias J. Sax <matth...@confluent.io>  and
> > >> @Bruno, see
> > >> > > > > what
> > >> > > > > > >>> you think
> > >> > > > > > >>> > > > > about this.
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > Best,
> > >> > > > > > >>> > > > > Richard
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <
> > >> > > > > > >>> vvcep...@apache.org> wrote:
> > >> > > > > > >>> > > > >
> > >> > > > > > >>> > > > > > Thanks, Matthias!
> > >> > > > > > >>> > > > > >
> > >> > > > > > >>> > > > > > Regarding numbers, it would be hard to know
> how
> > >> many
> > >> > > > > > >>> applications
> > >> > > > > > >>> > > > > > would benefit, since we don't know how many
> > >> applications
> > >> > > > > there
> > >> > > > > > >>> are,
> > >> > > > > > >>> > > > > > or anything about their data sets or
> topologies.
> > >> We could
> > >> > > > > do a
> > >> > > > > > >>> survey,
> > >> > > > > > >>> > > > > > but it seems overkill if we take the
> conservative
> > >> approach.
> > >> > > > > > >>> > > > > >
> > >> > > > > > >>> > > > > > I have my own practical stream processing
> > >> experience that
> > >> > > > > > >>> tells me this
> > >> > > > > > >>> > > > > > is absolutely critical for any
> moderate-to-large
> > >> relational
> > >> > > > > > >>> stream
> > >> > > > > > >>> > > > > > processing use cases. I'll leave it to you to
> > >> decide if you
> > >> > > > > > >>> find that
> > >> > > > > > >>> > > > > > convincing, but it's definitely not an
> > >> _assumption_. I've
> > >> > > > > also
> > >> > > > > > >>> heard from
> > >> > > > > > >>> > > > > > a few Streams users who have already had to
> > >> implement
> > >> > > > > their own
> > >> > > > > > >>> > > > > > noop-suppression transformers in order to get
> to
> > >> production
> > >> > > > > > >>> scale.
> > >> > > > > > >>> > > > > >
> > >> > > > > > >>> > > > > > Regardless, it sounds like we can agree on
> taking
> > >> an
> > >> > > > > > >>> opportunistic approach
> > >> > > > > > >>> > > > > > and targeting the optimization just to use a
> > >> > > > > binary-equality
> > >> > > > > > >>> check at
> > >> > > > > > >>> > > > > > stateful operators. (I'd also suggest in sink
> > >> nodes, when
> > >> > > > > we
> > >> > > > > > >>> are about to
> > >> > > > > > >>> > > > > > send old and new values, since they are also
> > >> already
> > >> > > > > present
> > >> > > > > > >>> and serialized
> > >> > > > > > >>> > > > > > at that point.) We could make the KIP even
> more
> > >> vague, and
> > >> > > > > > >>> just say that
> > >> > > > > > >>> > > > > > we'll drop no-op updates "when possible".
> > >> > > > > > >>> > > > > >
> > >> > > > > > >>> > > > > > I'm curious what Bruno and the others think
> about
> > >> this. If
> > >> > > > > it
> > >> > > > > > >>> seems like
> > >> > > > > > >>> > > > > > a good starting point, perhaps we could move
> to a
> > >> vote soon
> > >> > > > > > >>> and get to
> > >> > > > > > >>> > > > > > work on the implementation!
> > >> > > > > > >>> > > > > >
> > >> > > > > > >>> > > > > > Thanks,
> > >> > > > > > >>> > > > > > -John
> > >> > > > > > >>> > > > > >
> > >> > > > > > >>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J.
> Sax
> > >> wrote:
> > >> > > > > > >>> > > > > > > Talking about optimizations and reducing
> > >> downstream load:
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > > Do we actually have any numbers? I have the
> > >> impression
> > >> > > > > that
> > >> > > > > > >>> this KIP is
> > >> > > > > > >>> > > > > > > more or less built on the _assumption_ that
> > >> there is a
> > >> > > > > > >>> problem. Yes,
> > >> > > > > > >>> > > > > > > there are some use cases that would benefit
> > >> from this;
> > >> > > > > But
> > >> > > > > > >>> how many
> > >> > > > > > >>> > > > > > > applications would actually benefit? And how
> > >> much load
> > >> > > > > > >>> reduction would
> > >> > > > > > >>> > > > > > > they get?
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > > The simplest approach (following John's idea
> to
> > >> make baby
> > >> > > > > > >>> steps) would be
> > >> > > > > > >>> > > > > > > to apply the emit-on-change pattern only if
> > >> there is a
> > >> > > > > > >>> store. For this
> > >> > > > > > >>> > > > > > > case we need to serialize old and new result
> > >> anyway and
> > >> > > > > thus
> > >> > > > > > >>> a simple
> > >> > > > > > >>> > > > > > > byte-array comparison is no overhead.
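
To make the byte-array comparison above concrete, here is a minimal sketch of the check a stateful operator could run before forwarding. The class and method names (`BinaryEqualityCheck`, `shouldForward`) are hypothetical and not part of Kafka Streams; the sketch assumes the old and new results are already available as serialized `byte[]` values because they are being read from and written to a store.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of Kafka Streams: decides whether a stateful
// operator should forward an update, given the serialized old and new results.
final class BinaryEqualityCheck {

    // Returns true when the update should be forwarded downstream.
    static boolean shouldForward(byte[] oldBytes, byte[] newBytes) {
        // Arrays.equals compares content, treats two nulls as equal, and
        // treats null vs. non-null as different (first insert or delete).
        return !Arrays.equals(oldBytes, newBytes);
    }

    public static void main(String[] args) {
        byte[] x = "X".getBytes(StandardCharsets.UTF_8);
        byte[] sameX = "X".getBytes(StandardCharsets.UTF_8);
        byte[] y = "Y".getBytes(StandardCharsets.UTF_8);

        System.out.println("first value forwarded:   " + shouldForward(null, x));
        System.out.println("idempotent forwarded:    " + shouldForward(x, sameX));
        System.out.println("changed value forwarded: " + shouldForward(x, y));
    }
}
```

Since both byte arrays exist anyway in the stateful case, the only added cost in this sketch is the comparison itself.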
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > > Sending `oldValues` by default would become
> > >> expensive
> > >> > > > > > >>> because we would
> > >> > > > > > >>> > > > > > > need to serialize the recomputed old
> result, as
> > >> well as
> > >> > > > > the
> > >> > > > > > >>> new result,
> > >> > > > > > >>> > > > > > > to make the comparison (and we know the
> > >> serialization is
> > >> > > > > not
> > >> > > > > > >>> cheap). We
> > >> > > > > > >>> > > > > > > are facing a trade-off between CPU overhead
> and
> > >> > > > > downstream
> > >> > > > > > >>> load and I am
> > >> > > > > > >>> > > > > > > not sure if we should hard code this. My
> > >> original
> > >> > > > > argument
> > >> > > > > > >>> for sending
> > >> > > > > > >>> > > > > > > `oldValues` was about semantics; but for an
> > >> > > > > optimization, I
> > >> > > > > > >>> am not sure
> > >> > > > > > >>> > > > > > > if this would be the right choice.
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > > For now, users who want to opt-in can force
> a
> > >> > > > > > >>> materialization. A
> > >> > > > > > >>> > > > > > > materialization may be expensive and if we
> see
> > >> future
> > >> > > > > > >>> demand, we could
> > >> > > > > > >>> > > > > > > still add an option to send `oldValues`
> instead
> > >> of
> > >> > > > > > >>> materialization (this
> > >> > > > > > >>> > > > > > > would at least save the store overhead). As
> we
> > >> consider
> > >> > > > > the
> > >> > > > > > >>> KIP an
> > >> > > > > > >>> > > > > > > optimization, a "config" seems to make
> sense.
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > > -Matthias
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > >
> > >> > > > > > >>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
> > >> > > > > > >>> > > > > > > > Hi John!
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Thanks for the reply.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > About the changes we have discussed so
> far. I
> > >> think
> > >> > > > > upon
> > >> > > > > > >>> further
> > >> > > > > > >>> > > > > > > > consideration, we have been mostly talking
> > >> about this
> > >> > > > > from
> > >> > > > > > >>> the
> > >> > > > > > >>> > > > > > perspective
> > >> > > > > > >>> > > > > > > > that no stop-gap effort is acceptable.
> > >> However, in
> > >> > > > > recent
> > >> > > > > > >>> discussion,
> > >> > > > > > >>> > > > > > > if we
> > >> > > > > > >>> > > > > > > > consider optimization, then it appears
> that
> > >> the
> > >> > > > > > >>> perspective I
> > >> > > > > > >>> > > > > > mentioned no
> > >> > > > > > >>> > > > > > > > longer applies. After all, we are no
> longer
> > >> concerned
> > >> > > > > so
> > >> > > > > > >>> much about
> > >> > > > > > >>> > > > > > > > semantics correctness as about reducing
> traffic
> > >> as much as
> > >> > > > > > >>> possible
> > >> > > > > > >>> > > > > > without
> > >> > > > > > >>> > > > > > > > performance tradeoffs.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > In this case, I think a cache would be a
> good
> > >> idea for
> > >> > > > > > >>> stateless
> > >> > > > > > >>> > > > > > > > operations. This cache will not be backed
> by
> > >> a store
> > >> > > > > > >>> obviously. We can
> > >> > > > > > >>> > > > > > > > probably use Kafka's ThreadCache. We
> should
> > >> be able to
> > >> > > > > > >>> catch a large
> > >> > > > > > >>> > > > > > > > portion of the no-ops if we at least store
> > >> some
> > >> > > > > results in
> > >> > > > > > >>> the cache.
> > >> > > > > > >>> > > > > > Not
> > >> > > > > > >>> > > > > > > > all will be caught, but I think the impact
> > >> will be
> > >> > > > > > >>> significant.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > On another note, I think that we should
> > >> implement
> > >> > > > > > >>> competing proposals
> > >> > > > > > >>> > > > > > i.e.
> > >> > > > > > >>> > > > > > > > one where we forward both old and new
> values
> > >> with a
> > >> > > > > > >>> reasonable
> > >> > > > > > >>> > > > > > proportion
> > >> > > > > > >>> > > > > > > > of artificial no-ops (we do not
> necessarily
> > >> have to
> > >> > > > > rely
> > >> > > > > > >>> on equals so
> > >> > > > > > >>> > > > > > much
> > >> > > > > > >>> > > > > > > > as comparing the serialized binary data
> after
> > >> the
> > >> > > > > > >>> operation), and in
> > >> > > > > > >>> > > > > > > > another scenario, the cache for stateless
> > >> ops. It
> > >> > > > > would be
> > >> > > > > > >>> > > > > > unreasonable if
> > >> > > > > > >>> > > > > > > > we completely disregard either approach,
> > >> since they
> > >> > > > > both
> > >> > > > > > >>> have merit.
> > >> > > > > > >>> > > > > > The
> > >> > > > > > >>> > > > > > > > reason for implementing both is to perform
> > >> benchmark
> > >> > > > > tests
> > >> > > > > > >>> on them, and
> > >> > > > > > >>> > > > > > > > compare them with the original. This way,
> we
> > >> can more
> > >> > > > > > >>> clearly see what
> > >> > > > > > >>> > > > > > are
> > >> > > > > > >>> > > > > > > > the drawbacks and the gains. So far, we
> have
> > >> been
> > >> > > > > > >>> discussing only
> > >> > > > > > >>> > > > > > > > hypotheticals, and if we continue to do
> so, I
> > >> think it
> > >> > > > > is
> > >> > > > > > >>> likely no
> > >> > > > > > >>> > > > > > ground
> > >> > > > > > >>> > > > > > > > will be gained.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > After all, what we seek is optimization,
> and
> > >> > > > > performance
> > >> > > > > > >>> benchmarks
> > >> > > > > > >>> > > > > > > will be
> > >> > > > > > >>> > > > > > > > mandatory for a KIP of this nature.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Hope this helps,
> > >> > > > > > >>> > > > > > > > Richard
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John
> Roesler <
> > >> > > > > > >>> vvcep...@apache.org>
> > >> > > > > > >>> > > > > > wrote:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >> Hi again, all,
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> Sorry on my part for my silence.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> I've just taken another look over the
> recent
> > >> history
> > >> > > > > of
> > >> > > > > > >>> this
> > >> > > > > > >>> > > > > > > discussion. It
> > >> > > > > > >>> > > > > > > >> seems like the #1 point to clarify
> (because
> > >> it affects
> > >> > > > > > >>> everything
> > >> > > > > > >>> > > > > > else) is
> > >> > > > > > >>> > > > > > > >> that,
> > >> > > > > > >>> > > > > > > >> yes, I was 100% envisioning this as an
> > >> _optimization_.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> As a consequence, I don't think it's
> > >> critical to make
> > >> > > > > any
> > >> > > > > > >>> hard
> > >> > > > > > >>> > > > > > guarantees
> > >> > > > > > >>> > > > > > > >> about
> > >> > > > > > >>> > > > > > > >> what results get forwarded and what
> (no-op
> > >> updates)
> > >> > > > > get
> > >> > > > > > >>> dropped. I'd
> > >> > > > > > >>> > > > > > > >> initially
> > >> > > > > > >>> > > > > > > >> just been thinking about doing this
> > >> opportunistically
> > >> > > > > in
> > >> > > > > > >>> cases where
> > >> > > > > > >>> > > > > > we
> > >> > > > > > >>> > > > > > > >> already
> > >> > > > > > >>> > > > > > > >> had the "old" and "new" result in memory,
> > >> thanks to a
> > >> > > > > > >>> request to
> > >> > > > > > >>> > > > > > > "emit old
> > >> > > > > > >>> > > > > > > >> values", or to the implementation of
> > >> timestamp
> > >> > > > > semantics.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> However, whether or not it's semantically
> > >> critical, I
> > >> > > > > do
> > >> > > > > > >>> think that
> > >> > > > > > >>> > > > > > > >> Matthias's
> > >> > > > > > >>> > > > > > > >> idea to use the change-forwarding
> mechanism
> > >> to check
> > >> > > > > for
> > >> > > > > > >>> no-ops even
> > >> > > > > > >>> > > > > > on
> > >> > > > > > >>> > > > > > > >> stateless operations is pretty
> interesting.
> > >> > > > > Specifically,
> > >> > > > > > >>> this would
> > >> > > > > > >>> > > > > > > >> _really_
> > >> > > > > > >>> > > > > > > >> let you pare down useless updates by
> using
> > >> mapValues
> > >> > > > > to
> > >> > > > > > >>> strip down
> > >> > > > > > >>> > > > > > > records
> > >> > > > > > >>> > > > > > > >> only
> > >> > > > > > >>> > > > > > > >> to what you really need. However, the
> > >> dependence on
> > >> > > > > the
> > >> > > > > > >>> > > > > > implementation of
> > >> > > > > > >>> > > > > > > >> equals() is troubling.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> It might make sense to table this idea,
> as
> > >> well as my
> > >> > > > > > >>> complicated
> > >> > > > > > >>> > > > > > no-op
> > >> > > > > > >>> > > > > > > >> detection algorithm, and initially
> propose
> > >> just a
> > >> > > > > > >>> nonconfigurable
> > >> > > > > > >>> > > > > > feature
> > >> > > > > > >>> > > > > > > >> to
> > >> > > > > > >>> > > > > > > >> check "old" and "new" results for binary
> > >> equality
> > >> > > > > before
> > >> > > > > > >>> forwarding.
> > >> > > > > > >>> > > > > > > I.e.,
> > >> > > > > > >>> > > > > > > >> if
> > >> > > > > > >>> > > > > > > >> any operation determines that the old and
> > >> new results
> > >> > > > > are
> > >> > > > > > >>> > > > > > > >> binary-identical, we
> > >> > > > > > >>> > > > > > > >> would not forward.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> I'll admit that this doesn't serve
> Tommy's
> > >> use case
> > >> > > > > very
> > >> > > > > > >>> well, but it
> > >> > > > > > >>> > > > > > > >> might be
> > >> > > > > > >>> > > > > > > >> better to take baby steps with an
> > >> optimization like
> > >> > > > > this
> > >> > > > > > >>> and not risk
> > >> > > > > > >>> > > > > > > >> over-reaching in a way that actually
> harms
> > >> > > > > performance or
> > >> > > > > > >>> > > > > > correctness. We
> > >> > > > > > >>> > > > > > > >> could
> > >> > > > > > >>> > > > > > > >> always expand the feature to use
> equals() or
> > >> some
> > >> > > > > kind of
> > >> > > > > > >>> > > > > > ChangeDetector
> > >> > > > > > >>> > > > > > > >> later
> > >> > > > > > >>> > > > > > > >> on, in a more focused discussion.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> Regarding metrics or debug logs, I guess
> I
> > >> don't feel
> > >> > > > > > >>> strongly, but it
> > >> > > > > > >>> > > > > > > >> feels
> > >> > > > > > >>> > > > > > > >> like two things will happen that make it
> > >> nicer to add
> > >> > > > > > >>> them:
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> 1. This feature is going to
> surprise/annoy
> > >> _somebody_,
> > >> > > > > > >>> and it would be
> > >> > > > > > >>> > > > > > > >> nice to
> > >> > > > > > >>> > > > > > > >> be able to definitively say the reason
> that
> > >> updates
> > >> > > > > are
> > >> > > > > > >>> dropped is
> > >> > > > > > >>> > > > > > that
> > >> > > > > > >>> > > > > > > >> they
> > >> > > > > > >>> > > > > > > >> were no-ops. The easiest smoking gun is
> if
> > >> there are
> > >> > > > > > >>> debug-logs that
> > >> > > > > > >>> > > > > > > can be
> > >> > > > > > >>> > > > > > > >> enabled. This person might just be
> looking
> > >> at the
> > >> > > > > > >>> dashboards,
> > >> > > > > > >>> > > > > > > wondering why
> > >> > > > > > >>> > > > > > > >> there are 100K updates per second going
> into
> > >> their
> > >> > > > > app,
> > >> > > > > > >>> but only 1K
> > >> > > > > > >>> > > > > > > >> results per
> > >> > > > > > >>> > > > > > > >> second coming out. Having the metric
> there
> > >> makes the
> > >> > > > > > >>> accounting
> > >> > > > > > >>> > > > > > easier.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> 2. Somebody is going to struggle with
> > >> high-volume
> > >> > > > > > >>> updates, and it
> > >> > > > > > >>> > > > > > > would be
> > >> > > > > > >>> > > > > > > >> nice
> > >> > > > > > >>> > > > > > > >> for them to know that this feature is
> saving
> > >> them
> > >> > > > > > >>> X-thousand updates
> > >> > > > > > >>> > > > > > per
> > >> > > > > > >>> > > > > > > >> second,
> > >> > > > > > >>> > > > > > > >> etc.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> What does everyone think about this?
> Note,
> > >> as I read
> > >> > > > > it,
> > >> > > > > > >>> what I've
> > >> > > > > > >>> > > > > > said
> > >> > > > > > >>> > > > > > > >> above is
> > >> > > > > > >>> > > > > > > >> already reflected in the text of the KIP.
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> Thanks,
> > >> > > > > > >>> > > > > > > >> -John
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >>
> > >> > > > > > >>> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard
> Yu
> > >> wrote:
> > >> > > > > > >>> > > > > > > >>> Hi all,
> > >> > > > > > >>> > > > > > > >>>
> > >> > > > > > >>> > > > > > > >>> Bumping this. If you feel that this KIP
> is
> > >> not too
> > >> > > > > > >>> urgent, then let
> > >> > > > > > >>> > > > > > me
> > >> > > > > > >>> > > > > > > >>> know. :)
> > >> > > > > > >>> > > > > > > >>>
> > >> > > > > > >>> > > > > > > >>> Cheers,
> > >> > > > > > >>> > > > > > > >>> Richard
> > >> > > > > > >>> > > > > > > >>>
> > >> > > > > > >>> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard
> Yu <
> > >> > > > > > >>> > > > > > yohan.richard...@gmail.com>
> > >> > > > > > >>> > > > > > > >>> wrote:
> > >> > > > > > >>> > > > > > > >>>
> > >> > > > > > >>> > > > > > > >>>> Hi all,
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>> I've had just a few thoughts regarding
> the
> > >> > > > > forwarding
> > >> > > > > > >>> of <key,
> > >> > > > > > >>> > > > > > > >>>> change<old_value, new_value>>. As
> Matthias
> > >> already
> > >> > > > > > >>> mentioned, there
> > >> > > > > > >>> > > > > > > >> are two
> > >> > > > > > >>> > > > > > > >>>> separate priorities by which we can
> judge
> > >> this KIP:
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>> 1. An optimization perspective: In this
> > >> case, the
> > >> > > > > user
> > >> > > > > > >>> would prefer
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>> impact of this KIP to be as minimal as
> > >> possible. By
> > >> > > > > > >>> such logic, if
> > >> > > > > > >>> > > > > > > >>>> stateless operations are performed
> twice,
> > >> that could
> > >> > > > > > >>> prove
> > >> > > > > > >>> > > > > > > >> unacceptable for
> > >> > > > > > >>> > > > > > > >>>> them. (since operations can prove
> > >> expensive)
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>> 2. Semantics correctness perspective:
> > >> Unlike the
> > >> > > > > > >>> optimization
> > >> > > > > > >>> > > > > > > >> approach, we
> > >> > > > > > >>> > > > > > > >>>> are more concerned with all KTable
> > >> operations
> > >> > > > > obeying
> > >> > > > > > >>> the same
> > >> > > > > > >>> > > > > > emission
> > >> > > > > > >>> > > > > > > >>>> policy. i.e. emit on change. In this
> case,
> > >> a
> > >> > > > > > >>> discrepancy would not
> > >> > > > > > >>> > > > > > be
> > >> > > > > > >>> > > > > > > >>>> tolerated, even though an extra
> > >> performance cost
> > >> > > > > will
> > >> > > > > > >>> be incurred.
> > >> > > > > > >>> > > > > > > >>>> Therefore, we will follow Matthias's
> > >> approach, and
> > >> > > > > then
> > >> > > > > > >>> perform the
> > >> > > > > > >>> > > > > > > >>>> operation once on the old value, and
> once
> > >> on the
> > >> > > > > new.
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>> The issue here I think is more black
> and
> > >> white than
> > >> > > > > in
> > >> > > > > > >>> between. The
> > >> > > > > > >>> > > > > > > >> second
> > >> > > > > > >>> > > > > > > >>>> option in particular would be favorable
> > >> for users
> > >> > > > > with
> > >> > > > > > >>> inexpensive
> > >> > > > > > >>> > > > > > > >>>> stateless operations, while for the
> former
> > >> option,
> > >> > > > > we
> > >> > > > > > >>> are probably
> > >> > > > > > >>> > > > > > > >> dealing
> > >> > > > > > >>> > > > > > > >>>> with more expensive ones. So the
> simplest
> > >> solution
> > >> > > > > is
> > >> > > > > > >>> probably to
> > >> > > > > > >>> > > > > > > >> allow the
> > >> > > > > > >>> > > > > > > >>>> user to choose one of the behaviors,
> and
> > >> have a
> > >> > > > > config
> > >> > > > > > >>> which can
> > >> > > > > > >>> > > > > > > >> switch in
> > >> > > > > > >>> > > > > > > >>>> between them.
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>> It's the simplest compromise I can come
> up
> > >> with at
> > >> > > > > the
> > >> > > > > > >>> moment, but if
> > >> > > > > > >>> > > > > > > >> you
> > >> > > > > > >>> > > > > > > >>>> think you have a better plan which
> could
> > >> better
> > >> > > > > balance
> > >> > > > > > >>> tradeoffs,
> > >> > > > > > >>> > > > > > then
> > >> > > > > > >>> > > > > > > >>>> please let us know. :)
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>> Best,
> > >> > > > > > >>> > > > > > > >>>> Richard
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John
> > >> Roesler <
> > >> > > > > > >>> vvcep...@apache.org>
> > >> > > > > > >>> > > > > > > >> wrote:
> > >> > > > > > >>> > > > > > > >>>>
> > >> > > > > > >>> > > > > > > >>>>> Hi all,
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> Thanks for the thoughtful comments!
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> I need more time to reflect on your
> > >> thoughts, but
> > >> > > > > just
> > >> > > > > > >>> wanted to
> > >> > > > > > >>> > > > > > offer
> > >> > > > > > >>> > > > > > > >>>>> a quick clarification about equals().
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> I only meant that we can't be sure if
> a
> > >> class's
> > >> > > > > > >>> equals()
> > >> > > > > > >>> > > > > > > >> implementation
> > >> > > > > > >>> > > > > > > >>>>> returns true for two semantically
> > >> identical
> > >> > > > > instances.
> > >> > > > > > >>> I.e., if a
> > >> > > > > > >>> > > > > > > >> class
> > >> > > > > > >>> > > > > > > >>>>> doesn't
> > >> > > > > > >>> > > > > > > >>>>> override the default equals()
> > >> implementation, then
> > >> > > > > we
> > >> > > > > > >>> would see
> > >> > > > > > >>> > > > > > > >> behavior
> > >> > > > > > >>> > > > > > > >>>>> like:
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A", 1)) returns false
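
The `equals()` behavior described above can be reproduced with a small, self-contained sketch. `MyPair` here is a hypothetical class modeled on the example in the message, and its `serialize()` method with a `key:value` encoding is an assumption standing in for whatever serializer the application configures.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical value class that does NOT override equals(), so it inherits
// the identity comparison from java.lang.Object.
final class MyPair {
    final String key;
    final int value;

    MyPair(String key, int value) {
        this.key = key;
        this.value = value;
    }

    // Stand-in for a real serializer; the "key:value" encoding is an assumption.
    byte[] serialize() {
        return (key + ":" + value).getBytes(StandardCharsets.UTF_8);
    }
}

final class DefaultEqualsDemo {
    public static void main(String[] args) {
        MyPair a = new MyPair("A", 1);
        MyPair b = new MyPair("A", 1);

        // Two distinct instances: Object.equals compares references.
        System.out.println("equals():         " + a.equals(b));
        // The serialized forms are compared by content instead.
        System.out.println("serialized bytes: " + Arrays.equals(a.serialize(), b.serialize()));
    }
}
```

The two instances are semantically identical, yet only the comparison of the serialized form recognizes that, which is the case the message is concerned about.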
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> In that case, I would still like to
> catch
> > >> no-op
> > >> > > > > > >>> updates by
> > >> > > > > > >>> > > > > > comparing
> > >> > > > > > >>> > > > > > > >> the
> > >> > > > > > >>> > > > > > > >>>>> serialized form of the records when we
> > >> happen to
> > >> > > > > have
> > >> > > > > > >>> it serialized
> > >> > > > > > >>> > > > > > > >> anyway
> > >> > > > > > >>> > > > > > > >>>>> (such as when the operation is
> stateful,
> > >> or when
> > >> > > > > we're
> > >> > > > > > >>> sending to a
> > >> > > > > > >>> > > > > > > >>>>> repartition topic and we have both the
> > >> "new" and
> > >> > > > > "old"
> > >> > > > > > >>> value from
> > >> > > > > > >>> > > > > > > >>>>> upstream).
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> I didn't mean to suggest we'd try to
> use
> > >> > > > > reflection to
> > >> > > > > > >>> detect
> > >> > > > > > >>> > > > > > whether
> > >> > > > > > >>> > > > > > > >>>>> equals
> > >> > > > > > >>> > > > > > > >>>>> is implemented, although that is a
> neat
> > >> trick. I
> > >> > > > > was
> > >> > > > > > >>> thinking more
> > >> > > > > > >>> > > > > > of
> > >> > > > > > >>> > > > > > > >> a
> > >> > > > > > >>> > > > > > > >>>>> belt-and-suspenders algorithm where
> we do
> > >> the check
> > >> > > > > > >>> for no-ops
> > >> > > > > > >>> > > > > > based
> > >> > > > > > >>> > > > > > > >> on
> > >> > > > > > >>> > > > > > > >>>>> equals() and then _also_ check the
> > >> serialized bytes
> > >> > > > > > >>> for equality.
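
A rough sketch of the belt-and-suspenders idea described above: try `equals()` first, and if that reports a difference, also compare the serialized bytes. `NoOpDetector`, `isNoOp`, and the `Function`-based serializer parameter are hypothetical names chosen for illustration; nothing here is an existing Kafka Streams API. `StringBuilder` is used in the demo only because it does not override `equals()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical two-stage no-op check: equals() first, serialized bytes second.
final class NoOpDetector {

    static <V> boolean isNoOp(V oldValue, V newValue, Function<V, byte[]> serializer) {
        // Stage 1: cheap check; also covers the case where both are null.
        if (Objects.equals(oldValue, newValue)) {
            return true;
        }
        // Exactly one side is null: an insert or a delete, never a no-op.
        if (oldValue == null || newValue == null) {
            return false;
        }
        // Stage 2: equals() said "different", which may only mean that the
        // class does not override it, so compare the serialized forms too.
        return Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue));
    }

    public static void main(String[] args) {
        Function<StringBuilder, byte[]> ser =
            sb -> sb.toString().getBytes(StandardCharsets.UTF_8);

        System.out.println(isNoOp(new StringBuilder("X"), new StringBuilder("X"), ser));
        System.out.println(isNoOp(new StringBuilder("X"), new StringBuilder("Y"), ser));
    }
}
```

Note that stage 2 serializes both values whenever `equals()` reports a difference, so this sketch only makes sense where the serialized form is needed anyway, as in the stateful and repartition cases mentioned in the message.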
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> Thanks,
> > >> > > > > > >>> > > > > > > >>>>> -John
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu
> > >> wrote:
> > >> > > > > > >>> > > > > > > >>>>>> Thanks for the comments, Matthias.
> > >> > > > > > >>> > > > > > > >>>>>>
> > >> > > > > > >>> > > > > > > >>>>>> w.r.t. requirement of an `equals()`
> > >> > > > > implementation,
> > >> > > > > > >>> each template
> > >> > > > > > >>> > > > > > > >> type
> > >> > > > > > >>> > > > > > > >>>>>> would have an equals() method. We can
> > >> use the
> > >> > > > > > >>> following code to
> > >> > > > > > >>> > > > > > know
> > >> > > > > > >>> > > > > > > >>>>>> whether it is provided by JVM or
> > >> provided by user.
> > >> > > > > > >>> > > > > > > >>>>>>
> > >> > > > > > >>> > > > > > > >>>>>> boolean customEquals = false;
> > >> > > > > > >>> > > > > > > >>>>>> try {
> > >> > > > > > >>> > > > > > > >>>>>>     Class cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
> > >> > > > > > >>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
> > >> > > > > > >>> > > > > > > >>>>>>         customEquals = true;
> > >> > > > > > >>> > > > > > > >>>>>>     }
> > >> > > > > > >>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
> > >> > > > > > >>> > > > > > > >>>>>>     // equals is always defined, this wouldn't hit
> > >> > > > > > >>> > > > > > > >>>>>> }
> > >> > > > > > >>> > > > > > > >>>>>>
> > >> > > > > > >>> > > > > > > >>>>>> The next question is: what if the
> user
> > >> doesn't
> > >> > > > > > >>> provide equals()
> > >> > > > > > >>> > > > > > > >> method ?
> > >> > > > > > >>> > > > > > > >>>>>> Would we automatically fall back to
> > >> > > > > emit-on-update ?
> > >> > > > > > >>> > > > > > > >>>>>>
> > >> > > > > > >>> > > > > > > >>>>>> Cheers
> > >> > > > > > >>> > > > > > > >>>>>>
> > >> > > > > > >>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM
> Matthias
> > >> J. Sax <
> > >> > > > > > >>> mj...@apache.org>
> > >> > > > > > >>> > > > > > > >>>>> wrote:
> > >> > > > > > >>> > > > > > > >>>>>>
> > >> > > > > > >>> > > > > > > > First a high level comment:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Overall, I would like to take one step
> back,
> > >> and make
> > >> > > > > sure
> > >> > > > > > >>> we are
> > >> > > > > > >>> > > > > > > > discussing on the same level. Originally,
> I
> > >> understood
> > >> > > > > > >>> this KIP
> > >> > > > > > >>> > > > > > > >>> as a
> > >> > > > > > >>> > > > > > > > proposed change of _semantics_, however,
> > >> given the
> > >> > > > > latest
> > >> > > > > > >>> > > > > > > >>> discussion
> > >> > > > > > >>> > > > > > > > it seems it's actually not -- it's more an
> > >> > > > > _optimization_
> > >> > > > > > >>> > > > > > > >>> proposal.
> > >> > > > > > >>> > > > > > > > Hence, we only need to make sure that this
> > >> optimization
> > >> > > > > > >>> does not
> > >> > > > > > >>> > > > > > > >>> break
> > >> > > > > > >>> > > > > > > > existing semantics. Is this the right way
> to
> > >> think
> > >> > > > > about
> > >> > > > > > >>> it?
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > If yes, then it might actually be ok to
> have
> > >> different
> > >> > > > > > >>> behavior
> > >> > > > > > >>> > > > > > > > depending if there is a materialized
> KTable
> > >> or not. So
> > >> > > > > > >>> far, we
> > >> > > > > > >>> > > > > > > >>> never
> > >> > > > > > >>> > > > > > > > defined a public contract about our emit
> > >> strategy and
> > >> > > > > it
> > >> > > > > > >>> seems
> > >> > > > > > >>> > > > > > > >>> this
> > >> > > > > > >>> > > > > > > > KIP does not define one either.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Hence, I don't have as strong of an
> opinion
> > >> about
> > >> > > > > sending
> > >> > > > > > >>> > > > > > > >>> oldValues
> > >> > > > > > >>> > > > > > > > for example any longer. I guess the
> question
> > >> is really,
> > >> > > > > > >>> what can
> > >> > > > > > >>> > > > > > > >>> we
> > >> > > > > > >>> > > > > > > > implement in a reasonable way.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Other comments:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > @Richard:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Can you please add the KIP to the KIP
> > >> overview table:
> > >> > > > > It's
> > >> > > > > > >>> missing
> > >> > > > > > >>> > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > @Bruno:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > You mentioned caching. I think it's
> irrelevant
> > >> > > > > > >>> (orthogonal) and
> > >> > > > > > >>> > > > > > > >>> we can
> > >> > > > > > >>> > > > > > > > discuss this KIP without considering it.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > @John:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >>>>>>>>> Even in the source table, we
> forward
> > >> the
> > >> > > > > updated
> > >> > > > > > >>> record with
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>>>>>>> higher of the two timestamps. So
> the
> > >> example is
> > >> > > > > > >>> more like:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > That is not correct. Currently, we forward
> > >> with the
> > >> > > > > smaller
> > >> > > > > > >>> > > > > > > > out-of-order timestamp (changing the
> > >> timestamp would
> > >> > > > > > >>> corrupt the
> > >> > > > > > >>> > > > > > > >>> data
> > >> > > > > > >>> > > > > > > > -- we don't know, because we don't check,
> if
> > >> the value
> > >> > > > > is
> > >> > > > > > >>> the
> > >> > > > > > >>> > > > > > > >>> same
> > >> > > > > > >>> > > > > > > >>>>>> or
> > >> > > > > > >>> > > > > > > > a different one, hence, we must emit the
> > >> out-of-order
> > >> > > > > > >>> record
> > >> > > > > > >>> > > > > > > >>> as-is).
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > If we start to do emit-on-change, we also
> > >> need to emit
> > >> > > > > a
> > >> > > > > > >>> new
> > >> > > > > > >>> > > > > > > >>> record if
> > >> > > > > > >>> > > > > > > > the timestamp changes due to out-of-order
> > >> data, hence,
> > >> > > > > we
> > >> > > > > > >>> would
> > >> > > > > > >>> > > > > > > >>> still
> > >> > > > > > >>> > > > > > > > need to emit <K,V,T1> because that gives us
> > >> correct
> > >> > > > > > >>> semantics:
> > >> > > > > > >>> > > > > > > >>> assume
> > >> > > > > > >>> > > > > > > > you have a filter() and afterward use the
> > >> filter
> > >> > > > > KTable in
> > >> > > > > > >>> a
> > >> > > > > > >>> > > > > > > > stream-table join -- the lower T1
> timestamp
> > >> must be
> > >> > > > > > >>> propagated to
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > > filtered KTable to ensure that the
> > >> stream-table
> > >> > > > > join
> > >> > > > > > >>> computes
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > > correct result.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Your point about requiring an `equals()`
> > >> > > > > implementation is
> > >> > > > > > >>> > > > > > > >>> actually a
> > >> > > > > > >>> > > > > > > > quite interesting one and boils down to my
> > >> statement
> > >> > > > > from
> > >> > > > > > >>> above
> > >> > > > > > >>> > > > > > > >>> about
> > >> > > > > > >>> > > > > > > > "what can we actually implement". What I
> don't
> > >> > > > > understand
> > >> > > > > > >>> is:
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >>>>>>>>> This way, we still don't have to
> rely
> > >> on the
> > >> > > > > > >>> existence of an
> > >> > > > > > >>> > > > > > > >>>>>>>>> equals() method, but if it is
> there,
> > >> we can
> > >> > > > > > >>> benefit from it.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > Your bullet point (2) says it uses
> `equals()`
> > >> --
> > >> > > > > hence, it
> > >> > > > > > >>> seems
> > >> > > > > > >>> > > > > > > >>> we
> > >> > > > > > >>> > > > > > > > actually do rely on it? Also, how can we
> > >> detect if
> > >> > > > > there
> > >> > > > > > >>> is an
> > >> > > > > > >>> > > > > > > > `equals()` method to do the comparison?
> Would
> > >> we fail
> > >> > > > > if
> > >> > > > > > >>> we don't
> > >> > > > > > >>> > > > > > > >>> have
> > >> > > > > > >>> > > > > > > > `equals()` nor corresponding serializers
> to do
> > >> the
> > >> > > > > > >>> comparison?
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we
> > >> absolutely need
> > >> > > > > > >>> metrics and
> > >> > > > > > >>> > > > > > > >>> logs if
> > >> > > > > > >>> > > > > > > >>>>>>>>> we're going to drop any records.
> And,
> > >> yes, we
> > >> > > > > > >>> should propose
> > >> > > > > > >>> > > > > > > >>>>>>>>> metrics and logs that are similar
> to
> > >> the
> > >> > > > > existing
> > >> > > > > > >>> ones when we
> > >> > > > > > >>> > > > > > > >>> drop
> > >> > > > > > >>> > > > > > > >>>>>>>>> records for other reasons.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > I am not sure about this point. In fact,
> we
> > >> have
> > >> > > > > already
> > >> > > > > > >>> some
> > >> > > > > > >>> > > > > > > >>> no-ops
> > >> > > > > > >>> > > > > > > > in Kafka Streams in our join-operators and
> > >> don't report
> > >> > > > > > >>> any of
> > >> > > > > > >>> > > > > > > >>> those
> > >> > > > > > >>> > > > > > > > either. Emit-on-change is operator
> semantics
> > >> and I
> > >> > > > > don't
> > >> > > > > > >>> see why
> > >> > > > > > >>> > > > > > > >>> we
> > >> > > > > > >>> > > > > > > > would need to have a metric for it? It
> seems
> > >> to be
> > >> > > > > quite
> > >> > > > > > >>> different
> > >> > > > > > >>> > > > > > > > compared to dropping late or malformed
> > >> records.
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > -Matthias
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > >
> > >> > > > > > >>> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>> Thanks John for your thoughtful
> > >> reply. Some
> > >> > > > > > >>> comments inline.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600,
> > >> John Roesler
> > >> > > > > > >>> wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This
> > >> email was
> > >> > > > > sent
> > >> > > > > > >>> from outside
> > >> > > > > > >>> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or
> > >> attachments
> > >> > > > > > >>> unless you
> > >> > > > > > >>> > > > > > expected
> > >> > > > > > >>> > > > > > > >>>>>>>>>> them.
> > >> ________________________________
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi Tommy,
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for the context. I can
> see the
> > >> > > > > attraction
> > >> > > > > > >>> of
> > >> > > > > > >>> > > > > > considering
> > >> > > > > > >>> > > > > > > >>>>>>>>>> these use cases together.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> To answer your question, if a
> part
> > >> of the
> > >> > > > > record
> > >> > > > > > >>> is not
> > >> > > > > > >>> > > > > > > >>> relevant
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to downstream consumers, I was
> > >> thinking you
> > >> > > > > could
> > >> > > > > > >>> just use a
> > >> > > > > > >>> > > > > > > >>>>>>>>>> mapValues to remove it.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a
> > >> join between
> > >> > > > > two
> > >> > > > > > >>> tables.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> employeeInfo.join(employeePayroll,
> > >> > > > > > >>> > > > > > > >>>>>>>>>>     (info, payroll) -> new Result(info.name(), payroll.salary()))
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> We only care about one attribute
> > >> from the Info
> > >> > > > > > >>> table (name),
> > >> > > > > > >>> > > > > > > >>> and
> > >> > > > > > >>> > > > > > > >>>>>>>>>> one from the Payroll table
> (salary),
> > >> and these
> > >> > > > > > >>> attributes
> > >> > > > > > >>> > > > > > > >>> change
> > >> > > > > > >>> > > > > > > >>>>>>>>>> rarely. On the other hand, there
> > >> might be many
> > >> > > > > > >>> other
> > >> > > > > > >>> > > > > > attributes
> > >> > > > > > >>> > > > > > > >>>>>>>>>> that change frequently in these
> > >> tables. We can
> > >> > > > > > >>> avoid
> > >> > > > > > >>> > > > > > triggering
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the join unnecessarily by mapping
> > >> the input
> > >> > > > > > >>> tables to drop the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> unnecessary information before
> the
> > >> join:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> names = employeeInfo.mapValues(info -> info.name())
> > >> > > > > > >>> > > > > > > >>>>>>>>>> salaries = employeePayroll.mapValues(payroll -> payroll.salary())
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> names.join(salaries, (name, salary) -> new Result(name, salary))
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> Ahh yes I see. This works, but in
> the
> > >> case
> > >> > > > > where
> > >> > > > > > >>> you're using
> > >> > > > > > >>> > > > > > > >>>>>>>>> schemas as we are (e.g. Avro), it
> > >> seems like
> > >> > > > > this
> > >> > > > > > >>> approach
> > >> > > > > > >>> > > > > > could
> > >> > > > > > >>> > > > > > > >>>>>>>>> lead to a proliferation of
> "skinny"
> > >> record
> > >> > > > > types
> > >> > > > > > >>> that just drop
> > >> > > > > > >>> > > > > > > >>>>>>>>> various fields.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Especially if we take Matthias's
> > >> idea to drop
> > >> > > > > > >>> non-changes even
> > >> > > > > > >>> > > > > > > >>>>>>>>>> for stateless operations, this
> would
> > >> be quite
> > >> > > > > > >>> efficient and is
> > >> > > > > > >>> > > > > > > >>>>>>>>>> also a very straightforward
> > >> optimization to
> > >> > > > > > >>> understand once
> > >> > > > > > >>> > > > > > you
> > >> > > > > > >>> > > > > > > >>>>>>>>>> know that Streams provides
> > >> emit-on-change.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> From the context that you
> provided,
> > >> it seems
> > >> > > > > like
> > >> > > > > > >>> a slightly
> > >> > > > > > >>> > > > > > > >>>>>>>>>> different situation, though.
> Reading
> > >> between
> > >> > > > > the
> > >> > > > > > >>> lines a
> > >> > > > > > >>> > > > > > > >>> little,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> it sounds like: in contrast to
> the
> > >> example
> > >> > > > > above,
> > >> > > > > > >>> in which we
> > >> > > > > > >>> > > > > > > >>> are
> > >> > > > > > >>> > > > > > > >>>>>>>>>> filtering out extra _data_, you
> have
> > >> some
> > >> > > > > extra
> > >> > > > > > >>> _metadata_
> > >> > > > > > >>> > > > > > that
> > >> > > > > > >>> > > > > > > >>>>>>>>>> you still wish to pass down with
> the
> > >> data when
> > >> > > > > > >>> there is a
> > >> > > > > > >>> > > > > > > >>> "real"
> > >> > > > > > >>> > > > > > > >>>>>>>>>> update, but you don't want the
> > >> metadata
> > >> > > > > itself to
> > >> > > > > > >>> cause an
> > >> > > > > > >>> > > > > > > >>>>>>>>>> update.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes
> > >> you've got it
> > >> > > > > > >>> right ;) This
> > >> > > > > > >>> > > > > > > >>>>>>>>> particular processor is the first
> > >> stop for this
> > >> > > > > > >>> data after
> > >> > > > > > >>> > > > > > > >>> coming
> > >> > > > > > >>> > > > > > > >>>>>>>>> in from external users, who often
> > >> simply post
> > >> > > > > the
> > >> > > > > > >>> same content
> > >> > > > > > >>> > > > > > > >>> each
> > >> > > > > > >>> > > > > > > >>>>>>>>> time and we're trying to shield
> > >> downstream
> > >> > > > > > >>> consumers from
> > >> > > > > > >>> > > > > > > >>>>>>>>> unnecessary churn.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> It does seem handy to be able to
> > >> plug in a
> > >> > > > > custom
> > >> > > > > > >>> > > > > > > >>> ChangeDetector
> > >> > > > > > >>> > > > > > > >>>>>>>>>> for this purpose, but I worry
> about
> > >> the API
> > >> > > > > > >>> complexity. Maybe
> > >> > > > > > >>> > > > > > > >>> you
> > >> > > > > > >>> > > > > > > >>>>>>>>>> can help think through how to
> provide
> > >> the same
> > >> > > > > > >>> benefit while
> > >> > > > > > >>> > > > > > > >>>>>>>>>> limiting user-facing complexity.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Here's some extra context to
> > >> consider:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> We currently don't make any extra
> > >> requirements
> > >> > > > > > >>> about the
> > >> > > > > > >>> > > > > > nature
> > >> > > > > > >>> > > > > > > >>>>>>>>>> of data that you can use in
> Streams.
> > >> For
> > >> > > > > example,
> > >> > > > > > >>> you don't
> > >> > > > > > >>> > > > > > > >>> have
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to implement hashCode and
> equals, or
> > >> > > > > compareTo,
> > >> > > > > > >>> etc. With the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> current proposal, we can do an
> > >> airtight
> > >> > > > > > >>> comparison based only
> > >> > > > > > >>> > > > > > > >>> on
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the serialized form of the
> values,
> > >> and we
> > >> > > > > > >>> actually don't have
> > >> > > > > > >>> > > > > > > >>> to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> deserialize the "prior" value at
> all
> > >> for a
> > >> > > > > large
> > >> > > > > > >>> number of
> > >> > > > > > >>> > > > > > > >>>>>>>>>> operations. Admittedly, if we
> extend
> > >> the
> > >> > > > > proposal
> > >> > > > > > >>> to include
> > >> > > > > > >>> > > > > > > >>> no-op
> > >> > > > > > >>> > > > > > > >>>>>>>>>> detection for stateless
> operations,
> > >> we'd
> > >> > > > > probably
> > >> > > > > > >>> need to rely
> > >> > > > > > >>> > > > > > > >>> on
> > >> > > > > > >>> > > > > > > >>>>>>>>>> equals() for no-op checking,
> > >> otherwise we'd
> > >> > > > > wind
> > >> > > > > > >>> up requiring
> > >> > > > > > >>> > > > > > > >>>>>>>>>> serdes for stateless operations
> as
> > >> well.
> > >> > > > > > >>> Actually, I'd
> > >> > > > > > >>> > > > > > probably
> > >> > > > > > >>> > > > > > > >>>>>>>>>> argue for doing exactly that:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop
> if
> > >> the
> > >> > > > > serialized
> > >> > > > > > >>> byte[]s are
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> same. After deserializing, also
> drop
> > >> if the
> > >> > > > > > >>> objects are equal
> > >> > > > > > >>> > > > > > > >>>>>>>>>> according to Object#equals().
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> 2. In stateless operations,
> compare
> > >> the "new"
> > >> > > > > and
> > >> > > > > > >>> "old" values
> > >> > > > > > >>> > > > > > > >>>>>>>>>> (if "old" is available) based on
> > >> > > > > Object#equals().
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> 3. As a final optimization, after
> > >> serializing
> > >> > > > > and
> > >> > > > > > >>> before
> > >> > > > > > >>> > > > > > > >>> sending
> > >> > > > > > >>> > > > > > > >>>>>>>>>> repartition records, compare the
> > >> serialized
> > >> > > > > data
> > >> > > > > > >>> and drop
> > >> > > > > > >>> > > > > > > >>>>>>>>>> no-ops.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> This way, we still don't have to
> > >> rely on the
> > >> > > > > > >>> existence of an
> > >> > > > > > >>> > > > > > > >>>>>>>>>> equals() method, but if it is
> there,
> > >> we can
> > >> > > > > > >>> benefit from it.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Also, we don't require a serde in
> > >> any new
> > >> > > > > > >>> situations, but we
> > >> > > > > > >>> > > > > > > >>> can
> > >> > > > > > >>> > > > > > > >>>>>>>>>> still leverage it when it is
> > >> available.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
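A minimal sketch of the check in item 1 above, written in Java: compare the serialized byte[]s first, and only if they differ fall back to Object#equals() on the deserialized values. The names NoOpCheck and isNoOp are hypothetical and are not part of the Kafka Streams API; the sketch also assumes the caller already holds both the serialized and deserialized forms (either pair may be null when unavailable).

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical helper for illustration only; not a Kafka Streams class.
final class NoOpCheck {
    private NoOpCheck() {}

    /**
     * Returns true when the update can be treated as a no-op and dropped.
     * oldBytes/newBytes are the serialized forms (null if not available);
     * oldValue/newValue are the deserialized forms (null if not available).
     */
    static <V> boolean isNoOp(byte[] oldBytes, byte[] newBytes, V oldValue, V newValue) {
        // Stage 1: identical serialized bytes means the value did not change.
        if (oldBytes != null && Arrays.equals(oldBytes, newBytes)) {
            return true;
        }
        // Stage 2: fall back to equals(). A class that does not override
        // equals() compares by identity, so distinct instances are never dropped.
        return oldValue != null && Objects.equals(oldValue, newValue);
    }
}
```

With this shape, a value type without a meaningful equals() still works (nothing extra is dropped), while a type that does implement it benefits automatically, which is the behavior argued for above.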
> > >> > > > > > >>> > > > > > > >>>>>>>>>> For clarity, in my example above,
> > >> even if the
> > >> > > > > > >>> employeeInfo and
> > >> > > > > > >>> > > > > > > >>>>>>>>>> employeePayroll and Result
> records
> > >> all have
> > >> > > > > > >>> serdes, we need
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> "name" field (presumably String)
> and
> > >> the
> > >> > > > > "salary"
> > >> > > > > > >>> field
> > >> > > > > > >>> > > > > > > >>>>>>>>>> (presumably a Double) to have
> serdes
> > >> as well
> > >> > > > > in
> > >> > > > > > >>> the naive
> > >> > > > > > >>> > > > > > > >>>>>>>>>> implementation. But if we can
> > >> leverage
> > >> > > > > equals(),
> > >> > > > > > >>> then the
> > >> > > > > > >>> > > > > > > >>> "right
> > >> > > > > > >>> > > > > > > >>>>>>>>>> thing" happens automatically.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> I still don't totally follow why
> the
> > >> individual
> > >> > > > > > >>> components
> > >> > > > > > >>> > > > > > > >>> (name,
> > >> > > > > > >>> > > > > > > >>>>>>>>> salary) would have to have serdes
> > >> here. If
> > >> > > > > Result
> > >> > > > > > >>> has one, we
> > >> > > > > > >>> > > > > > > >>>>>>>>> compare bytes, and if Result
> > >> additionally has
> > >> > > > > an
> > >> > > > > > >>> equals()
> > >> > > > > > >>> > > > > > method
> > >> > > > > > >>> > > > > > > >>>>>>>>> (which presumably includes equals
> > >> comparisons
> > >> > > > > on
> > >> > > > > > >>> the
> > >> > > > > > >>> > > > > > constituent
> > >> > > > > > >>> > > > > > > >>>>>>>>> fields), have we not covered our
> > >> bases?
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> This dovetails in with my
> primary UX
> > >> concern;
> > >> > > > > > >>> where would the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> ChangeDetector actually be
> > >> registered? None of
> > >> > > > > > >>> the operators
> > >> > > > > > >>> > > > > > in
> > >> > > > > > >>> > > > > > > >>>>>>>>>> my example have names or topics
> or
> > >> any other
> > >> > > > > > >>> identifiable
> > >> > > > > > >>> > > > > > > >>>>>>>>>> characteristic that could be
> passed
> > >> to a
> > >> > > > > > >>> ChangeDetector class
> > >> > > > > > >>> > > > > > > >>>>>>>>>> registered via config. You could
> say
> > >> that we
> > >> > > > > make
> > >> > > > > > >>> > > > > > > >>> ChangeDetector
> > >> > > > > > >>> > > > > > > >>>>>>>>>> an optional parameter to every
> > >> operation in
> > >> > > > > > >>> Streams, but this
> > >> > > > > > >>> > > > > > > >>>>>>>>>> seems to carry quite a bit of
> mental
> > >> burden
> > >> > > > > with
> > >> > > > > > >>> it. People
> > >> > > > > > >>> > > > > > > >>> will
> > >> > > > > > >>> > > > > > > >>>>>>>>>> wonder what it's for and whether
> or
> > >> not they
> > >> > > > > > >>> should be using
> > >> > > > > > >>> > > > > > > >>> it.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> There would almost certainly be a
> > >> > > > > misconception
> > >> > > > > > >>> that it's
> > >> > > > > > >>> > > > > > > >>>>>>>>>> preferable to implement it
> always,
> > >> which
> > >> > > > > would be
> > >> > > > > > >>> unfortunate.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Plus, to actually implement
> metadata
> > >> flowing
> > >> > > > > > >>> through the
> > >> > > > > > >>> > > > > > > >>> topology
> > >> > > > > > >>> > > > > > > >>>>>>>>>> as in your use case, you'd have
> to
> > >> do two
> > >> > > > > things:
> > >> > > > > > >>> 1. make sure
> > >> > > > > > >>> > > > > > > >>>>>>>>>> that all operations actually
> > >> preserve the
> > >> > > > > > >>> metadata alongside
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> data (e.g., don't accidentally
> add a
> > >> mapValues
> > >> > > > > > >>> like I did, or
> > >> > > > > > >>> > > > > > > >>> you
> > >> > > > > > >>> > > > > > > >>>>>>>>>> drop the metadata). 2. implement
> a
> > >> > > > > ChangeDetector
> > >> > > > > > >>> for every
> > >> > > > > > >>> > > > > > > >>>>>>>>>> single operation in the
> topology, or
> > >> you don't
> > >> > > > > > >>> get the benefit
> > >> > > > > > >>> > > > > > > >>> of
> > >> > > > > > >>> > > > > > > >>>>>>>>>> dropping non-changes internally.
> 2b.
> > >> > > > > > >>> Alternatively, you could
> > >> > > > > > >>> > > > > > > >>> just
> > >> > > > > > >>> > > > > > > >>>>>>>>>> add the ChangeDetector to one
> > >> operation toward
> > >> > > > > > >>> the end of the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> topology. This would not drop
> > >> redundant
> > >> > > > > > >>> computation
> > >> > > > > > >>> > > > > > internally,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> but only drop redundant
> _outputs_.
> > >> But this is
> > >> > > > > > >>> just about the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> same as your current solution.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> I definitely see your point
> regarding
> > >> > > > > > >>> configuration. I was
> > >> > > > > > >>> > > > > > > >>>>>>>>> originally thinking about this
> when
> > >> the
> > >> > > > > > >>> deduplication was going
> > >> > > > > > >>> > > > > > > >>> to
> > >> > > > > > >>> > > > > > > >>>>>>>>> be opt-in, and it seemed very
> natural
> > >> to say
> > >> > > > > > >>> something like:
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> employeeInfo.join(employeePayroll,
> > >> > > > > > >>> > > > > > > >>>>>>>>>     (info, payroll) -> new Result(info.name(), payroll.salary()))
> > >> > > > > > >>> > > > > > > >>>>>>>>>     .suppress(duplicatesAccordingTo(someChangeDetector))
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> Alternatively you can imagine a
> > >> similar method
> > >> > > > > > >>> being on
> > >> > > > > > >>> > > > > > > >>>>>>>>> Materialized, though obviously
> this
> > >> makes less
> > >> > > > > > >>> sense if we
> > >> > > > > > >>> > > > > > don't
> > >> > > > > > >>> > > > > > > >>>>>>>>> want to require materialization.
> If
> > >> we're now
> > >> > > > > > >>> talking about
> > >> > > > > > >>> > > > > > > >>>>>>>>> changing the default behavior and
> not
> > >> having
> > >> > > > > any
> > >> > > > > > >>> configuration
> > >> > > > > > >>> > > > > > > >>>>>>>>> options, it's harder to find a
> place
> > >> for this.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> A final thought; if it really is
> a
> > >> metadata
> > >> > > > > > >>> question, can we
> > >> > > > > > >>> > > > > > > >>> just
> > >> > > > > > >>> > > > > > > >>>>>>>>>> plan to finish up the support for
> > >> headers in
> > >> > > > > > >>> Streams? I.e.,
> > >> > > > > > >>> > > > > > > >>> give
> > >> > > > > > >>> > > > > > > >>>>>>>>>> you a way to control the way that
> > >> headers flow
> > >> > > > > > >>> through the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> topology? Then, we could treat
> > >> headers the
> > >> > > > > same
> > >> > > > > > >>> way we treat
> > >> > > > > > >>> > > > > > > >>>>>>>>>> timestamps in the no-op
> checking...
> > >> We
> > >> > > > > completely
> > >> > > > > > >>> ignore them
> > >> > > > > > >>> > > > > > > >>>>>>>>>> for the sake of comparison. Thus,
> > >> neither the
> > >> > > > > > >>> timestamp nor
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> headers would get updated in
> > >> internal state
> > >> > > > > or in
> > >> > > > > > >>> downstream
> > >> > > > > > >>> > > > > > > >>>>>>>>>> views as long as the value itself
> > >> doesn't
> > >> > > > > change.
> > >> > > > > > >>> This seems
> > >> > > > > > >>> > > > > > to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> give us a way to support your use
> > >> case without
> > >> > > > > > >>> adding to the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> mental overhead of using Streams
> for
> > >> simple
> > >> > > > > > >>> things.
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>> Agree headers could be a decent
> fit
> > >> for this
> > >> > > > > > >>> particular case
> > >> > > > > > >>> > > > > > > >>>>>>>>> because it's mostly metadata,
> though
> > >> to be
> > >> > > > > honest
> > >> > > > > > >>> we haven't
> > >> > > > > > >>> > > > > > > >>> looked
> > >> > > > > > >>> > > > > > > >>>>>>>>> at headers much (mostly because,
> and
> > >> to your
> > >> > > > > > >>> point, support
> > >> > > > > > >>> > > > > > > >>> seems
> > >> > > > > > >>> > > > > > > >>>>>>>>> to be lacking). I feel like there
> > >> would be
> > >> > > > > other
> > >> > > > > > >>> cases where
> > >> > > > > > >>> > > > > > > >>> this
> > >> > > > > > >>> > > > > > > >>>>>>>>> feature could be valuable, but I
> > >> admit I can't
> > >> > > > > > >>> come up with
> > >> > > > > > >>> > > > > > > >>>>>>>>> anything right this second.
> Perhaps
> > >> yuzhihong
> > >> > > > > had
> > >> > > > > > >>> an example in
> > >> > > > > > >>> > > > > > > >>>>>>>>> mind?
> > >> > > > > > >>> > > > > > > >>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> I.e., simple things should be
> easy,
> > >> and
> > >> > > > > complex
> > >> > > > > > >>> things should
> > >> > > > > > >>> > > > > > > >>> be
> > >> > > > > > >>> > > > > > > >>>>>>>>>> possible.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks,
> -John
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19,
> > >> Thomas Becker
> > >> > > > > > >>> wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi John, Can you describe how
> you'd
> > >> use
> > >> > > > > > >>> filtering/mapping to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate records? To give some
> > >> background
> > >> > > > > on
> > >> > > > > > >>> my suggestion
> > >> > > > > > >>> > > > > > > >>> we
> > >> > > > > > >>> > > > > > > >>>>>>>>>> currently have a small stream
> > >> processor that
> > >> > > > > > >>> exists solely to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate, which we do using a
> > >> process that
> > >> > > > > I
> > >> > > > > > >>> assume would
> > >> > > > > > >>> > > > > > be
> > >> > > > > > >>> > > > > > > >>>>>>>>>> similar to what would be done
> here
> > >> (with a
> > >> > > > > store
> > >> > > > > > >>> of keys and
> > >> > > > > > >>> > > > > > > >>> hash
> > >> > > > > > >>> > > > > > > >>>>>>>>>> values). But the records we are
> > >> deduplicating
> > >> > > > > > >>> have some
> > >> > > > > > >>> > > > > > > >>> metadata
> > >> > > > > > >>> > > > > > > >>>>>>>>>> fields (such as timestamps of
> when
> > >> the record
> > >> > > > > was
> > >> > > > > > >>> posted) that
> > >> > > > > > >>> > > > > > > >>> we
> > >> > > > > > >>> > > > > > > >>>>>>>>>> don't consider semantically
> > >> meaningful for
> > >> > > > > > >>> downstream
> > >> > > > > > >>> > > > > > > >>> consumers,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> and therefore we also suppress
> > >> updates that
> > >> > > > > only
> > >> > > > > > >>> touch those
> > >> > > > > > >>> > > > > > > >>>>>>>>>> fields.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> -Tommy
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30
> -0600,
> > >> John
> > >> > > > > Roesler
> > >> > > > > > >>> wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong,
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> That’s an interesting idea. Can you help think of a use case that
> > >> > > > > > >>> > > > > > > >>>>>>>>>> isn’t also served by filtering or mapping beforehand?
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for helping to design this feature!
> > >> > > > > > >>> > > > > > > >>>>>>>>>> -John
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> I think this is a good idea.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> How do folks feel about allowing the mechanism by which no-ops are
> > >> > > > > > >>> > > > > > > >>>>>>>>>> detected to be pluggable? Meaning use something like a hash by
> > >> > > > > > >>> > > > > > > >>>>>>>>>> default, but you could optionally provide an implementation of
> > >> > > > > > >>> > > > > > > >>>>>>>>>> something to use instead, like a ChangeDetector. This could be
> > >> > > > > > >>> > > > > > > >>>>>>>>>> useful, for example, to ignore changes to certain fields, which may
> > >> > > > > > >>> > > > > > > >>>>>>>>>> not be relevant to the operation being performed.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> ________________________________
> > >> > > > > > >>> > > > > > > >>>>>>>>>> From: John Roesler <vvcep...@apache.org>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Sent: Friday, January 31, 2020 4:51 PM
> > >> > > > > > >>> > > > > > > >>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hello all, Sorry for my silence.
> It
> > >> seems
> > >> > > > > like we
> > >> > > > > > >>> are getting
> > >> > > > > > >>> > > > > > > >>>>>>>>>> close to consensus. Hopefully, we
> > >> could move
> > >> > > > > to a
> > >> > > > > > >>> vote soon!
> > >> > > > > > >>> > > > > > > >>> All
> > >> > > > > > >>> > > > > > > >>>>>>>>>> of the reasoning from Matthias
> and
> > >> Bruno
> > >> > > > > around
> > >> > > > > > >>> timestamp is
> > >> > > > > > >>> > > > > > > >>>>>>>>>> compelling. I would be strongly
> in
> > >> favor of
> > >> > > > > > >>> stating a few
> > >> > > > > > >>> > > > > > > >>> things
> > >> > > > > > >>> > > > > > > >>>>>>>>>> very clearly in the KIP: 1.
> Streams
> > >> will drop
> > >> > > > > > >>> no-op updates
> > >> > > > > > >>> > > > > > > >>> only
> > >> > > > > > >>> > > > > > > >>>>>>>>>> for KTable operations. That is,
> we
> > >> won't make
> > >> > > > > any
> > >> > > > > > >>> changes to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> KStream aggregations at the
> moment.
> > >> It does
> > >> > > > > seem
> > >> > > > > > >>> like we can
> > >> > > > > > >>> > > > > > > >>>>>>>>>> potentially revisit the time
> > >> semantics of that
> > >> > > > > > >>> operation in
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> future, but we don't need to do
> it
> > >> now. On the
> > >> > > > > > >>> other hand, the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> proposed semantics for KTable
> > >> timestamps
> > >> > > > > (marking
> > >> > > > > > >>> the
> > >> > > > > > >>> > > > > > beginning
> > >> > > > > > >>> > > > > > > >>>>>>>>>> of the validity of that record)
> > >> makes sense to
> > >> > > > > > >>> me. 2. Streams
> > >> > > > > > >>> > > > > > > >>>>>>>>>> will only drop no-op updates for
> > >> _stateful_
> > >> > > > > > >>> KTable operations.
> > >> > > > > > >>> > > > > > > >>> We
> > >> > > > > > >>> > > > > > > >>>>>>>>>> don't want to add a hard
> guarantee
> > >> that
> > >> > > > > Streams
> > >> > > > > > >>> will _never_
> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit a no-op table update
> because it
> > >> would
> > >> > > > > > >>> require adding
> > >> > > > > > >>> > > > > > state
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to otherwise stateless
> operations.
> > >> If someone
> > >> > > > > is
> > >> > > > > > >>> really
> > >> > > > > > >>> > > > > > > >>> concerned
> > >> > > > > > >>> > > > > > > >>>>>>>>>> about a particular stateless
> > >> operation
> > >> > > > > producing
> > >> > > > > > >>> a lot of
> > >> > > > > > >>> > > > > > no-op
> > >> > > > > > >>> > > > > > > >>>>>>>>>> results, all they have to do is
> > >> materialize
> > >> > > > > it,
> > >> > > > > > >>> and Streams
> > >> > > > > > >>> > > > > > > >>> would
> > >> > > > > > >>> > > > > > > >>>>>>>>>> automatically drop the no-ops.
> > >> Additionally,
> > >> > > > > I'm
> > >> > > > > > >>> +1 on not
> > >> > > > > > >>> > > > > > > >>> adding
> > >> > > > > > >>> > > > > > > >>>>>>>>>> an opt-out at this time.
> Regarding
> > >> the KIP
> > >> > > > > > >>> itself, I would
> > >> > > > > > >>> > > > > > > >>> clean
> > >> > > > > > >>> > > > > > > >>>>>>>>>> it up a bit before calling for a
> > >> vote. There
> > >> > > > > is a
> > >> > > > > > >>> lot of
> > >> > > > > > >>> > > > > > > >>>>>>>>>> "discussion"-type language there,
> > >> which is
> > >> > > > > very
> > >> > > > > > >>> natural to
> > >> > > > > > >>> > > > > > > >>> read,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> but makes it a bit hard to see
> what
> > >> _exactly_
> > >> > > > > the
> > >> > > > > > >>> kip is
> > >> > > > > > >>> > > > > > > >>>>>>>>>> proposing. Richard, would you
> mind
> > >> just making
> > >> > > > > > >>> the "proposed
> > >> > > > > > >>> > > > > > > >>>>>>>>>> behavior change" a simple and
> > >> succinct list of
> > >> > > > > > >>> bullet points?
> > >> > > > > > >>> > > > > > > >>>>>>>>>> I.e., please drop glue phrases
> like
> > >> "there has
> > >> > > > > > >>> been some
> > >> > > > > > >>> > > > > > > >>>>>>>>>> discussion" or "possibly we
> could do
> > >> X". For
> > >> > > > > the
> > >> > > > > > >>> final version
> > >> > > > > > >>> > > > > > > >>> of
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the KIP, it should just say,
> > >> "Streams will do
> > >> > > > > X,
> > >> > > > > > >>> Streams will
> > >> > > > > > >>> > > > > > > >>> do
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Y". Feel free to add an
> elaboration
> > >> section to
> > >> > > > > > >>> explain more
> > >> > > > > > >>> > > > > > > >>> about
> > >> > > > > > >>> > > > > > > >>>>>>>>>> what X and Y mean, but we don't
> need
> > >> to talk
> > >> > > > > about
> > >> > > > > > >>> > > > > > > >>> possibilities
> > >> > > > > > >>> > > > > > > >>>>>>>>>> or alternatives except in the
> > >> "rejected
> > >> > > > > > >>> alternatives" section.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Accordingly, can you also move
> the
> > >> options you
> > >> > > > > > >>> presented in
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> intro to the "rejected
> alternatives"
> > >> section
> > >> > > > > and
> > >> > > > > > >>> only mention
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> final proposal itself? This just
> > >> really helps
> > >> > > > > > >>> reviewers to
> > >> > > > > > >>> > > > > > know
> > >> > > > > > >>> > > > > > > >>>>>>>>>> what they are voting for, and it
> > >> helps
> > >> > > > > everyone
> > >> > > > > > >>> after the fact
> > >> > > > > > >>> > > > > > > >>>>>>>>>> when they are trying to get
> clarity
> > >> on what
> > >> > > > > > >>> exactly the
> > >> > > > > > >>> > > > > > > >>> proposal
> > >> > > > > > >>> > > > > > > >>>>>>>>>> is, versus all the things it
> could
> > >> have been.
> > >> > > > > > >>> Thanks, -John
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14,
> > >> Richard Yu
> > >> > > > > wrote:
> > >> > > > > > >>> Hello to
> > >> > > > > > >>> > > > > > all,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> I've finished making some initial
> > >> > > > > modifications
> > >> > > > > > >>> to the KIP. I
> > >> > > > > > >>> > > > > > > >>>>>>>>>> have decided to keep the
> > >> implementation
> > >> > > > > section
> > >> > > > > > >>> in the KIP for
> > >> > > > > > >>> > > > > > > >>>>>>>>>> record-keeping purposes. For
> now, we
> > >> should
> > >> > > > > focus
> > >> > > > > > >>> on only the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> proposed behavior changes
> instead.
> > >> See if you
> > >> > > > > > >>> have any
> > >> > > > > > >>> > > > > > > >>> comments!
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Cheers, Richard On Sat, Jan 25,
> 2020
> > >> at 11:12
> > >> > > > > AM
> > >> > > > > > >>> Richard Yu
> > >> > > > > > >>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com
> <mailto:
> > >> > > > > > >>> > > > > > yohan.richard...@gmail.com
> > >> > > > > > >>> > > > > > > >>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> wrote: Hi all, Thanks for all the
> > >> discussion!
> > >> > > > > > >>> @John and @Bruno
> > >> > > > > > >>> > > > > > > >>> I
> > >> > > > > > >>> > > > > > > >>>>>>>>>> will survey other possible
> systems
> > >> and see
> > >> > > > > what I
> > >> > > > > > >>> can do. Just
> > >> > > > > > >>> > > > > > > >>> a
> > >> > > > > > >>> > > > > > > >>>>>>>>>> question, by systems, I suppose
> you
> > >> would mean
> > >> > > > > > >>> the pros and
> > >> > > > > > >>> > > > > > > >>> cons
> > >> > > > > > >>> > > > > > > >>>>>>>>>> of different reporting
> strategies?
> > >> I'm not
> > >> > > > > > >>> completely certain
> > >> > > > > > >>> > > > > > > >>> on
> > >> > > > > > >>> > > > > > > >>>>>>>>>> this point, so it would be great
> if
> > >> you can
> > >> > > > > > >>> clarify on that.
> > >> > > > > > >>> > > > > > So
> > >> > > > > > >>> > > > > > > >>>>>>>>>> here's what I got from all the
> > >> discussion so
> > >> > > > > far:
> > >> > > > > > >>> - Since both
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Matthias and John seem to have
> come
> > >> to a
> > >> > > > > > >>> consensus on this,
> > >> > > > > > >>> > > > > > > >>> then
> > >> > > > > > >>> > > > > > > >>>>>>>>>> we will go for an all-round
> > >> behavioral change
> > >> > > > > for
> > >> > > > > > >>> KTables.
> > >> > > > > > >>> > > > > > > >>> After
> > >> > > > > > >>> > > > > > > >>>>>>>>>> some thought, I decided that for
> > >> now, an
> > >> > > > > opt-out
> > >> > > > > > >>> config will
> > >> > > > > > >>> > > > > > > >>> not
> > >> > > > > > >>> > > > > > > >>>>>>>>>> be added. As John has pointed
> out,
> > >> no-op
> > >> > > > > changes
> > >> > > > > > >>> tend to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> explode further down the
> topology as
> > >> they are
> > >> > > > > > >>> forwarded to
> > >> > > > > > >>> > > > > > more
> > >> > > > > > >>> > > > > > > >>>>>>>>>> and more processor nodes
> downstream.
> > >> - About
> > >> > > > > > >>> using hash codes,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> after some explanation from
> John, it
> > >> looks
> > >> > > > > like
> > >> > > > > > >>> hash codes
> > >> > > > > > >>> > > > > > > >>> might
> > >> > > > > > >>> > > > > > > >>>>>>>>>> not be as ideal (for
> > >> implementation). For
> > >> > > > > now, we
> > >> > > > > > >>> will omit
> > >> > > > > > >>> > > > > > > >>> that
> > >> > > > > > >>> > > > > > > >>>>>>>>>> detail, and save it for the PR. -
> > >> @Bruno You
> > >> > > > > do
> > >> > > > > > >>> have valid
> > >> > > > > > >>> > > > > > > >>>>>>>>>> concerns. Though, I am not
> > >> completely certain
> > >> > > > > if
> > >> > > > > > >>> we want to do
> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change only for
> materialized
> > >> KTables.
> > >> > > > > I
> > >> > > > > > >>> will put it
> > >> > > > > > >>> > > > > > > >>> down
> > >> > > > > > >>> > > > > > > >>>>>>>>>> in the KIP regardless. I will do
> my
> > >> best to
> > >> > > > > > >>> address all points
> > >> > > > > > >>> > > > > > > >>>>>>>>>> raised so far on the discussion.
> > >> Hope we could
> > >> > > > > > >>> keep this
> > >> > > > > > >>> > > > > > going!
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24,
> 2020
> > >> at 6:07 PM
> > >> > > > > > >>> Bruno Cadonna
> > >> > > > > > >>> > > > > > > >>>>>>>>>> <br...@confluent.io <mailto:
> > >> > > > > br...@confluent.io>>
> > >> > > > > > >>> wrote: Thank
> > >> > > > > > >>> > > > > > > >>> you
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Matthias for the use cases!
> Looking
> > >> at both
> > >> > > > > use
> > >> > > > > > >>> cases, I think
> > >> > > > > > >>> > > > > > > >>>>>>>>>> you need to elaborate on them in
> the
> > >> KIP,
> > >> > > > > > >>> Richard. Emit from
> > >> > > > > > >>> > > > > > > >>>>>>>>>> plain KTable: I agree with
> Matthias
> > >> that the
> > >> > > > > > >>> lower timestamp
> > >> > > > > > >>> > > > > > > >>>>>>>>>> makes sense because it marks the
> > >> start of the
> > >> > > > > > >>> validity of the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> record. Idempotent records with a
> > >> higher
> > >> > > > > > >>> timestamp can be
> > >> > > > > > >>> > > > > > > >>> safely
> > >> > > > > > >>> > > > > > > >>>>>>>>>> ignored. A corner case that I
> > >> discussed with
> > >> > > > > > >>> Matthias offline
> > >> > > > > > >>> > > > > > > >>> is
> > >> > > > > > >>> > > > > > > >>>>>>>>>> when we do not materialize a
> KTable
> > >> due to
> > >> > > > > > >>> optimization. Then
> > >> > > > > > >>> > > > > > > >>> we
> > >> > > > > > >>> > > > > > > >>>>>>>>>> cannot avoid the idempotent
> records
> > >> because
> > >> > > > > we do
> > >> > > > > > >>> not keep the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> first record with the lower
> > >> timestamp to
> > >> > > > > compare
> > >> > > > > > >>> to. Emit from
> > >> > > > > > >>> > > > > > > >>>>>>>>>> KTable with aggregations: If we
> > >> specify that
> > >> > > > > an
> > >> > > > > > >>> aggregation
> > >> > > > > > >>> > > > > > > >>>>>>>>>> result should have the highest
> > >> timestamp of
> > >> > > > > the
> > >> > > > > > >>> records that
> > >> > > > > > >>> > > > > > > >>>>>>>>>> participated in the aggregation,
> we
> > >> cannot
> > >> > > > > ignore
> > >> > > > > > >>> any
> > >> > > > > > >>> > > > > > > >>> idempotent
> > >> > > > > > >>> > > > > > > >>>>>>>>>> records. Admittedly, the result
> of an
> > >> > > > > aggregation
> > >> > > > > > >>> usually
> > >> > > > > > >>> > > > > > > >>>>>>>>>> changes, but there are
> aggregations
> > >> where the
> > >> > > > > > >>> result may not
> > >> > > > > > >>> > > > > > > >>>>>>>>>> change like min and max, or sum
> when
> > >> the
> > >> > > > > incoming
> > >> > > > > > >>> records have
> > >> > > > > > >>> > > > > > > >>> a
> > >> > > > > > >>> > > > > > > >>>>>>>>>> value of zero. In those cases, we
> > >> could
> > >> > > > > benefit
> > >> > > > > > >>> from the emit on
> > >> > > > > > >>> > > > > > > >>>>>>>>>> change, but only if we define the
> > >> semantics
> > >> > > > > of the
> > >> > > > > > >>> > > > > > aggregations
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to not use the highest timestamp
> of
> > >> the
> > >> > > > > > >>> participating records
> > >> > > > > > >>> > > > > > > >>> for
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the result. In Kafka Streams, we
> do
> > >> not have
> > >> > > > > min,
> > >> > > > > > >>> max, and sum
> > >> > > > > > >>> > > > > > > >>> as
> > >> > > > > > >>> > > > > > > >>>>>>>>>> explicit aggregations, but we
> need
> > >> to provide
> > >> > > > > an
> > >> > > > > > >>> API to define
> > >> > > > > > >>> > > > > > > >>>>>>>>>> what timestamp should be used for
> > >> the result
> > >> > > > > of
> > >> > > > > > >>> an aggregation
> > >> > > > > > >>> > > > > > > >>> if
> > >> > > > > > >>> > > > > > > >>>>>>>>>> we want to go down this path.
> All of
> > >> this does
> > >> > > > > > >>> not block this
> > >> > > > > > >>> > > > > > > >>> KIP
> > >> > > > > > >>> > > > > > > >>>>>>>>>> and I just wanted to put these
> > >> aspects up for
> > >> > > > > > >>> discussion. The
> > >> > > > > > >>> > > > > > > >>> KIP
> > >> > > > > > >>> > > > > > > >>>>>>>>>> can limit itself to emit from
> > >> materialized
> > >> > > > > > >>> KTables. However,
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> limits should be explicitly
> stated
> > >> in the KIP.
> > >> > > > > > >>> Best, Bruno
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM
> > >> Matthias J.
> > >> > > > > Sax
> > >> > > > > > >>> > > > > > > >>>>>>>>>> <matth...@confluent.io <mailto:
> > >> > > > > > >>> matth...@confluent.io>> wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>>> IMHO, the question about
> semantics
> > >> depends on
> > >> > > > > the
> > >> > > > > > >>> use case, in
> > >> > > > > > >>> > > > > > > >>>>>>>>>> particular on the origin of a
> > >> KTable. If
> > >> > > > > there is
> > >> > > > > > >>> a changelog
> > >> > > > > > >>> > > > > > > >>>>>>>>>> topic that one reads directly
> into a
> > >> KTable,
> > >> > > > > > >>> emit-on-change
> > >> > > > > > >>> > > > > > > >>> does
> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually make sense, because the
> > >> timestamp
> > >> > > > > > >>> indicates _when_
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> update was _effective_. For this
> > >> case, it is
> > >> > > > > > >>> semantically
> > >> > > > > > >>> > > > > > sound
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to _not_ update the timestamp in
> the
> > >> store,
> > >> > > > > > >>> because the second
> > >> > > > > > >>> > > > > > > >>>>>>>>>> update is actually idempotent and
> > >> advancing
> > >> > > > > the
> > >> > > > > > >>> timestamp is
> > >> > > > > > >>> > > > > > > >>> not
> > >> > > > > > >>> > > > > > > >>>>>>>>>> ideal (one could even consider
> it to
> > >> be wrong
> > >> > > > > to
> > >> > > > > > >>> advance the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> timestamp) because the "valid
> time"
> > >> of the
> > >> > > > > record
> > >> > > > > > >>> pair did not
> > >> > > > > > >>> > > > > > > >>>>>>>>>> change. This reasoning also
> applies
> > >> to
> > >> > > > > > >>> KTable-KTable joins.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> However, if the KTable is the
> result
> > >> of an
> > >> > > > > > >>> aggregation, I
> > >> > > > > > >>> > > > > > think
> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-update is more natural,
> > >> because the
> > >> > > > > > >>> timestamp reflects
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest
> > >> timestamp) of all
> > >> > > > > > >>> input records
> > >> > > > > > >>> > > > > > > >>> that
> > >> > > > > > >>> > > > > > > >>>>>>>>>> contributed to the result. Hence,
> > >> updating the
> > >> > > > > > >>> timestamp and
> > >> > > > > > >>> > > > > > > >>>>>>>>>> emitting a new record actually
> > >> sounds correct
> > >> > > > > to
> > >> > > > > > >>> me. This
> > >> > > > > > >>> > > > > > > >>> applies
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to windowed and non-windowed
> > >> aggregations
> > >> > > > > IMHO.
> > >> > > > > > >>> However,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> considering the argument that the
> > >> timestamp
> > >> > > > > > >>> should not be
> > >> > > > > > >>> > > > > > > >>> updated
> > >> > > > > > >>> > > > > > > >>>>>>>>>> in the first case in the store to
> > >> begin with,
> > >> > > > > > >>> both cases are
> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually the same, and both can
> be
> > >> modeled as
> > >> > > > > > >>> emit-on-change:
> > >> > > > > > >>> > > > > > > >>> if
> > >> > > > > > >>> > > > > > > >>>>>>>>>> a `table()` operator does not
> update
> > >> the
> > >> > > > > > >>> timestamp if the
> > >> > > > > > >>> > > > > > value
> > >> > > > > > >>> > > > > > > >>>>>>>>>> does not change, there is _no_
> > >> change and thus
> > >> > > > > > >>> nothing is
> > >> > > > > > >>> > > > > > > >>>>>>>>>> emitted. At the same time, if an
> > >> aggregation
> > >> > > > > > >>> operator does
> > >> > > > > > >>> > > > > > > >>> update
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the timestamp (even if the value
> > >> does not
> > >> > > > > change)
> > >> > > > > > >>> there _is_ a
> > >> > > > > > >>> > > > > > > >>>>>>>>>> change and we emit. Note that
> > >> handling
> > >> > > > > > >>> out-of-order data for
> > >> > > > > > >>> > > > > > > >>>>>>>>>> aggregations would also work
> > >> seamlessly with
> > >> > > > > this
> > >> > > > > > >>> approach --
> > >> > > > > > >>> > > > > > > >>> for
> > >> > > > > > >>> > > > > > > >>>>>>>>>> out-of-order records, the
> timestamp
> > >> never
> > >> > > > > > >>> changes, and
> > >> > > > > > >>> > > > > > > >>> thus,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> we only emit if the result itself
> > >> changes.
> > >> > > > > > >>> Therefore, I would
> > >> > > > > > >>> > > > > > > >>>>>>>>>> argue that we might not even need
> > >> any config,
> > >> > > > > > >>> because the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change behavior is just
> > >> correct and
> > >> > > > > > >>> reduces the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> downstream load, while our
> current
> > >> behavior is
> > >> > > > > > >>> not ideal (even
> > >> > > > > > >>> > > > > > > >>> if
> > >> > > > > > >>> > > > > > > >>>>>>>>>> it's also correct). Thoughts?
> > >> -Matthias On
> > >> > > > > > >>> 1/24/20 9:37 AM,
> > >> > > > > > >>> > > > > > > >>> John
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Roesler wrote: Hi Bruno, Thanks
> for
> > >> that
> > >> > > > > idea. I
> > >> > > > > > >>> hadn't
> > >> > > > > > >>> > > > > > > >>>>>>>>>> considered that option before,
> and
> > >> it does
> > >> > > > > seem
> > >> > > > > > >>> like that
> > >> > > > > > >>> > > > > > would
> > >> > > > > > >>> > > > > > > >>>>>>>>>> be the right place to put it if
> we
> > >> think it
> > >> > > > > might
> > >> > > > > > >>> be
> > >> > > > > > >>> > > > > > > >>> semantically
> > >> > > > > > >>> > > > > > > >>>>>>>>>> important to control on a
> > >> table-by-table
> > >> > > > > basis. I
> > >> > > > > > >>> had been
> > >> > > > > > >>> > > > > > > >>>>>>>>>> thinking of it less semantically
> and
> > >> more
> > >> > > > > > >>> practically. In the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> context of a large topology, or
> more
> > >> > > > > generally, a
> > >> > > > > > >>> large
> > >> > > > > > >>> > > > > > > >>> software
> > >> > > > > > >>> > > > > > > >>>>>>>>>> system that contains many
> topologies
> > >> and other
> > >> > > > > > >>> event-driven
> > >> > > > > > >>> > > > > > > >>>>>>>>>> systems, each no-op result
> becomes
> > >> an input
> > >> > > > > that
> > >> > > > > > >>> is destined
> > >> > > > > > >>> > > > > > to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> itself become a no-op result,
> and so
> > >> on, all
> > >> > > > > the
> > >> > > > > > >>> way through
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> system. Thus, a single pointless
> > >> processing
> > >> > > > > > >>> result becomes
> > >> > > > > > >>> > > > > > > >>>>>>>>>> amplified into a large number of
> > >> pointless
> > >> > > > > > >>> computations, cache
> > >> > > > > > >>> > > > > > > >>>>>>>>>> perturbations, and network and
> disk
> > >> I/O
> > >> > > > > > >>> operations. If you
> > >> > > > > > >>> > > > > > also
> > >> > > > > > >>> > > > > > > >>>>>>>>>> consider operations with fan-out
> > >> implications,
> > >> > > > > > >>> like branching
> > >> > > > > > >>> > > > > > > >>> or
> > >> > > > > > >>> > > > > > > >>>>>>>>>> foreign-key joins, the wasted
> > >> resources are
> > >> > > > > > >>> amplified not just
> > >> > > > > > >>> > > > > > > >>> in
> > >> > > > > > >>> > > > > > > >>>>>>>>>> proportion to the size of the
> > >> system, but the
> > >> > > > > > >>> size of the
> > >> > > > > > >>> > > > > > > >>> system
> > >> > > > > > >>> > > > > > > >>>>>>>>>> times the average fan-out (to the
> > >> power of the
> > >> > > > > > >>> number of
> > >> > > > > > >>> > > > > > > >>> fan-out
> > >> > > > > > >>> > > > > > > >>>>>>>>>> operations on the path(s)
> through the
> > >> > > > > system). In
> > >> > > > > > >>> my time
> > >> > > > > > >>> > > > > > > >>>>>>>>>> operating such systems, I've
> > >> observed these
> > >> > > > > > >>> effects to be very
> > >> > > > > > >>> > > > > > > >>>>>>>>>> real, and actually, the system
> and
> > >> use case
> > >> > > > > > >>> doesn't have to be
> > >> > > > > > >>> > > > > > > >>>>>>>>>> very large before the
> amplification
> > >> poses an
> > >> > > > > > >>> existential
> > >> > > > > > >>> > > > > > threat
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to the system as a whole. This is
> > >> the basis
> > >> > > > > of my
> > >> > > > > > >>> advocating
> > >> > > > > > >>> > > > > > > >>> for
> > >> > > > > > >>> > > > > > > >>>>>>>>>> a simple behavior change, rather
> > >> than an
> > >> > > > > opt-in
> > >> > > > > > >>> config of any
> > >> > > > > > >>> > > > > > > >>>>>>>>>> kind. It seems like Streams
> should
> > >> "do the
> > >> > > > > right
> > >> > > > > > >>> thing" for
> > >> > > > > > >>> > > > > > the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> majority use case. My theory
> (which
> > >> may be
> > >> > > > > wrong)
> > >> > > > > > >>> is that the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> majority use case is more like
> > >> "relational
> > >> > > > > > >>> queries" than "CEP
> > >> > > > > > >>> > > > > > > >>>>>>>>>> queries". Even if you were doing
> some
> > >> > > > > > >>> event-sensitive
> > >> > > > > > >>> > > > > > > >>>>>>>>>> computation, wouldn't you do
> them as
> > >> Stream
> > >> > > > > > >>> operations (where
> > >> > > > > > >>> > > > > > > >>>>>>>>>> this feature is inapplicable
> > >> anyway)? In
> > >> > > > > keeping
> > >> > > > > > >>> with the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> "practical" perspective, I
> suggested
> > >> the
> > >> > > > > opt-out
> > >> > > > > > >>> config only
> > >> > > > > > >>> > > > > > in
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the (I think unlikely) event that
> > >> filtering
> > >> > > > > out
> > >> > > > > > >>> pointless
> > >> > > > > > >>> > > > > > > >>> updates
> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually harms performance. I'd
> also
> > >> be
> > >> > > > > perfectly
> > >> > > > > > >>> fine without
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the opt-out config. I really
> think
> > >> that
> > >> > > > > (because
> > >> > > > > > >>> of the
> > >> > > > > > >>> > > > > > > >>> timestamp
> > >> > > > > > >>> > > > > > > >>>>>>>>>> semantics work already underway),
> > >> we're
> > >> > > > > already
> > >> > > > > > >>> pre-fetching
> > >> > > > > > >>> > > > > > > >>> the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> prior result most of the time, so
> > >> there would
> > >> > > > > > >>> actually be very
> > >> > > > > > >>> > > > > > > >>>>>>>>>> little extra I/O involved in
> > >> implementing
> > >> > > > > > >>> emit-on-change.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> However, we should consider
> whether
> > >> my
> > >> > > > > experience
> > >> > > > > > >>> is likely to
> > >> > > > > > >>> > > > > > > >>>>>>>>>> be general. Do you have some use
> > >> case in mind
> > >> > > > > for
> > >> > > > > > >>> which you'd
> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually want some KTable
> results to
> > >> be
> > >> > > > > > >>> emit-on-update for
> > >> > > > > > >>> > > > > > > >>>>>>>>>> semantic reasons? Thanks, -John
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02,
> > >> Bruno Cadonna
> > >> > > > > > >>> wrote: Hi
> > >> > > > > > >>> > > > > > > >>> Richard,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thank you for the KIP. I agree
> with
> > >> John that
> > >> > > > > we
> > >> > > > > > >>> should focus
> > >> > > > > > >>> > > > > > > >>> on
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the interface and behavior
> change in
> > >> a KIP. We
> > >> > > > > > >>> can discuss the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> implementation later. I am also
> +1
> > >> for the
> > >> > > > > > >>> survey. I had a
> > >> > > > > > >>> > > > > > > >>>>>>>>>> thought about this. Couldn't we
> > >> consider
> > >> > > > > > >>> emit-on-change to be
> > >> > > > > > >>> > > > > > > >>> one
> > >> > > > > > >>> > > > > > > >>>>>>>>>> config of suppress (like
> > >> `untilWindowCloses`)?
> > >> > > > > > >>> What you
> > >> > > > > > >>> > > > > > > >>>>>>>>>> basically propose is to suppress
> > >> updates if
> > >> > > > > they
> > >> > > > > > >>> do not change
> > >> > > > > > >>> > > > > > > >>>>>>>>>> the result. Considering emit on
> > >> change as a
> > >> > > > > > >>> flavour of
> > >> > > > > > >>> > > > > > suppress
> > >> > > > > > >>> > > > > > > >>>>>>>>>> would be more flexible because it
> > >> would
> > >> > > > > specify
> > >> > > > > > >>> the behavior
> > >> > > > > > >>> > > > > > > >>>>>>>>>> locally for a KTable instead of
> > >> globally for
> > >> > > > > all
> > >> > > > > > >>> KTables.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Additionally, specifying the
> > >> behavior in one
> > >> > > > > > >>> place instead of
> > >> > > > > > >>> > > > > > > >>>>>>>>>> multiple places feels more
> intuitive
> > >> and
> > >> > > > > > >>> consistent to me.
> > >> > > > > > >>> > > > > > > >>> Best,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Bruno On Fri, Jan 24, 2020 at
> 7:49
> > >> AM John
> > >> > > > > Roesler
> > >> > > > > > >>> > > > > > > >>>>>>>>>> <vvcep...@apache.org <mailto:
> > >> > > > > vvcep...@apache.org>>
> > >> > > > > > >>> wrote: Hi
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Richard, Thanks for picking this
> up!
> > >> I know
> > >> > > > > of at
> > >> > > > > > >>> least one
> > >> > > > > > >>> > > > > > > >>> large
> > >> > > > > > >>> > > > > > > >>>>>>>>>> community member for which this
> > >> feature is
> > >> > > > > > >>> absolutely
> > >> > > > > > >>> > > > > > > >>> essential.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> If I understand your two
> options, it
> > >> seems
> > >> > > > > like
> > >> > > > > > >>> the proposal
> > >> > > > > > >>> > > > > > is
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to implement it as a behavior
> change
> > >> > > > > regardless,
> > >> > > > > > >>> and the
> > >> > > > > > >>> > > > > > > >>> question
> > >> > > > > > >>> > > > > > > >>>>>>>>>> is whether to provide an opt-out
> > >> config or
> > >> > > > > not.
> > >> > > > > > >>> Given that any
> > >> > > > > > >>> > > > > > > >>>>>>>>>> implementation of this feature
> would
> > >> have some
> > >> > > > > > >>> performance
> > >> > > > > > >>> > > > > > > >>> impact
> > >> > > > > > >>> > > > > > > >>>>>>>>>> under some workloads, and also
> that
> > >> we don't
> > >> > > > > know
> > >> > > > > > >>> if anyone
> > >> > > > > > >>> > > > > > > >>>>>>>>>> really depends on emit-on-update
> time
> > >> > > > > semantics,
> > >> > > > > > >>> it seems like
> > >> > > > > > >>> > > > > > > >>> we
> > >> > > > > > >>> > > > > > > >>>>>>>>>> should propose to add an opt-out
> > >> config. Can
> > >> > > > > you
> > >> > > > > > >>> update the
> > >> > > > > > >>> > > > > > KIP
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to mention the exact config key
> and
> > >> value(s)
> > >> > > > > > >>> you'd propose?
> > >> > > > > > >>> > > > > > > >>> Just
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to move the discussion forward,
> maybe
> > >> > > > > something
> > >> > > > > > >>> like: emit.on
> > >> > > > > > >>> > > > > > > >>> :=
> > >> > > > > > >>> > > > > > > >>>>>>>>>> change|update with the new
> default
> > >> being
> > >> > > > > "change"
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for pointing out the timestamp issue in particular. I agree
> > >> > > > > > >>> > > > > > > >>>>>>>>>> that if we discard the latter update as a no-op, then we also have
> > >> > > > > > >>> > > > > > > >>>>>>>>>> to discard its timestamp (obviously, we don't forward the timestamp
> > >> > > > > > >>> > > > > > > >>>>>>>>>> update, as that's the whole point, but we also can't update the
> > >> > > > > > >>> > > > > > > >>>>>>>>>> timestamp in the store, as the store must remain consistent with
> > >> > > > > > >>> > > > > > > >>>>>>>>>> what has been emitted).
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> I have to confess that I disagree with your implementation proposal,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> but it's also not necessary to discuss implementation in the KIP.
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Maybe it would be less controversial if you just drop that section
> > >> > > > > > >>> > > > > > > >>>>>>>>>> for now, so that the KIP discussion can focus on the behavior change
> > >> > > > > > >>> > > > > > > >>>>>>>>>> and config.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Just for reference, there is some research into this domain. For
> > >> > > > > > >>> > > > > > > >>>>>>>>>> example, see the "Report" section (3.2.3) of the SECRET paper:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> It might help to round out the proposal if you take a brief survey
> > >> > > > > > >>> > > > > > > >>>>>>>>>> of the behaviors of other systems, along with pros and cons if any
> > >> > > > > > >>> > > > > > > >>>>>>>>>> are reported.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> -John
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi everybody!
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> I'd like to propose a change that we probably should've added for a
> > >> > > > > > >>> > > > > > > >>>>>>>>>> long time now. The key benefit of this KIP would be reduced traffic
> > >> > > > > > >>> > > > > > > >>>>>>>>>> in Kafka Streams since a lot of no-op results would no longer be
> > >> > > > > > >>> > > > > > > >>>>>>>>>> sent downstream. Here is the KIP for reference.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Currently, I seek to formalize our approach for this KIP first
> > >> > > > > > >>> > > > > > > >>>>>>>>>> before we determine concrete API additions / configurations. Some
> > >> > > > > > >>> > > > > > > >>>>>>>>>> configs might warrant adding, while others are not necessary since
> > >> > > > > > >>> > > > > > > >>>>>>>>>> adding them would only increase the complexity of Kafka Streams.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Cheers,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> Richard
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> ________________________________
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> This email and any attachments may contain confidential and
> > >> > > > > > >>> > > > > > > >>>>>>>>>> privileged material for the sole use of the intended recipient. Any
> > >> > > > > > >>> > > > > > > >>>>>>>>>> review, copying, or distribution of this email (or any attachments)
> > >> > > > > > >>> > > > > > > >>>>>>>>>> by others is prohibited. If you are not the intended recipient,
> > >> > > > > > >>> > > > > > > >>>>>>>>>> please contact the sender immediately and permanently delete this
> > >> > > > > > >>> > > > > > > >>>>>>>>>> email and any attachments. No employee or agent of TiVo is
> > >> > > > > > >>> > > > > > > >>>>>>>>>> authorized to conclude any binding agreement on behalf of TiVo by
> > >> > > > > > >>> > > > > > > >>>>>>>>>> email. Binding agreements with TiVo may only be made by a signed
> > >> > > > > > >>> > > > > > > >>>>>>>>>> written agreement.
> > >> > > > > > >>> > > > > > > >>>>>>>>>>
> > >> > > > > > >>> > > > > > > >>>>>>>>>> --
> > >> > > > > > >>> > > > > > > >>>>>>>>>> *Tommy Becker*
> > >> > > > > > >>> > > > > > > >>>>>>>>>> *Principal Engineer*
> > >> > > > > > >>> > > > > > > >>>>>>>>>> *Personalized Content Discovery*
> > >> > > > > > >>> > > > > > > >>>>>>>>>> *O* +1 919.460.4747 *tivo.com* <http://www.tivo.com/>
