Hi Richard,

Thanks for the update!

I read it over, and overall it looks good!

I have only a minor concern about the rate metric definition:
> The rate option indicates the ratio of records dropped to actual volume of 
> records passing through the task
That's not the definition of a "rate". It should be something more like
"the average number of dropped idempotent updates per second".

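To make the distinction concrete, here is a rough sketch of the two
quantities. The class and method names are made up for illustration and are
not part of Kafka's metrics API; the point is only that a rate divides by
elapsed time, while the KIP text currently describes a ratio.

```java
// Hypothetical illustration; these names are not part of Kafka's metrics API.
final class DroppedUpdateStats {

    /** Average number of dropped idempotent updates per second over a sample window. */
    static double rate(long droppedCount, long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        return droppedCount / (windowMillis / 1000.0);
    }

    /** Fraction of all records that were dropped: a ratio, not a rate. */
    static double ratio(long droppedCount, long totalCount) {
        return totalCount == 0 ? 0.0 : (double) droppedCount / totalCount;
    }

    public static void main(String[] args) {
        // 300 dropped out of 1200 records seen during a 30-second window.
        System.out.println("rate  = " + rate(300, 30_000) + " drops/sec");
        System.out.println("ratio = " + ratio(300, 1200));
    }
}
```
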
Incidentally, I mentioned this KIP to Guozhang, and he brought up an
interesting concern I'd like to share with the list. He noted that if we filter
out idempotent table updates, stream time will not advance with every
input event anymore. He asked if this would have a negative impact on
operations that depend on stream time.

I think this is a valid concern. For example, you can use Suppress to
buffer KTable updates until a certain amount of stream time passes.
Specifically, the Suppress processor itself advances stream time as it
receives new records to its `process` method. In the pathological case, all
updates are idempotent, get dropped, and the Suppress operator never emits
anything, even though to an outside observer, stream time should have advanced.

Example scenario:
> inputTable = builder.table(input)
> suppressed = inputTable.suppress(untilTimeLimit(10))
> 
> input: key=A, timestamp=0, value=X
> inputTable: key=A, timestamp=0, value=X
> suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
> output: (nothing)
> 
> input: key=A, timestamp=11, value=X
> // update is idempotent, so it gets dropped
> inputTable: key=A, timestamp=0, value=X
> suppressed buffer: key=A, timestamp=0, value=X (observed stream time = 0)
> output: (nothing)

Note that even though stream time has technically advanced beyond the
suppression config of 10, the suppression buffer doesn't see it because the
KTable source already dropped the idempotent update.
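
The same scenario can be played out in a toy model. This is not Kafka Streams
code: the class, its fields, and the eviction rule (emit once observed stream
time is at least `limit` past the first buffered update for a key) are
simplifying assumptions, just enough to show the buffer stalling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model only; every name here is hypothetical.
final class SuppressStallDemo {
    private final boolean dropIdempotent;
    private final long limit;
    private final Map<String, String> table = new HashMap<>();         // source table: key -> value
    private final Map<String, Long> firstBufferedAt = new HashMap<>(); // suppress buffer: key -> buffer time
    private final Map<String, String> latestValue = new HashMap<>();   // suppress buffer: key -> latest value
    private long observedStreamTime = Long.MIN_VALUE;                  // stream time as seen by the suppress node
    final List<String> output = new ArrayList<>();

    SuppressStallDemo(boolean dropIdempotent, long limit) {
        this.dropIdempotent = dropIdempotent;
        this.limit = limit;
    }

    void input(String key, long timestamp, String value) {
        boolean idempotent = value.equals(table.get(key));
        table.put(key, value);
        if (dropIdempotent && idempotent) {
            return; // dropped at the source, so the suppress node never sees this timestamp
        }
        suppress(key, timestamp, value);
    }

    private void suppress(String key, long timestamp, String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        firstBufferedAt.putIfAbsent(key, timestamp);
        latestValue.put(key, value);
        // Emit every buffered key whose age, in observed stream time, has reached the limit.
        Iterator<Map.Entry<String, Long>> it = firstBufferedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (observedStreamTime - entry.getValue() >= limit) {
                output.add(entry.getKey() + "=" + latestValue.remove(entry.getKey()));
                it.remove();
            }
        }
    }

    static List<String> run(boolean dropIdempotent) {
        SuppressStallDemo demo = new SuppressStallDemo(dropIdempotent, 10);
        demo.input("A", 0, "X");
        demo.input("A", 11, "X"); // same value: an idempotent update
        return demo.output;
    }

    public static void main(String[] args) {
        System.out.println("without dropping: " + run(false));
        System.out.println("with dropping:    " + run(true));
    }
}
```

Without dropping, the second record advances the buffer's stream time to 11
and flushes key A; with dropping, the buffer never hears about time 11.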

I'm thinking that this situation is indeed concerning, and we should be aware
and make a note of it. However, I don't think that it should change our
proposal. To understand why, I'll enumerate all the usages of stream time in
Kafka Streams:

windowed operations
-----------------------------
KStreamSessionWindowAggregate & KStreamWindowAggregate:
  - used to determine if an incoming record belongs to a window that is
    already closed
Suppressed.untilWindowCloses():
  - used to flush out results for windows, once they are closed
AbstractRocksDBSegmentedBytesStore & InMemorySessionStore & InMemoryWindowStore:
  - used to create new segments and drop old ones that are out of retention

non-windowed operations
-----------------------------------
Suppressed.untilTimeLimit():
  - used to flush out prior updates that have passed the configured age,
    in stream time

Note that most of the usages are in windowed operations. One interesting thing
about this context is that records with higher timestamps belong to different
windows. In order to advance stream time far enough to close a window or push
it out of retention, the new records must have timestamps that are in later
windows, which means that they are updates to _new_ keys, which means they
would not be suppressed as idempotent.

Updates within a window could still be suppressed, though. For example, if the
window size is 10, and the grace period is 5, and we get updates all for the
same key with timestamps 0, 11, and 16, today, we would emit the record for
the [0,10) window as soon as we got the update at time 16 (since stream time is
now past the window end + grace period time of 15). But if we drop the time=16
update, we wouldn't emit the [0,10) window results until we get a record with
time >= 20 (the first timestamp that lands in a new window, and therefore
cannot be dropped as idempotent).
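
Here is the arithmetic for that example as a small sketch. The helper names
are hypothetical, and it assumes a window counts as closed once observed
stream time reaches window end + grace; it only tracks which forwarded
timestamp first pushes stream time that far.

```java
// Hypothetical helpers; a back-of-the-envelope model, not Kafka Streams code.
final class WindowCloseDemo {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Stream time at which the window is considered closed (end + grace). */
    static long closeTime(long windowStart, long size, long grace) {
        return windowStart + size + grace;
    }

    /**
     * Timestamp of the first forwarded update that moves observed stream time
     * to or past closeTime, or -1 if none of them does.
     */
    static long firstEmitTime(long[] forwardedTimestamps, long closeTime) {
        long streamTime = Long.MIN_VALUE;
        for (long timestamp : forwardedTimestamps) {
            streamTime = Math.max(streamTime, timestamp);
            if (streamTime >= closeTime) {
                return timestamp;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long close = closeTime(windowStart(0, 10), 10, 5);
        System.out.println("[0,10) closes at stream time " + close);
        // Every update is forwarded downstream.
        System.out.println("emit at " + firstEmitTime(new long[] {0, 11, 16, 20}, close));
        // The time=16 update is dropped as idempotent, so time=20 is the next one seen.
        System.out.println("emit at " + firstEmitTime(new long[] {0, 11, 20}, close));
    }
}
```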

You can see that the maximum amount of (stream) time that dropping idempotent
updates could delay updates is one window. This might be surprising, but it
doesn't seem too bad, especially since Suppress explicitly does _not_ promise
to emit the results at the earliest possible time, just at some time after the
window closes.

Even better, though, all the windowed aggregations are _stream_ aggregations
that _produce_ a KTable, and we already decided that (at least for now), we
would include the timestamp in the idempotence check for stream aggregation
results. With this strategy, we would actually not suppress _any_ update to a
stream aggregation (windowed or otherwise) that advances stream time.
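
As a sketch of the two flavors of check (the method names are made up, and
this says nothing about where the real implementation would put the
comparison): plain table updates compare only the serialized values, while
stream aggregation results also compare timestamps.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper; only illustrates the comparison being discussed.
final class IdempotenceCheck {

    /** Table updates: idempotent when the serialized values are identical. */
    static boolean sameValue(byte[] prior, byte[] next) {
        // No prior result means there is nothing to be idempotent against.
        return prior != null && Arrays.equals(prior, next);
    }

    /** Stream aggregation results: the timestamp must match as well. */
    static boolean sameValueAndTimestamp(byte[] prior, long priorTimestamp,
                                         byte[] next, long nextTimestamp) {
        return priorTimestamp == nextTimestamp && sameValue(prior, next);
    }

    public static void main(String[] args) {
        byte[] prior = "X".getBytes(StandardCharsets.UTF_8);
        byte[] next = "X".getBytes(StandardCharsets.UTF_8);
        // Same bytes, but the new result carries a later timestamp.
        System.out.println("value-only check:     " + sameValue(prior, next));
        System.out.println("with timestamp check: " + sameValueAndTimestamp(prior, 11L, next, 16L));
    }
}
```

With the timestamp included, a result whose timestamp moved from 11 to 16 is
not treated as idempotent, so it is still forwarded and stream time advances
downstream.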

So there's no problem at all with windowed operations.

This only leaves non-windowed operations, of which there's only one. I have to
admit that I regret Suppressed.untilTimeLimit. It turns out that everyone I've
heard of who used this API actually wanted it to be wall-clock based, not
stream-time based. So, I'm not sure in practice if this sharp edge will
actually cut anyone.

I think a "proper" fix would be to introduce some kind of control message to
advance stream time independently of records. We've already talked about this
a little in KAFKA-8769, and it would also be necessary for global consistency
markers. Basically, once a record is dropped as an idempotent update, we could
still forward a time marker through the topology, which could trigger
suppressions, as well as window boundaries, but wouldn't result in any
computations.
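
Purely as a thought experiment, a time marker could look something like the
following. Nothing like `onTimeMarker` exists in Kafka Streams today; the
class is a single-key toy buffer with invented names, meant only to show a
marker advancing stream time without carrying a value.

```java
import java.util.ArrayList;
import java.util.List;

// Thought experiment only; no such API exists in Kafka Streams.
final class TimeMarkerDemo {
    private final long limit;
    private long streamTime = Long.MIN_VALUE;
    private Long bufferedTimestamp; // single-key buffer, for brevity
    private String bufferedValue;
    final List<String> output = new ArrayList<>();

    TimeMarkerDemo(long limit) {
        this.limit = limit;
    }

    /** A real update: buffer the value and advance stream time. */
    void onRecord(long timestamp, String value) {
        if (bufferedTimestamp == null) {
            bufferedTimestamp = timestamp;
        }
        bufferedValue = value;
        advance(timestamp);
    }

    /** A hypothetical marker sent in place of a dropped idempotent update. */
    void onTimeMarker(long timestamp) {
        advance(timestamp); // advances time, but there is no value to compute with
    }

    private void advance(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        if (bufferedTimestamp != null && streamTime - bufferedTimestamp >= limit) {
            output.add(bufferedValue + "@" + bufferedTimestamp);
            bufferedTimestamp = null;
            bufferedValue = null;
        }
    }

    public static void main(String[] args) {
        TimeMarkerDemo buffer = new TimeMarkerDemo(10);
        buffer.onRecord(0, "X");
        buffer.onTimeMarker(11); // stands in for the dropped update at time 11
        System.out.println(buffer.output);
    }
}
```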

But practically speaking, I'm really not confident that there's anyone really
using the untilTimeLimit suppression, or, even if they are, that they would
see consecutive idempotent updates for long enough to have an observable
impact on what gets emitted from the suppression buffer. In fact, if some
hypothetical person did find themselves on the wrong end of my assumption,
they could _remove_ the suppression, and rely instead on idempotence-dropping
to get the same level of traffic reduction that they enjoy from the
suppression.

Anyway, long story short, I'm advocating to continue with the current proposal
and just make a note of the situations I've outlined above.

Thanks for your time,
-John


On Thu, Feb 27, 2020, at 14:33, Richard Yu wrote:
> Hi all,
> 
> I might've made a minor mistake. The processor node level is level 3, not
> level 1.
> I will correct the KIP accordingly.
> 
> After looking over things, I decided to start the voting thread this
> afternoon.
> 
> Cheers,
> Richard
> 
> On Thu, Feb 27, 2020 at 12:29 PM Richard Yu <yohan.richard...@gmail.com>
> wrote:
> 
> > Hi Bruno, Hi John,
> >
> > Thanks for your comments! I updated the KIP accordingly, and it looks like,
> > for quite a few points, I was doing some beating around the bush which
> > could've been avoided.
> >
> > Looks like we can reduce the metric to Level 1 (per processor node) then.
> >
> > I've cleaned up most of the unnecessary info, and we should be fairly
> > close.
> > I will start working on a PR soon for this KIP. (although we might split
> > that up into stages)
> >
> > Cheers,
> > Richard
> >
> > On Thu, Feb 27, 2020 at 6:06 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi John,
> >>
> >> I agree with you. It is better to measure the metric on processor node
> >> level. The users can do the rollup to task-level by themselves.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org>
> >> wrote:
> >> >
> >> > Hi Richard,
> >> >
> >> > I've been making a final pass over the KIP.
> >> >
> >> > Re: Proposed Behavior Change:
> >> >
> >> > I think this point is controversial and probably doesn't need to be
> >> there at all:
> >> > > 2.b. In certain situations where there is a high volume of idempotent
> >> > > updates throughout the Streams DAG, it will be recommended practice
> >> > > to materialize all operations to reduce traffic overall across the
> >> entire
> >> > >  network of nodes.
> >> >
> >> > Re-reading all the points, it seems like we can sum them up in a way
> >> that's
> >> > a little more straight to the point, and gives us the right amount of
> >> flexibility:
> >> >
> >> > > Proposed Behavior Changes
> >> > >
> >> > > Definition: "idempotent update" is one in which the new result and
> >> prior
> >> > > result,  when serialized, are identical byte arrays
> >> > >
> >> > > Note: an "update" is a concept that only applies to Table operations,
> >> so
> >> > > the concept of an "idempotent update" also only applies to Table
> >> operations.
> >> > > See
> >> https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> >> > > for more information.
> >> > >
> >> > > Given that definition, we propose for Streams to drop idempotent
> >> updates
> >> > > in any situation where it's possible and convenient to do so. For
> >> example,
> >> > > any time we already have both the prior and new results serialized, we
> >> > > may compare them, and drop the update if it is idempotent.
> >> > >
> >> > > Note that under this proposal, we can implement idempotence checking
> >> > > in the following situations:
> >> > > 1. Any aggregation (for example, KGroupedStream, KGroupedTable,
> >> > >     TimeWindowedKStream, and SessionWindowedKStream operations)
> >> > > 2. Any Materialized KTable operation
> >> > > 3. Repartition operations, when we need to send both prior and new
> >> results
> >> >
> >> > Notice that in my proposed wording, we neither limit ourselves to just
> >> the
> >> > situations enumerated, nor promise to implement the optimization in
> >> every
> >> > possible situation. IMHO, this is the best way to propose such a
> >> feature.
> >> > That way, we have the flexibility to implement it in stages, and also
> >> to add
> >> > on to the implementation in the future.
> >> >
> >> >
> >> > Re: Metrics
> >> >
> >> > I agree with Bruno, although, I think it might just be a confusing
> >> statement.
> >> > It might be clearer to drop all the "discussion", and just say: "We
> >> will add a
> >> > metric to count the number of idempotent updates that we have dropped".
> >> >
> >> > Also, with respect to the metric, I'm wondering if the metric should be
> >> task-
> >> > level or processor-node-level. Since the interesting action takes place
> >> inside
> >> > individual processor nodes, I _think_ it would be higher leverage to
> >> just
> >> > measure it at the node level. WDYT?
> >> >
> >> > Re: Design Reasoning
> >> >
> >> > This section seems to be a little bit outdated. I also just noticed a
> >> "surprise"
> >> > configuration "timestamp.aggregation.selection.policy" hidden in point
> >> 1.a.
> >> > Is that part of the proposal? We haven't discussed it, and I think we
> >> were
> >> > talking about this KIP being "configuration free".
> >> >
> >> > There is also some discussion of discarded alternative in the Design
> >> Reasoning
> >> > section, which is confusing. Finally, there was a point there I didn't
> >> understand
> >> > at all, about stateless operators not being intended to load prior
> >> results.
> >> > This statement doesn't seem to be true, but it also doesn't seem to be
> >> relevant,
> >> > so maybe we can just drop it.
> >> >
> >> > Overall, it might help if you make a pass on this section, and just
> >> discuss as
> >> > briefly as possible the justification for the proposed behavior change,
> >> and
> >> > not adding a configuration. Try to avoid talking about things that we
> >> are not
> >> > proposing, since that will just lead to confusion.
> >> >
> >> > Similarly, I'd just completely remove the "Implementation [discarded]"
> >> section.
> >> > It was good to have this as part of the discussion initially, but as we
> >> move
> >> > toward a vote, it's better to just streamline the KIP document as much
> >> as
> >> > possible. Keeping a "discarded" section in the document will just make
> >> it
> >> > harder for new people to understand the proposal. We did the same thing
> >> > with KIP-441, where there were two prior drafts included at the end of
> >> the
> >> > document, and we just deleted them for clarity.
> >> >
> >> > I liked the "Compatibility" and "Rejected Alternatives" section. Very
> >> clear
> >> > and to the point.
> >> >
> >> > Thanks again for the contribution! I think once the KIP document is
> >> cleaned
> >> > up, we'll be in good shape to finalize the discussion.
> >> > -John
> >> >
> >> >
> >> > On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:
> >> > > Hi Richard,
> >> > >
> >> > > 1. Could you change "idempotent update operations will only be dropped
> >> > > from KTables, not from other classes." -> idempotent update operations
> >> > > will only be dropped from materialized KTables? For non-materialized
> >> > > KTables -- as they can occur after optimization of the topology -- we
> >> > > cannot drop idempotent updates.
> >> > >
> >> > > 2. I cannot completely follow the metrics section. Do you want to
> >> > > record all idempotent updates or only the dropped ones? In particular,
> >> > > I do not understand the following sentences:
> >> > > "For that matter, even if we don't drop idempotent updates, we should
> >> > > at the very least record the number of idempotent updates that has
> >> > > been seen go through a particular processor."
> >> > > "Therefore, we should add some metrics which will count the number of
> >> > > idempotent updates that each node has seen."
> >> > > I do not see how we can record idempotent updates that we do not drop.
> >> > > If we see them, we should drop them. If we do not see them, we cannot
> >> > > drop them and we cannot record them.
> >> > >
> >> > > Best,
> >> > > Bruno
> >> > >
> >> > > On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <
> >> yohan.richard...@gmail.com> wrote:
> >> > > >
> >> > > > Hi John,
> >> > > >
> >> > > > Sounds good. It looks like we are close to wrapping things up. If
> >> > > > there aren't any other revisions that need to be made (if so, please
> >> > > > comment in the thread), I will start the voting process this Thursday
> >> > > > (Pacific Standard Time).
> >> > > >
> >> > > > Cheers,
> >> > > > Richard
> >> > > >
> >> > > > On Tue, Feb 25, 2020 at 11:59 AM John Roesler <vvcep...@apache.org>
> >> wrote:
> >> > > >
> >> > > > > Hi Richard,
> >> > > > >
> >> > > > > Sorry for the slow reply. I actually think we should avoid
> >> checking
> >> > > > > equals() for now. Your reasoning is good, but the truth is that
> >> > > > > depending on the implementation of equals() is non-trivial,
> >> > > > > semantically, and (though I proposed it before), I'm not convinced
> >> > > > > it's worth the risk. Much better to start with exactly one kind of
> >> > > > > "idempotence detection".
> >> > > > >
> >> > > > > Even if someone does update their serdes, we know that the new
> >> > > > > serde would still be able to _de_serialize the old format, or the
> >> whole
> >> > > > > app would break. The situation is that the new result gets encoded
> >> > > > > in the new binary format, which means we don't detect an
> >> idempotent
> >> > > > > update for what it is. In this case, we'd write the new binary
> >> format to
> >> > > > > disk and the changelog, and forward it downstream. However, we
> >> only
> >> > > > > do this once. Now that the binary format for that record has been
> >> updated,
> >> > > > > we would correctly detect idempotence of any subsequent updates.
> >> > > > >
> >> > > > > Plus, we would still be able to filter out idempotent updates in
> >> > > > > repartition
> >> > > > > sinks, since for those, we use the new serde to serialize both
> >> the "old"
> >> > > > > and
> >> > > > > "new" result.
> >> > > > >
> >> > > > > It's certainly a good observation, but I think we can just make a
> >> note of
> >> > > > > it
> >> > > > > in "rejected alternatives" for now, and plan to refine it later,
> >> if it does
> >> > > > > pose a big performance problem.
> >> > > > >
> >> > > > > Thanks!
> >> > > > > -John
> >> > > > >
> >> > > > > On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
> >> > > > > > Hi all,
> >> > > > > >
> >> > > > > > Updated the KIP.
> >> > > > > >
> >> > > > > > Just a question: do you think it would be a good idea if we
> >> check for
> >> > > > > both
> >> > > > > > Object#equals() and binary equality?
> >> > > > > > Because there might be some subtle changes in the serialization
> >> (for
> >> > > > > > example, if the user decides to upgrade their serialization
> >> procedure to
> >> > > > > a
> >> > > > > > new one), but the underlying values of the result might be the
> >> same.
> >> > > > > > (therefore equals() might return true)
> >> > > > > >
> >> > > > > > Do you think this would be plausible?
> >> > > > > >
> >> > > > > > Cheers,
> >> > > > > > Richard
> >> > > > > >
> >> > > > > > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <
> >> yohan.richard...@gmail.com>
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Hello,
> >> > > > > > >
> >> > > > > > > Just to make some updates. I changed the name of the metric
> >> so that it
> >> > > > > was
> >> > > > > > > more in line with usual Kafka naming conventions for metrics
> >> / sensors.
> >> > > > > > > Below is the updated description of the metric:
> >> > > > > > >
> >> > > > > > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate
> >> | total)
> >> > > > > > >
> >> > > > > > > Description: This metric will record the number of updates
> >> that have
> >> > > > > been
> >> > > > > > > dropped since they are essentially re-performing an earlier
> >> operation.
> >> > > > > > >
> >> > > > > > > Note:
> >> > > > > > >
> >> > > > > > >    - The rate option indicates the ratio of records dropped
> >> to actual
> >> > > > > > >    volume of records passing through the task.
> >> > > > > > >    - The total option will just give a raw count of the
> >> number of
> >> > > > > records
> >> > > > > > >    dropped.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > I hope that this is more on point.
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Richard
> >> > > > > > >
> >> > > > > > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <
> >> yohan.richard...@gmail.com
> >> > > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> Hi all,
> >> > > > > > >>
> >> > > > > > >> Thanks for the clarification. I was just confused a little
> >> on what was
> >> > > > > > >> going on.
> >> > > > > > >>
> >> > > > > > >> So I guess that, for the actual proposal, we have the following:
> >> > > > > > >>
> >> > > > > > >> 1. We check for binary equality, and perform no extra look
> >> ups.
> >> > > > > > >> 2. Emphasize that this applies only to materialized tables.
> >> > > > > > >> 3. We drop aggregation updates if key, value and timestamp
> >> is the
> >> > > > > same.
> >> > > > > > >>
> >> > > > > > >> Then that settles the behavior changes. So it looks like the
> >> Metric
> >> > > > > that
> >> > > > > > >> is the only thing that is left. In this case, I think the
> >> metric
> >> > > > > would be
> >> > > > > > >> named the following: IdempotentUpdateMetric. This is mostly
> >> off the
> >> > > > > top of
> >> > > > > > >> my head. So if you think that we change it, feel free to say
> >> so.
> >> > > > > > >> The metric will report the number of dropped operations
> >> inherently.
> >> > > > > > >>
> >> > > > > > >> It will probably be added as a Sensor, similar to the
> >> dropped records
> >> > > > > > >> sensor we already have.
> >> > > > > > >>
> >> > > > > > >> If there isn't anything else, I will probably start the
> >> voting process
> >> > > > > > >> next week!
> >> > > > > > >>
> >> > > > > > >> Cheers,
> >> > > > > > >> Richard
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <
> >> vvcep...@apache.org>
> >> > > > > > >> wrote:
> >> > > > > > >>
> >> > > > > > >>> Hi Bruno,
> >> > > > > > >>>
> >> > > > > > >>> Thanks for the clarification. Indeed, I was thinking two
> >> things:
> >> > > > > > >>> 1. For the initial implementation, we can just avoid adding
> >> any extra
> >> > > > > > >>> lookups, but only do the comparison when we already happen
> >> to have
> >> > > > > > >>> the prior value.
> >> > > > > > >>> 2. I think, as a result of the timestamp semantics, we
> >> actually _do_
> >> > > > > look
> >> > > > > > >>> up the prior value approximately all the time, so the
> >> idempotence
> >> > > > > check
> >> > > > > > >>> should be quite effective.
> >> > > > > > >>>
> >> > > > > > >>> I think that second point is the same thing you're
> >> referring to
> >> > > > > > >>> potentially
> >> > > > > > >>> being unnecessary. It does mean that we do fetch the whole
> >> value in a
> >> > > > > > >>> lot of cases where we really only need the timestamp, so it
> >> could
> >> > > > > > >>> certainly
> >> > > > > > >>> be optimized in the future. In that future, we would need
> >> to weigh
> >> > > > > that
> >> > > > > > >>> optimization against losing the idempotence check. But,
> >> that's a
> >> > > > > problem
> >> > > > > > >>> for tomorrow :)
> >> > > > > > >>>
> >> > > > > > >>> I'm 100% on board with scrutinizing the performance as we
> >> implement
> >> > > > > > >>> this feature.
> >> > > > > > >>>
> >> > > > > > >>> Thanks again,
> >> > > > > > >>> -John
> >> > > > > > >>>
> >> > > > > > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
> >> > > > > > >>> > Hi John,
> >> > > > > > >>> >
> >> > > > > > >>> > I am glad to help you with your imagination. With
> >> overhead, I
> >> > > > > mainly
> >> > > > > > >>> > meant the additional lookup into the state store to get
> >> the current
> >> > > > > > >>> > value, but I see now in the code that we do that lookup
> >> anyways
> >> > > > > > >>> > (although I think we could avoid that in the cases where
> >> we do not
> >> > > > > > >>> > need the old value). With or without config, we need to
> >> evaluate
> >> > > > > the
> >> > > > > > >>> > performance benefits of this change, in any case.
> >> > > > > > >>> >
> >> > > > > > >>> > Best,
> >> > > > > > >>> > Bruno
> >> > > > > > >>> >
> >> > > > > > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <
> >> vvcep...@apache.org>
> >> > > > > > >>> wrote:
> >> > > > > > >>> > >
> >> > > > > > >>> > > Thanks for your remarks, Bruno!
> >> > > > > > >>> > >
> >> > > > > > >>> > > I'm in favor of standardizing on terminology like "not
> >> forwarding
> >> > > > > > >>> > > idempotent updates" or "dropping idempotent updates".
> >> Maybe we
> >> > > > > > >>> > > should make a pass on the KIP and just convert
> >> everything to this
> >> > > > > > >>> > > phrasing. In retrospect, even the term "emit-on-change"
> >> has too
> >> > > > > much
> >> > > > > > >>> > > semantic baggage, since it implies the semantics from
> >> the SECRET
> >> > > > > > >>> > > paper, which we don't really want to imply here.
> >> > > > > > >>> > >
> >> > > > > > >>> > > I'm also in favor of the metric as you propose.
> >> > > > > > >>> > >
> >> > > > > > >>> > > Likewise with stream aggregations, I was also under the
> >> > > > > impression
> >> > > > > > >>> > > that we agreed on dropping idempotent updates to the
> >> aggregation
> >> > > > > > >>> > > result, any time we find that our "new" (key, value,
> >> timestamp)
> >> > > > > > >>> result
> >> > > > > > >>> > > is identical to the prior one.
> >> > > > > > >>> > >
> >> > > > > > >>> > > Also, I'm +1 on all your recommendations for updating
> >> the KIP
> >> > > > > > >>> document
> >> > > > > > >>> > > for clarity.
> >> > > > > > >>> > >
> >> > > > > > >>> > > Regarding the opt-out config. Perhaps I'm suffering
> >> from a
> >> > > > > failure of
> >> > > > > > >>> > > imagination, but I don't see how the current proposal
> >> could
> >> > > > > really
> >> > > > > > >>> have
> >> > > > > > >>> > > a measurable impact on latency. If all we do is make a
> >> single
> >> > > > > extra
> >> > > > > > >>> pass
> >> > > > > > >>> > > to compare two byte arrays for equality, only in the
> >> cases where
> >> > > > > we
> >> > > > > > >>> already
> >> > > > > > >>> > > have the byte arrays available, it seems unlikely to
> >> measurably
> >> > > > > > >>> affect the
> >> > > > > > >>> > > processing of non-idempotent updates. It seems
> >> guaranteed to
> >> > > > > > >>> _decrease_
> >> > > > > > >>> > > the latency of processing idempotent updates, since we
> >> get to
> >> > > > > skip a
> >> > > > > > >>> > > store#put, at least one producer#send, and also all
> >> downstream
> >> > > > > > >>> processing,
> >> > > > > > >>> > > including all the disk and network operations
> >> associated with
> >> > > > > > >>> downstream
> >> > > > > > >>> > > operations.
> >> > > > > > >>> > >
> >> > > > > > >>> > > It seems like if we're pretty sure this change would
> >> only
> >> > > > > _help_, we
> >> > > > > > >>> shouldn't
> >> > > > > > >>> > > introduce the operational burden of an extra
> >> configuration. If we
> >> > > > > > >>> want to
> >> > > > > > >>> > > be more aggressive about dropping idempotent operations
> >> in the
> >> > > > > > >>> future,
> >> > > > > > >>> > > such as depending on equals() or adding a ChangeDetector
> >> > > > > interface,
> >> > > > > > >>> then
> >> > > > > > >>> > > we should consider adding a configuration as part of
> >> that future
> >> > > > > > >>> work. In
> >> > > > > > >>> > > fact, if we add a simple "opt-in/opt-out" switch right
> >> now, we
> >> > > > > might
> >> > > > > > >>> find
> >> > > > > > >>> > > that it's actually insufficient for whatever future
> >> feature we
> >> > > > > might
> >> > > > > > >>> propose,
> >> > > > > > >>> > > then we have a mess of deprecating the opt-out config
> >> and
> >> > > > > replacing
> >> > > > > > >>> it.
> >> > > > > > >>> > >
> >> > > > > > >>> > > What do you think?
> >> > > > > > >>> > > -John
> >> > > > > > >>> > >
> >> > > > > > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
> >> > > > > > >>> > > > Hi all,
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > Sorry for the late reply!
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > I am also in favour of baby steps.
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > I am undecided whether the KIP should contain a
> >> opt-out config
> >> > > > > or
> >> > > > > > >>> not.
> >> > > > > > >>> > > > The overhead of emit-on-change might affect latency.
> >> For
> >> > > > > > >>> applications
> >> > > > > > >>> > > > where low latency is crucial and there are not too
> >> many
> >> > > > > idempotent
> >> > > > > > >>> > > > updates, it would be better to fall back to
> >> emit-on-update.
> >> > > > > > >>> However,
> >> > > > > > >>> > > > we do not know how much emit-on-change impacts
> >> latency. We
> >> > > > > would
> >> > > > > > >>> first
> >> > > > > > >>> > > > need to benchmark that before we can decide about the
> >> > > > > > >>> opt-out-config.
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > A metric of dropped idempotent updates seems useful
> >> to me to be
> >> > > > > > >>> > > > informed about potential upstream applications or
> >> upstream
> >> > > > > > >>> operators
> >> > > > > > >>> > > > that produce too many idempotent updates. The KIP
> >> should state
> >> > > > > the
> >> > > > > > >>> > > > name of the metric, its group, its tags, and its
> >> recording
> >> > > > > level
> >> > > > > > >>> (see
> >> > > > > > >>> > > > KIP-444 or KIP-471 for examples). I propose DEBUG as
> >> reporting
> >> > > > > > >>> level.
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > Richard, what competing proposals for emit-on-change
> >> for
> >> > > > > > >>> aggregations
> >> > > > > > >>> > > > do you mean? I have the feeling that we agreed to get
> >> rid of
> >> > > > > > >>> > > > idempotent updates if the aggregate is updated with
> >> the same
> >> > > > > key,
> >> > > > > > >>> > > > value, AND timestamp. I am also fine if we do not
> >> include this
> >> > > > > into
> >> > > > > > >>> > > > this KIP (remember: baby steps).
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > You write that "emit-on-change is more correct".
> >> Since we
> >> > > > > agreed
> >> > > > > > >>> that
> >> > > > > > >>> > > > this is an optimization, IMO you cannot argue this
> >> way.
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > Please put "Alternative Approaches" under "Rejected
> >> > > > > Alternatives",
> >> > > > > > >>> so
> >> > > > > > >>> > > > that it becomes clear that we are not going to
> >> implement them.
> >> > > > > In
> >> > > > > > >>> > > > general, I think the KIP needs a bit of clean-up
> >> (probably, you
> >> > > > > > >>> > > > already planned for it). "Design Reasoning" is a bit
> >> of
> >> > > > > behavior
> >> > > > > > >>> > > > changes, rejected alternatives and duplicates a bit
> >> the
> >> > > > > content in
> >> > > > > > >>> > > > those sections.
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > I do not like the name "no-op operations" or
> >> "no-ops", because
> >> > > > > they
> >> > > > > > >>> > > > are rather generic. I like more "idempotent updates".
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > Best,
> >> > > > > > >>> > > > Bruno
> >> > > > > > >>> > > >
> >> > > > > > >>> > > >
> >> > > > > > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <
> >> > > > > > >>> yohan.richard...@gmail.com> wrote:
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > Hi all,
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > We are definitely making progress!
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > @John should I emphasize in the proposed behavior
> >> changes
> >> > > > > that
> >> > > > > > >>> we are only
> >> > > > > > >>> > > > > doing binary equality checks for stateful operators?
> >> > > > > > >>> > > > > It looks like we have come close to finalizing this
> >> part of
> >> > > > > the
> >> > > > > > >>> KIP. (I
> >> > > > > > >>> > > > > will note in the KIP that this proposal is intended
> >> for
> >> > > > > > >>> optimization, not
> >> > > > > > >>> > > > > semantics correctness)
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > I do think maybe we still have one other detail we
> >> need to
> >> > > > > > >>> discuss. So far,
> >> > > > > > >>> > > > > there has been quite a bit of back and forth about
> >> what the
> >> > > > > > >>> behavior of
> >> > > > > > >>> > > > > aggregations should look like in emit on change. I
> >> have seen
> >> > > > > > >>> > > > > multiple competing proposals, so I am not
> >> completely certain
> >> > > > > > >>> which one we
> >> > > > > > >>> > > > > should go with, or how we will be able to
> >> compromise in
> >> > > > > between
> >> > > > > > >>> them.
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > Let me know what your thoughts are on this matter,
> >> since we
> >> > > > > are
> >> > > > > > >>> probably
> >> > > > > > >>> > > > > close to wrapping up most other stuff.
> >> > > > > > >>> > > > > @Matthias J. Sax <matth...@confluent.io>  and
> >> @Bruno, see
> >> > > > > what
> >> > > > > > >>> you think
> >> > > > > > >>> > > > > about this.
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > Best,
> >> > > > > > >>> > > > > Richard
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <
> >> > > > > > >>> vvcep...@apache.org> wrote:
> >> > > > > > >>> > > > >
> >> > > > > > >>> > > > > > Thanks, Matthias!
> >> > > > > > >>> > > > > >
> >> > > > > > >>> > > > > > Regarding numbers, it would be hard to know how
> >> many
> >> > > > > > >>> applications
> >> > > > > > >>> > > > > > would benefit, since we don't know how many
> >> applications
> >> > > > > there
> >> > > > > > >>> are,
> >> > > > > > >>> > > > > > or anything about their data sets or topologies.
> >> We could
> >> > > > > do a
> >> > > > > > >>> survey,
> >> > > > > > >>> > > > > > but it seems overkill if we take the conservative
> >> approach.
> >> > > > > > >>> > > > > >
> >> > > > > > >>> > > > > > I have my own practical stream processing
> >> experience that
> >> > > > > > >>> tells me this
> >> > > > > > >>> > > > > > is absolutely critical for any moderate-to-large
> >> relational
> >> > > > > > >>> stream
> >> > > > > > >>> > > > > > processing use cases. I'll leave it to you to
> >> decide if you
> >> > > > > > >>> find that
> >> > > > > > >>> > > > > > convincing, but it's definitely not an
> >> _assumption_. I've
> >> > > > > also
> >> > > > > > >>> heard from
> >> > > > > > >>> > > > > > a few Streams users who have already had to
> >> implement
> >> > > > > their own
> >> > > > > > >>> > > > > > noop-suppression transformers in order to get to
> >> production
> >> > > > > > >>> scale.
> >> > > > > > >>> > > > > >
> >> > > > > > >>> > > > > > Regardless, it sounds like we can agree on taking
> >> an
> >> > > > > > >>> opportunistic approach
> >> > > > > > >>> > > > > > and targeting the optimization just to use a
> >> > > > > binary-equality
> >> > > > > > >>> check at
> >> > > > > > >>> > > > > > stateful operators. (I'd also suggest in sink
> >> nodes, when
> >> > > > > we
> >> > > > > > >>> are about to
> >> > > > > > >>> > > > > > send old and new values, since they are also
> >> already
> >> > > > > present
> >> > > > > > >>> and serialized
> >> > > > > > >>> > > > > > at that point.) We could make the KIP even more
> >> vague, and
> >> > > > > > >>> just say that
> >> > > > > > >>> > > > > > we'll drop no-op updates "when possible".
> >> > > > > > >>> > > > > >
> >> > > > > > >>> > > > > > I'm curious what Bruno and the others think about
> >> this. If
> >> > > > > it
> >> > > > > > >>> seems like
> >> > > > > > >>> > > > > > a good starting point, perhaps we could move to a
> >> vote soon
> >> > > > > > >>> and get to
> >> > > > > > >>> > > > > > work on the implementation!
> >> > > > > > >>> > > > > >
> >> > > > > > >>> > > > > > Thanks,
> >> > > > > > >>> > > > > > -John
> >> > > > > > >>> > > > > >
> >> > > > > > >>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax
> >> wrote:
> >> > > > > > >>> > > > > > > Talking about optimizations and reducing
> >> downstream load:
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > > Do we actually have any numbers? I have the
> >> impression
> >> > > > > that
> >> > > > > > >>> this KIP is
> >> > > > > > >>> > > > > > > more or less built on the _assumption_ that
> >> there is a
> >> > > > > > >>> problem. Yes,
> >> > > > > > >>> > > > > > > there are some use cases that would benefit
> >> from this;
> >> > > > > but
> >> > > > > > >>> how many
> >> > > > > > >>> > > > > > > applications would actually benefit? And how
> >> much load
> >> > > > > > >>> reduction would
> >> > > > > > >>> > > > > > > they get?
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > > The simplest approach (following John's idea to
> >> make baby
> >> > > > > > >>> steps) would be
> >> > > > > > >>> > > > > > > to apply the emit-on-change pattern only if
> >> there is a
> >> > > > > > >>> store. For this
> >> > > > > > >>> > > > > > > case we need to serialize old and new result
> >> anyway and
> >> > > > > thus
> >> > > > > > >>> a simple
> >> > > > > > >>> > > > > > > byte-array comparison is no overhead.
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > > Sending `oldValues` by default would become
> >> expensive
> >> > > > > > >>> because we would
> >> > > > > > >>> > > > > > > need to serialize the recomputed old result, as
> >> well as
> >> > > > > the
> >> > > > > > >>> new result,
> >> > > > > > >>> > > > > > > to make the comparison (and we know the
> >> serialization is
> >> > > > > not
> >> > > > > > >>> cheap). We
> >> > > > > > >>> > > > > > > are facing a trade-off between CPU overhead and
> >> > > > > downstream
> >> > > > > > >>> load and I am
> >> > > > > > >>> > > > > > > not sure if we should hard code this. My
> >> original
> >> > > > > argument
> >> > > > > > >>> for sending
> >> > > > > > >>> > > > > > > `oldValues` was about semantics; but for an
> >> > > > > optimization, I
> >> > > > > > >>> am not sure
> >> > > > > > >>> > > > > > > if this would be the right choice.
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > > For now, users who want to opt-in can force a
> >> > > > > > >>> materialization. A
> >> > > > > > >>> > > > > > > materialization may be expensive and if we see
> >> future
> >> > > > > > >>> demand, we could
> >> > > > > > >>> > > > > > > still add an option to send `oldValues` instead
> >> of
> >> > > > > > >>> materialization (this
> >> > > > > > >>> > > > > > > would at least save the store overhead). As we
> >> consider
> >> > > > > the
> >> > > > > > >>> KIP an
> >> > > > > > >>> > > > > > > optimization, a "config" seems to make sense.
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > > -Matthias
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
> >> > > > > > >>> > > > > > > > Hi John!
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Thanks for the reply.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > About the changes we have discussed so far. I
> >> think
> >> > > > > upon
> >> > > > > > >>> further
> >> > > > > > >>> > > > > > > > consideration, we have been mostly talking
> >> about this
> >> > > > > from
> >> > > > > > >>> the
> >> > > > > > >>> > > > > > perspective
> >> > > > > > >>> > > > > > > > that no stop-gap effort is acceptable.
> >> However, in
> >> > > > > recent
> >> > > > > > >>> discussion,
> >> > > > > > >>> > > > > > > if we
> >> > > > > > >>> > > > > > > > consider optimization, then it appears that
> >> the
> >> > > > > > >>> perspective I
> >> > > > > > >>> > > > > > mentioned no
> >> > > > > > >>> > > > > > > > longer applies. After all, we are no longer
> >> concerned
> >> > > > > so
> >> > > > > > >>> much about
> >> > > > > > >>> > > > > > > > semantic correctness as about reducing traffic
> >> as much as
> >> > > > > > >>> possible
> >> > > > > > >>> > > > > > without
> >> > > > > > >>> > > > > > > > performance tradeoffs.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > In this case, I think a cache would be a good
> >> idea for
> >> > > > > > >>> stateless
> >> > > > > > >>> > > > > > > > operations. This cache will not be backed by
> >> a store
> >> > > > > > >>> obviously. We can
> >> > > > > > >>> > > > > > > > probably use Kafka's ThreadCache. We should
> >> be able to
> >> > > > > > >>> catch a large
> >> > > > > > >>> > > > > > > > portion of the no-ops if we at least store
> >> some
> >> > > > > results in
> >> > > > > > >>> the cache.
> >> > > > > > >>> > > > > > Not
> >> > > > > > >>> > > > > > > > all will be caught, but I think the impact
> >> will be
> >> > > > > > >>> significant.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > On another note, I think that we should
> >> implement
> >> > > > > > >>> competing proposals
> >> > > > > > >>> > > > > > i.e.
> >> > > > > > >>> > > > > > > > one where we forward both old and new values
> >> with a
> >> > > > > > >>> reasonable
> >> > > > > > >>> > > > > > proportion
> >> > > > > > >>> > > > > > > > of artificial no-ops (we do not necessarily
> >> have to
> >> > > > > rely
> >> > > > > > >>> on equals so
> >> > > > > > >>> > > > > > much
> >> > > > > > >>> > > > > > > > as comparing the serialized binary data after
> >> the
> >> > > > > > >>> operation), and in
> >> > > > > > >>> > > > > > > > another scenario, the cache for stateless
> >> ops. It
> >> > > > > would be
> >> > > > > > >>> > > > > > unreasonable if
> >> > > > > > >>> > > > > > > > we completely disregard either approach,
> >> since they
> >> > > > > both
> >> > > > > > >>> have merit.
> >> > > > > > >>> > > > > > The
> >> > > > > > >>> > > > > > > > reason for implementing both is to perform
> >> benchmark
> >> > > > > tests
> >> > > > > > >>> on them, and
> >> > > > > > >>> > > > > > > > compare them with the original. This way, we
> >> can more
> >> > > > > > >>> clearly see what
> >> > > > > > >>> > > > > > are
> >> > > > > > >>> > > > > > > > the drawbacks and the gains. So far, we have
> >> been
> >> > > > > > >>> discussing only
> >> > > > > > >>> > > > > > > > hypotheticals, and if we continue to do so, I
> >> think it
> >> > > > > is
> >> > > > > > >>> likely no
> >> > > > > > >>> > > > > > ground
> >> > > > > > >>> > > > > > > > will be gained.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > After all, what we seek is optimization, and
> >> > > > > performance
> >> > > > > > >>> benchmarks
> >> > > > > > >>> > > > > > > will be
> >> > > > > > >>> > > > > > > > mandatory for a KIP of this nature.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Hope this helps,
> >> > > > > > >>> > > > > > > > Richard
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler <
> >> > > > > > >>> vvcep...@apache.org>
> >> > > > > > >>> > > > > > wrote:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >> Hi again, all,
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> Sorry on my part for my silence.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> I've just taken another look over the recent
> >> history
> >> > > > > of
> >> > > > > > >>> this
> >> > > > > > >>> > > > > > > discussion. It
> >> > > > > > >>> > > > > > > >> seems like the #1 point to clarify (because
> >> it affects
> >> > > > > > >>> everything
> >> > > > > > >>> > > > > > else) is
> >> > > > > > >>> > > > > > > >> that,
> >> > > > > > >>> > > > > > > >> yes, I was 100% envisioning this as an
> >> _optimization_.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> As a consequence, I don't think it's
> >> critical to make
> >> > > > > any
> >> > > > > > >>> hard
> >> > > > > > >>> > > > > > guarantees
> >> > > > > > >>> > > > > > > >> about
> >> > > > > > >>> > > > > > > >> what results get forwarded and what (no-op
> >> updates)
> >> > > > > get
> >> > > > > > >>> dropped. I'd
> >> > > > > > >>> > > > > > > >> initially
> >> > > > > > >>> > > > > > > >> just been thinking about doing this
> >> opportunistically
> >> > > > > in
> >> > > > > > >>> cases where
> >> > > > > > >>> > > > > > we
> >> > > > > > >>> > > > > > > >> already
> >> > > > > > >>> > > > > > > >> had the "old" and "new" result in memory,
> >> thanks to a
> >> > > > > > >>> request to
> >> > > > > > >>> > > > > > > "emit old
> >> > > > > > >>> > > > > > > >> values", or to the implementation of
> >> timestamp
> >> > > > > semantics.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> However, whether or not it's semantically
> >> critical, I
> >> > > > > do
> >> > > > > > >>> think that
> >> > > > > > >>> > > > > > > >> Matthias's
> >> > > > > > >>> > > > > > > >> idea to use the change-forwarding mechanism
> >> to check
> >> > > > > for
> >> > > > > > >>> no-ops even
> >> > > > > > >>> > > > > > on
> >> > > > > > >>> > > > > > > >> stateless operations is pretty interesting.
> >> > > > > Specifically,
> >> > > > > > >>> this would
> >> > > > > > >>> > > > > > > >> _really_
> >> > > > > > >>> > > > > > > >> let you pare down useless updates by using
> >> mapValues
> >> > > > > to
> >> > > > > > >>> strip down
> >> > > > > > >>> > > > > > > records
> >> > > > > > >>> > > > > > > >> only
> >> > > > > > >>> > > > > > > >> to what you really need. However, the
> >> dependence on
> >> > > > > the
> >> > > > > > >>> > > > > > implementation of
> >> > > > > > >>> > > > > > > >> equals() is troubling.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> It might make sense to table this idea, as
> >> well as my
> >> > > > > > >>> complicated
> >> > > > > > >>> > > > > > no-op
> >> > > > > > >>> > > > > > > >> detection algorithm, and initially propose
> >> just a
> >> > > > > > >>> nonconfigurable
> >> > > > > > >>> > > > > > feature
> >> > > > > > >>> > > > > > > >> to
> >> > > > > > >>> > > > > > > >> check "old" and "new" results for binary
> >> equality
> >> > > > > before
> >> > > > > > >>> forwarding.
> >> > > > > > >>> > > > > > > I.e.,
> >> > > > > > >>> > > > > > > >> if
> >> > > > > > >>> > > > > > > >> any operation determines that the old and
> >> new results
> >> > > > > are
> >> > > > > > >>> > > > > > > >> binary-identical, we
> >> > > > > > >>> > > > > > > >> would not forward.
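
To make the binary-equality proposal above concrete, here is a minimal sketch of the check. The class and method names (BinaryNoOpCheck, isNoOp) are hypothetical and are not part of Kafka Streams; the sketch assumes the operator already holds the serialized "old" and "new" results as byte arrays.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration only; not a Kafka Streams class.
final class BinaryNoOpCheck {
    private BinaryNoOpCheck() {}

    // Returns true when the serialized old and new results are byte-identical,
    // i.e. the update would be a no-op and need not be forwarded.
    static boolean isNoOp(byte[] oldBytes, byte[] newBytes) {
        // Arrays.equals treats (null, null) as equal and (null, non-null) as different.
        return Arrays.equals(oldBytes, newBytes);
    }

    public static void main(String[] args) {
        byte[] oldValue = "X".getBytes(StandardCharsets.UTF_8);
        byte[] sameValue = "X".getBytes(StandardCharsets.UTF_8);
        byte[] newValue = "Y".getBytes(StandardCharsets.UTF_8);

        System.out.println(isNoOp(oldValue, sameValue)); // true: would be dropped
        System.out.println(isNoOp(oldValue, newValue));  // false: would be forwarded
    }
}
```

Since the comparison is on bytes, it does not depend on the value class implementing equals(), which is the concern raised above.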
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> I'll admit that this doesn't serve Tommy's
> >> use case
> >> > > > > very
> >> > > > > > >>> well, but it
> >> > > > > > >>> > > > > > > >> might be
> >> > > > > > >>> > > > > > > >> better to take baby steps with an
> >> optimization like
> >> > > > > this
> >> > > > > > >>> and not risk
> >> > > > > > >>> > > > > > > >> over-reaching in a way that actually harms
> >> > > > > performance or
> >> > > > > > >>> > > > > > correctness. We
> >> > > > > > >>> > > > > > > >> could
> >> > > > > > >>> > > > > > > >> always expand the feature to use equals() or
> >> some
> >> > > > > kind of
> >> > > > > > >>> > > > > > ChangeDetector
> >> > > > > > >>> > > > > > > >> later
> >> > > > > > >>> > > > > > > >> on, in a more focused discussion.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> Regarding metrics or debug logs, I guess I
> >> don't feel
> >> > > > > > >>> strongly, but it
> >> > > > > > >>> > > > > > > >> feels
> >> > > > > > >>> > > > > > > >> like two things will happen that make it
> >> nicer to add
> >> > > > > > >>> them:
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> 1. This feature is going to surprise/annoy
> >> _somebody_,
> >> > > > > > >>> and it would be
> >> > > > > > >>> > > > > > > >> nice to
> >> > > > > > >>> > > > > > > >> be able to definitively say the reason that
> >> updates
> >> > > > > are
> >> > > > > > >>> dropped is
> >> > > > > > >>> > > > > > that
> >> > > > > > >>> > > > > > > >> they
> >> > > > > > >>> > > > > > > >> were no-ops. The easiest smoking gun is if
> >> there are
> >> > > > > > >>> debug-logs that
> >> > > > > > >>> > > > > > > can be
> >> > > > > > >>> > > > > > > >> enabled. This person might just be looking
> >> at the
> >> > > > > > >>> dashboards,
> >> > > > > > >>> > > > > > > wondering why
> >> > > > > > >>> > > > > > > >> there are 100K updates per second going into
> >> their
> >> > > > > app,
> >> > > > > > >>> but only 1K
> >> > > > > > >>> > > > > > > >> results per
> >> > > > > > >>> > > > > > > >> second coming out. Having the metric there
> >> makes the
> >> > > > > > >>> accounting
> >> > > > > > >>> > > > > > easier.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> 2. Somebody is going to struggle with
> >> high-volume
> >> > > > > > >>> updates, and it
> >> > > > > > >>> > > > > > > would be
> >> > > > > > >>> > > > > > > >> nice
> >> > > > > > >>> > > > > > > >> for them to know that this feature is saving
> >> them
> >> > > > > > >>> X-thousand updates
> >> > > > > > >>> > > > > > per
> >> > > > > > >>> > > > > > > >> second,
> >> > > > > > >>> > > > > > > >> etc.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> What does everyone think about this? Note,
> >> as I read
> >> > > > > it,
> >> > > > > > >>> what I've
> >> > > > > > >>> > > > > > said
> >> > > > > > >>> > > > > > > >> above is
> >> > > > > > >>> > > > > > > >> already reflected in the text of the KIP.
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> Thanks,
> >> > > > > > >>> > > > > > > >> -John
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard Yu
> >> wrote:
> >> > > > > > >>> > > > > > > >>> Hi all,
> >> > > > > > >>> > > > > > > >>>
> >> > > > > > >>> > > > > > > >>> Bumping this. If you feel that this KIP is
> >> not too
> >> > > > > > >>> urgent, then let
> >> > > > > > >>> > > > > > me
> >> > > > > > >>> > > > > > > >>> know. :)
> >> > > > > > >>> > > > > > > >>>
> >> > > > > > >>> > > > > > > >>> Cheers,
> >> > > > > > >>> > > > > > > >>> Richard
> >> > > > > > >>> > > > > > > >>>
> >> > > > > > >>> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <
> >> > > > > > >>> > > > > > yohan.richard...@gmail.com>
> >> > > > > > >>> > > > > > > >>> wrote:
> >> > > > > > >>> > > > > > > >>>
> >> > > > > > >>> > > > > > > >>>> Hi all,
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>> I've had just a few thoughts regarding the
> >> > > > > forwarding
> >> > > > > > >>> of <key,
> >> > > > > > >>> > > > > > > >>>> change<old_value, new_value>>. As Matthias
> >> already
> >> > > > > > >>> mentioned, there
> >> > > > > > >>> > > > > > > >> are two
> >> > > > > > >>> > > > > > > >>>> separate priorities by which we can judge
> >> this KIP:
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>> 1. An optimization perspective: In this
> >> case, the
> >> > > > > user
> >> > > > > > >>> would prefer
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>> impact of this KIP to be as minimal as
> >> possible. By
> >> > > > > > >>> such logic, if
> >> > > > > > >>> > > > > > > >>>> stateless operations are performed twice,
> >> that could
> >> > > > > > >>> prove
> >> > > > > > >>> > > > > > > >> unacceptable for
> >> > > > > > >>> > > > > > > >>>> them. (since operations can prove
> >> expensive)
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>> 2. Semantics correctness perspective:
> >> Unlike the
> >> > > > > > >>> optimization
> >> > > > > > >>> > > > > > > >> approach, we
> >> > > > > > >>> > > > > > > >>>> are more concerned with all KTable
> >> operations
> >> > > > > obeying
> >> > > > > > >>> the same
> >> > > > > > >>> > > > > > emission
> >> > > > > > >>> > > > > > > >>>> policy. i.e. emit on change. In this case,
> >> a
> >> > > > > > >>> discrepancy would not
> >> > > > > > >>> > > > > > be
> >> > > > > > >>> > > > > > > >>>> tolerated, even though an extra
> >> performance cost
> >> > > > > will
> >> > > > > > >>> be incurred.
> >> > > > > > >>> > > > > > > >>>> Therefore, we will follow Matthias's
> >> approach, and
> >> > > > > then
> >> > > > > > >>> perform the
> >> > > > > > >>> > > > > > > >>>> operation once on the old value, and once
> >> on the
> >> > > > > new.
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>> The issue here I think is more black and
> >> white than
> >> > > > > in
> >> > > > > > >>> between. The
> >> > > > > > >>> > > > > > > >> second
> >> > > > > > >>> > > > > > > >>>> option in particular would be favorable
> >> for users
> >> > > > > with
> >> > > > > > >>> inexpensive
> >> > > > > > >>> > > > > > > >>>> stateless operations, while for the former
> >> option,
> >> > > > > we
> >> > > > > > >>> are probably
> >> > > > > > >>> > > > > > > >> dealing
> >> > > > > > >>> > > > > > > >>>> with more expensive ones. So the simplest
> >> solution
> >> > > > > is
> >> > > > > > >>> probably to
> >> > > > > > >>> > > > > > > >> allow the
> >> > > > > > >>> > > > > > > >>>> user to choose one of the behaviors, and
> >> have a
> >> > > > > config
> >> > > > > > >>> which can
> >> > > > > > >>> > > > > > > >> switch in
> >> > > > > > >>> > > > > > > >>>> between them.
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>> It's the simplest compromise I can come up
> >> with at
> >> > > > > the
> >> > > > > > >>> moment, but if
> >> > > > > > >>> > > > > > > >> you
> >> > > > > > >>> > > > > > > >>>> think you have a better plan which could
> >> better
> >> > > > > balance
> >> > > > > > >>> tradeoffs,
> >> > > > > > >>> > > > > > then
> >> > > > > > >>> > > > > > > >>>> please let us know. :)
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>> Best,
> >> > > > > > >>> > > > > > > >>>> Richard
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John
> >> Roesler <
> >> > > > > > >>> vvcep...@apache.org>
> >> > > > > > >>> > > > > > > >> wrote:
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>>> Hi all,
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>> Thanks for the thoughtful comments!
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>> I need more time to reflect on your
> >> thoughts, but
> >> > > > > just
> >> > > > > > >>> wanted to
> >> > > > > > >>> > > > > > offer
> >> > > > > > >>> > > > > > > >>>>> a quick clarification about equals().
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>> I only meant that we can't be sure if a
> >> class's
> >> > > > > > >>> equals()
> >> > > > > > >>> > > > > > > >> implementation
> >> > > > > > >>> > > > > > > >>>>> returns true for two semantically
> >> identical
> >> > > > > instances.
> >> > > > > > >>> I.e., if a
> >> > > > > > >>> > > > > > > >> class
> >> > > > > > >>> > > > > > > >>>>> doesn't
> >> > > > > > >>> > > > > > > >>>>> override the default equals()
> >> implementation, then
> >> > > > > we
> >> > > > > > >>> would see
> >> > > > > > >>> > > > > > > >> behavior
> >> > > > > > >>> > > > > > > >>>>> like:
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A", 1)) returns false
> >> > > > > > >>> > > > > > > >>>>>
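
The MyPair behavior described above can be reproduced with a few lines of Java. The field names here are assumptions, since the thread only shows the constructor call; the point is just that a class which does not override equals() inherits Object.equals, which compares references.

```java
// Hypothetical value class matching the MyPair example; it deliberately
// does NOT override equals(), so it inherits reference equality from Object.
final class MyPair {
    final String key;
    final int value;

    MyPair(String key, int value) {
        this.key = key;
        this.value = value;
    }
}

final class DefaultEqualsDemo {
    public static void main(String[] args) {
        MyPair a = new MyPair("A", 1);
        MyPair b = new MyPair("A", 1);

        // Two distinct instances with identical contents are not "equal".
        System.out.println(a.equals(b)); // false
        // Only the very same instance compares equal.
        System.out.println(a.equals(a)); // true
    }
}
```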
> >> > > > > > >>> > > > > > > >>>>> In that case, I would still like to catch
> >> no-op
> >> > > > > > >>> updates by
> >> > > > > > >>> > > > > > comparing
> >> > > > > > >>> > > > > > > >> the
> >> > > > > > >>> > > > > > > >>>>> serialized form of the records when we
> >> happen to
> >> > > > > have
> >> > > > > > >>> it serialized
> >> > > > > > >>> > > > > > > >> anyway
> >> > > > > > >>> > > > > > > >>>>> (such as when the operation is stateful,
> >> or when
> >> > > > > we're
> >> > > > > > >>> sending to a
> >> > > > > > >>> > > > > > > >>>>> repartition topic and we have both the
> >> "new" and
> >> > > > > "old"
> >> > > > > > >>> value from
> >> > > > > > >>> > > > > > > >>>>> upstream).
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>> I didn't mean to suggest we'd try to use
> >> > > > > reflection to
> >> > > > > > >>> detect
> >> > > > > > >>> > > > > > whether
> >> > > > > > >>> > > > > > > >>>>> equals
> >> > > > > > >>> > > > > > > >>>>> is implemented, although that is a neat
> >> trick. I
> >> > > > > was
> >> > > > > > >>> thinking more
> >> > > > > > >>> > > > > > of
> >> > > > > > >>> > > > > > > >> a
> >> > > > > > >>> > > > > > > >>>>> belt-and-suspenders algorithm where we do
> >> the check
> >> > > > > > >>> for no-ops
> >> > > > > > >>> > > > > > based
> >> > > > > > >>> > > > > > > >> on
> >> > > > > > >>> > > > > > > >>>>> equals() and then _also_ check the
> >> serialized bytes
> >> > > > > > >>> for equality.
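
One possible reading of the belt-and-suspenders idea above, sketched in Java: treat an update as a no-op if equals() says so, and otherwise fall back to comparing serialized bytes. Everything named here (BeltAndSuspendersCheck, Pair, the toBytes function) is hypothetical; in particular, a plain Function stands in for a real serializer, and the sketch serializes on demand for brevity, whereas the thread only proposes the byte check where the serialized form already exists.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical illustration; not a Kafka Streams class.
final class BeltAndSuspendersCheck {
    private BeltAndSuspendersCheck() {}

    // Example value type that does not override equals().
    static final class Pair {
        final String key;
        final int value;

        Pair(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // First try equals(); if that reports a difference, also compare the
    // serialized bytes, which catches classes without a custom equals().
    static <V> boolean isNoOp(V oldValue, V newValue, Function<V, byte[]> toBytes) {
        if (Objects.equals(oldValue, newValue)) {
            return true;
        }
        if (oldValue == null || newValue == null) {
            return false;
        }
        return Arrays.equals(toBytes.apply(oldValue), toBytes.apply(newValue));
    }

    public static void main(String[] args) {
        // Stand-in serializer (an assumption for this sketch).
        Function<Pair, byte[]> toBytes =
            p -> (p.key + ":" + p.value).getBytes(StandardCharsets.UTF_8);

        // equals() returns false here, but the bytes match, so this is a no-op.
        System.out.println(isNoOp(new Pair("A", 1), new Pair("A", 1), toBytes)); // true
        System.out.println(isNoOp(new Pair("A", 1), new Pair("A", 2), toBytes)); // false
    }
}
```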
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>> Thanks,
> >> > > > > > >>> > > > > > > >>>>> -John
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu
> >> wrote:
> >> > > > > > >>> > > > > > > >>>>>> Thanks for the comments, Matthias.
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > > > >>>>>> w.r.t. requirement of an `equals()`
> >> > > > > implementation,
> >> > > > > > >>> each template
> >> > > > > > >>> > > > > > > >> type
> >> > > > > > >>> > > > > > > >>>>>> would have an equals() method. We can
> >> use the
> >> > > > > > >>> following code to
> >> > > > > > >>> > > > > > know
> >> > > > > > >>> > > > > > > >>>>>> whether it is provided by JVM or
> >> provided by user.
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > > > >>>>>> boolean customEquals = false;
> >> > > > > > >>> > > > > > > >>>>>> try {
> >> > > > > > >>> > > > > > > >>>>>>     Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
> >> > > > > > >>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
> >> > > > > > >>> > > > > > > >>>>>>         customEquals = true;
> >> > > > > > >>> > > > > > > >>>>>>     }
> >> > > > > > >>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
> >> > > > > > >>> > > > > > > >>>>>>     // equals is always defined, this wouldn't hit
> >> > > > > > >>> > > > > > > >>>>>> }
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > > > >>>>>> The next question is: what if the user
> >> doesn't
> >> > > > > > >>> provide equals()
> >> > > > > > >>> > > > > > > >> method ?
> >> > > > > > >>> > > > > > > >>>>>> Would we automatically fall back to
> >> > > > > emit-on-update ?
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > > > >>>>>> Cheers
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM Matthias
> >> J. Sax <
> >> > > > > > >>> mj...@apache.org>
> >> > > > > > >>> > > > > > > >>>>> wrote:
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > > > > First a high level comment:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Overall, I would like to take one step back,
> >> and make
> >> > > > > sure
> >> > > > > > >>> we are
> >> > > > > > >>> > > > > > > > discussing this on the same level. Originally, I
> >> understood
> >> > > > > > >>> this KIP
> >> > > > > > >>> > > > > > > >>> as a
> >> > > > > > >>> > > > > > > > proposed change of _semantics_, however,
> >> given the
> >> > > > > latest
> >> > > > > > >>> > > > > > > >>> discussion
> >> > > > > > >>> > > > > > > > it seems it's actually not -- it's more an
> >> > > > > _optimization_
> >> > > > > > >>> > > > > > > >>> proposal.
> >> > > > > > >>> > > > > > > > Hence, we only need to make sure that this
> >> optimization
> >> > > > > > >>> does not
> >> > > > > > >>> > > > > > > >>> break
> >> > > > > > >>> > > > > > > > existing semantics. Is this the right way to
> >> think
> >> > > > > about
> >> > > > > > >>> it?
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > If yes, then it might actually be ok to have
> >> different
> >> > > > > > >>> behavior
> >> > > > > > >>> > > > > > > > depending on whether there is a materialized KTable
> >> or not. So
> >> > > > > > >>> far, we
> >> > > > > > >>> > > > > > > >>> never
> >> > > > > > >>> > > > > > > > defined a public contract about our emit
> >> strategy and
> >> > > > > it
> >> > > > > > >>> seems
> >> > > > > > >>> > > > > > > >>> this
> >> > > > > > >>> > > > > > > > KIP does not define one either.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Hence, I don't have as strong of an opinion
> >> about
> >> > > > > sending
> >> > > > > > >>> > > > > > > >>> oldValues
> >> > > > > > >>> > > > > > > > for example any longer. I guess the question
> >> is really,
> >> > > > > > >>> what can
> >> > > > > > >>> > > > > > > >>> we
> >> > > > > > >>> > > > > > > > implement in a reasonable way.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Other comments:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > @Richard:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Can you please add the KIP to the KIP
> >> overview table:
> >> > > > > It's
> >> > > > > > >>> missing
> >> > > > > > >>> > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > @Bruno:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > You mentioned caching. I think it's irrelevant
> >> > > > > > >>> (orthogonal) and
> >> > > > > > >>> > > > > > > >>> we can
> >> > > > > > >>> > > > > > > > discuss this KIP without considering it.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > @John:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >>>>>>>>> Even in the source table, we forward
> >> the
> >> > > > > updated
> >> > > > > > >>> record with
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>> higher of the two timestamps. So the
> >> example is
> >> > > > > > >>> more like:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > That is not correct. Currently, we forward
> >> with the
> >> > > > > smaller
> >> > > > > > >>> > > > > > > > out-of-order timestamp (changing the
> >> timestamp would
> >> > > > > > >>> corrupt the
> >> > > > > > >>> > > > > > > >>> data
> >> > > > > > >>> > > > > > > > -- we don't know, because we don't check, if
> >> the value
> >> > > > > is
> >> > > > > > >>> the
> >> > > > > > >>> > > > > > > >>> same
> >> > > > > > >>> > > > > > > >>>>>> or
> >> > > > > > >>> > > > > > > > a different one, hence, we must emit the
> >> out-of-order
> >> > > > > > >>> record
> >> > > > > > >>> > > > > > > >>> as-is).
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > If we start to do emit-on-change, we also
> >> need to emit
> >> > > > > a
> >> > > > > > >>> new
> >> > > > > > >>> > > > > > > >>> record if
> >> > > > > > >>> > > > > > > > the timestamp changes due to out-of-order
> >> data, hence,
> >> > > > > we
> >> > > > > > >>> would
> >> > > > > > >>> > > > > > > >>> still
> >> > > > > > >>> > > > > > > > need to emit <K,V,T1> because that gives us
> >> correct
> >> > > > > > >>> semantics:
> >> > > > > > >>> > > > > > > >>> assume
> >> > > > > > >>> > > > > > > > you have a filter() and afterward use the
> >> filter
> >> > > > > KTable in
> >> > > > > > >>> a
> >> > > > > > >>> > > > > > > > stream-table join -- the lower T1 timestamp
> >> must be
> >> > > > > > >>> propagated to
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > > filtered KTable to ensure that the
> >> stream-table
> >> > > > > join
> >> > > > > > >>> computes
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > > correct result.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Your point about requiring an `equals()`
> >> > > > > implementation is
> >> > > > > > >>> > > > > > > >>> actually a
> >> > > > > > >>> > > > > > > > quite interesting one and boils down to my
> >> statement
> >> > > > > from
> >> > > > > > >>> above
> >> > > > > > >>> > > > > > > >>> about
> >> > > > > > >>> > > > > > > > "what can we actually implement". What I don't
> >> > > > > understand
> >> > > > > > >>> is:
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >>>>>>>>> This way, we still don't have to rely
> >> on the
> >> > > > > > >>> existence of an
> >> > > > > > >>> > > > > > > >>>>>>>>> equals() method, but if it is there,
> >> we can
> >> > > > > > >>> benefit from it.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > Your bullet point (2) says it uses `equals()`
> >> --
> >> > > > > hence, it
> >> > > > > > >>> seems
> >> > > > > > >>> > > > > > > >>> we
> >> > > > > > >>> > > > > > > > actually do rely on it? Also, how can we
> >> detect if
> >> > > > > there
> >> > > > > > >>> is an
> >> > > > > > >>> > > > > > > > `equals()` method to do the comparison? Would
> >> we fail
> >> > > > > if
> >> > > > > > >>> we don't
> >> > > > > > >>> > > > > > > >>> have
> >> > > > > > >>> > > > > > > > `equals()` nor corresponding serializers to do
> >> the
> >> > > > > > >>> comparison?
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we
> >> absolutely need
> >> > > > > > >>> metrics and
> >> > > > > > >>> > > > > > > >>> logs if
> >> > > > > > >>> > > > > > > >>>>>>>>> we're going to drop any records. And,
> >> yes, we
> >> > > > > > >>> should propose
> >> > > > > > >>> > > > > > > >>>>>>>>> metrics and logs that are similar to
> >> the
> >> > > > > existing
> >> > > > > > >>> ones when we
> >> > > > > > >>> > > > > > > >>> drop
> >> > > > > > >>> > > > > > > >>>>>>>>> records for other reasons.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > I am not sure about this point. In fact, we
> >> have
> >> > > > > already
> >> > > > > > >>> some
> >> > > > > > >>> > > > > > > >>> no-ops
> >> > > > > > >>> > > > > > > > in Kafka Streams in our join-operators and
> >> don't report
> >> > > > > > >>> any of
> >> > > > > > >>> > > > > > > >>> those
> >> > > > > > >>> > > > > > > > either. Emit-on-change is operator semantics
> >> and I
> >> > > > > don't
> >> > > > > > >>> see why
> >> > > > > > >>> > > > > > > >>> we
> >> > > > > > >>> > > > > > > > would need to have a metric for it? It seems
> >> to be
> >> > > > > quite
> >> > > > > > >>> different
> >> > > > > > >>> > > > > > > > compared to dropping late or malformed
> >> records.
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > -Matthias
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
> >> > > > > > >>> > > > > > > >>>>>>>>> Thanks John for your thoughtful
> >> reply. Some
> >> > > > > > >>> comments inline.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600,
> >> John Roesler
> >> > > > > > >>> wrote:
> >> > > > > > >>> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This
> >> email was
> >> > > > > sent
> >> > > > > > >>> from outside
> >> > > > > > >>> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or
> >> attachments
> >> > > > > > >>> unless you
> >> > > > > > >>> > > > > > expected
> >> > > > > > >>> > > > > > > >>>>>>>>>> them.
> >> ________________________________
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> Hi Tommy,
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for the context. I can see the
> >> > > > > attraction
> >> > > > > > >>> of
> >> > > > > > >>> > > > > > considering
> >> > > > > > >>> > > > > > > >>>>>>>>>> these use cases together.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> To answer your question, if a part
> >> of the
> >> > > > > record
> >> > > > > > >>> is not
> >> > > > > > >>> > > > > > > >>> relevant
> >> > > > > > >>> > > > > > > >>>>>>>>>> to downstream consumers, I was
> >> thinking you
> >> > > > > could
> >> > > > > > >>> just use a
> >> > > > > > >>> > > > > > > >>>>>>>>>> mapValues to remove it.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a
> >> join between
> >> > > > > two
> >> > > > > > >>> tables.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> employeeInfo.join( employeePayroll,
> >> (info,
> >> > > > > > >>> payroll) -> new
> >> > > > > > >>> > > > > > > >>>>>>>>>> Result(info.name(),
> >> payroll.salary()) )
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> We only care about one attribute
> >> from the Info
> >> > > > > > >>> table (name),
> >> > > > > > >>> > > > > > > >>> and
> >> > > > > > >>> > > > > > > >>>>>>>>>> one from the Payroll table (salary),
> >> and these
> >> > > > > > >>> attributes
> >> > > > > > >>> > > > > > > >>> change
> >> > > > > > >>> > > > > > > >>>>>>>>>> rarely. On the other hand, there
> >> might be many
> >> > > > > > >>> other
> >> > > > > > >>> > > > > > attributes
> >> > > > > > >>> > > > > > > >>>>>>>>>> that change frequently of these
> >> tables. We can
> >> > > > > > >>> avoid
> >> > > > > > >>> > > > > > triggering
> >> > > > > > >>> > > > > > > >>>>>>>>>> the join unnecessarily by mapping
> >> the input
> >> > > > > > >>> tables to drop the
> >> > > > > > >>> > > > > > > >>>>>>>>>> unnecessary information before the
> >> join:
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> names = employeeInfo.mapValues(info
> >> ->
> >> > > > > info.name())
> >> > > > > > >>> salaries
> >> > > > > > >>> > > > > > =
> >> > > > > > >>> > > > > > > >>>>>>>>>> employeePayroll.mapValues(payroll ->
> >> > > > > > >>> payroll.salary())
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> names.join( salaries, (name, salary)
> >> -> new
> >> > > > > > >>> Result(name,
> >> > > > > > >>> > > > > > > >>> salary)
> >> > > > > > >>> > > > > > > >>>>>>>>>> )
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> Ahh yes I see. This works, but in the
> >> case
> >> > > > > where
> >> > > > > > >>> you're using
> >> > > > > > >>> > > > > > > >>>>>>>>> schemas as we are (e.g. Avro), it
> >> seems like
> >> > > > > this
> >> > > > > > >>> approach
> >> > > > > > >>> > > > > > could
> >> > > > > > >>> > > > > > > >>>>>>>>> lead to a proliferation of "skinny"
> >> record
> >> > > > > types
> >> > > > > > >>> that just drop
> >> > > > > > >>> > > > > > > >>>>>>>>> various fields.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> Especially if we take Matthias's
> >> idea to drop
> >> > > > > > >>> non-changes even
> >> > > > > > >>> > > > > > > >>>>>>>>>> for stateless operations, this would
> >> be quite
> >> > > > > > >>> efficient and is
> >> > > > > > >>> > > > > > > >>>>>>>>>> also a very straightforward
> >> optimization to
> >> > > > > > >>> understand once
> >> > > > > > >>> > > > > > you
> >> > > > > > >>> > > > > > > >>>>>>>>>> know that Streams provides
> >> emit-on-change.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> From the context that you provided,
> >> it seems
> >> > > > > like
> >> > > > > > >>> a slightly
> >> > > > > > >>> > > > > > > >>>>>>>>>> different situation, though. Reading
> >> between
> >> > > > > the
> >> > > > > > >>> lines a
> >> > > > > > >>> > > > > > > >>> little,
> >> > > > > > >>> > > > > > > >>>>>>>>>> it sounds like: in contrast to the
> >> example
> >> > > > > above,
> >> > > > > > >>> in which we
> >> > > > > > >>> > > > > > > >>> are
> >> > > > > > >>> > > > > > > >>>>>>>>>> filtering out extra _data_, you have
> >> some
> >> > > > > extra
> >> > > > > > >>> _metadata_
> >> > > > > > >>> > > > > > that
> >> > > > > > >>> > > > > > > >>>>>>>>>> you still wish to pass down with the
> >> data when
> >> > > > > > >>> there is a
> >> > > > > > >>> > > > > > > >>> "real"
> >> > > > > > >>> > > > > > > >>>>>>>>>> update, but you don't want the
> >> metadata
> >> > > > > itself to
> >> > > > > > >>> cause an
> >> > > > > > >>> > > > > > > >>>>>>>>>> update.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes
> >> you've got it
> >> > > > > > >>> right ;) This
> >> > > > > > >>> > > > > > > >>>>>>>>> particular processor is the first
> >> stop for this
> >> > > > > > >>> data after
> >> > > > > > >>> > > > > > > >>> coming
> >> > > > > > >>> > > > > > > >>>>>>>>> in from external users, who often
> >> simply post
> >> > > > > the
> >> > > > > > >>> same content
> >> > > > > > >>> > > > > > > >>> each
> >> > > > > > >>> > > > > > > >>>>>>>>> time and we're trying to shield
> >> downstream
> >> > > > > > >>> consumers from
> >> > > > > > >>> > > > > > > >>>>>>>>> unnecessary churn.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> It does seem handy to be able to
> >> plug in a
> >> > > > > custom
> >> > > > > > >>> > > > > > > >>> ChangeDetector
> >> > > > > > >>> > > > > > > >>>>>>>>>> for this purpose, but I worry about
> >> the API
> >> > > > > > >>> complexity. Maybe
> >> > > > > > >>> > > > > > > >>> you
> >> > > > > > >>> > > > > > > >>>>>>>>>> can help think through how to provide
> >> the same
> >> > > > > > >>> benefit while
> >> > > > > > >>> > > > > > > >>>>>>>>>> limiting user-facing complexity.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
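To make the pluggable-detector idea concrete, here is a minimal sketch of what such a hook might look like. Nothing here is an existing Kafka Streams API: the `ChangeDetector` interface shape, the `Post` record type, and its `content` / `postedAtMs` fields are all hypothetical, chosen only to illustrate "ignore metadata-only updates".

```java
import java.util.Objects;

// Hypothetical interface; Kafka Streams does not define this type.
interface ChangeDetector<V> {
    // Returns true if newValue should be treated as a real update over oldValue.
    boolean isChange(V oldValue, V newValue);
}

// Hypothetical record: semantic content plus a metadata field (post time).
final class Post {
    final String content;
    final long postedAtMs;

    Post(String content, long postedAtMs) {
        this.content = content;
        this.postedAtMs = postedAtMs;
    }
}

// Treats two posts as unchanged when only the metadata (postedAtMs) differs.
final class ContentOnlyDetector implements ChangeDetector<Post> {
    @Override
    public boolean isChange(Post oldValue, Post newValue) {
        if (oldValue == null || newValue == null) {
            // A first value or a delete counts as a change unless both are absent.
            return oldValue != newValue;
        }
        return !Objects.equals(oldValue.content, newValue.content);
    }
}
```

With this sketch, re-posting the same content with a newer `postedAtMs` would not count as a change, while any difference in `content` would. Where such a detector would be registered is exactly the open API question raised in the surrounding discussion.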
> >> > > > > > >>> > > > > > > >>>>>>>>>> Here's some extra context to
> >> consider:
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> We currently don't make any extra
> >> requirements
> >> > > > > > >>> about the
> >> > > > > > >>> > > > > > nature
> >> > > > > > >>> > > > > > > >>>>>>>>>> of data that you can use in Streams.
> >> For
> >> > > > > example,
> >> > > > > > >>> you don't
> >> > > > > > >>> > > > > > > >>> have
> >> > > > > > >>> > > > > > > >>>>>>>>>> to implement hashCode and equals, or
> >> > > > > compareTo,
> >> > > > > > >>> etc. With the
> >> > > > > > >>> > > > > > > >>>>>>>>>> current proposal, we can do an
> >> airtight
> >> > > > > > >>> comparison based only
> >> > > > > > >>> > > > > > > >>> on
> >> > > > > > >>> > > > > > > >>>>>>>>>> the serialized form of the values,
> >> and we
> >> > > > > > >>> actually don't have
> >> > > > > > >>> > > > > > > >>> to
> >> > > > > > >>> > > > > > > >>>>>>>>>> deserialize the "prior" value at all
> >> for a
> >> > > > > large
> >> > > > > > >>> number of
> >> > > > > > >>> > > > > > > >>>>>>>>>> operations. Admittedly, if we extend
> >> the
> >> > > > > proposal
> >> > > > > > >>> to include
> >> > > > > > >>> > > > > > > >>> no-op
> >> > > > > > >>> > > > > > > >>>>>>>>>> detection for stateless operations,
> >> we'd
> >> > > > > probably
> >> > > > > > >>> need to rely
> >> > > > > > >>> > > > > > > >>> on
> >> > > > > > >>> > > > > > > >>>>>>>>>> equals() for no-op checking,
> >> otherwise we'd
> >> > > > > wind
> >> > > > > > >>> up requiring
> >> > > > > > >>> > > > > > > >>>>>>>>>> serdes for stateless operations as
> >> well.
> >> > > > > > >>> Actually, I'd
> >> > > > > > >>> > > > > > probably
> >> > > > > > >>> > > > > > > >>>>>>>>>> argue for doing exactly that:
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop if
> >> the
> >> > > > > serialized
> >> > > > > > >>> byte[]s are
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > >>>>>>>>>> same. After deserializing, also drop
> >> if the
> >> > > > > > >>> objects are equal
> >> > > > > > >>> > > > > > > >>>>>>>>>> according to Object#equals().
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> 2. In stateless operations, compare
> >> the "new"
> >> > > > > and
> >> > > > > > >>> "old" values
> >> > > > > > >>> > > > > > > >>>>>>>>>> (if "old" is available) based on
> >> > > > > Object#equals().
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> 3. As a final optimization, after
> >> serializing
> >> > > > > and
> >> > > > > > >>> before
> >> > > > > > >>> > > > > > > >>> sending
> >> > > > > > >>> > > > > > > >>>>>>>>>> repartition records, compare the
> >> serialized
> >> > > > > data
> >> > > > > > >>> and drop
> >> > > > > > >>> > > > > > > >>>>>>>>>> no-ops.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
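The checks in points 1 and 2 above can be sketched roughly as follows. This is plain Java with no Kafka Streams dependency; the class and method names are hypothetical, and treating a missing (null) old or new value as "not a no-op" is an assumption of this sketch rather than something the proposal specifies.

```java
import java.util.Arrays;

// Hypothetical helper; not part of Kafka Streams.
final class NoOpCheck {
    // Byte-level check: serialized old and new values are identical.
    static boolean sameBytes(byte[] oldBytes, byte[] newBytes) {
        return oldBytes != null && newBytes != null && Arrays.equals(oldBytes, newBytes);
    }

    // Object-level check: relies on whatever equals() the value type provides.
    // With the default Object#equals this is reference equality, so types
    // without a custom equals() simply never match here.
    static boolean sameObject(Object oldValue, Object newValue) {
        return oldValue != null && newValue != null && oldValue.equals(newValue);
    }

    // An update is a no-op if either check says the value did not change.
    static boolean isNoOp(byte[] oldBytes, byte[] newBytes, Object oldValue, Object newValue) {
        return sameBytes(oldBytes, newBytes) || sameObject(oldValue, newValue);
    }
}
```

In a stateless operation only `sameObject` would apply, since no serialized form is available there; the byte comparison covers the stateful and repartition cases.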
> >> > > > > > >>> > > > > > > >>>>>>>>>> This way, we still don't have to
> >> rely on the
> >> > > > > > >>> existence of an
> >> > > > > > >>> > > > > > > >>>>>>>>>> equals() method, but if it is there,
> >> we can
> >> > > > > > >>> benefit from it.
> >> > > > > > >>> > > > > > > >>>>>>>>>> Also, we don't require a serde in
> >> any new
> >> > > > > > >>> situations, but we
> >> > > > > > >>> > > > > > > >>> can
> >> > > > > > >>> > > > > > > >>>>>>>>>> still leverage it when it is
> >> available.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> For clarity, in my example above,
> >> even if the
> >> > > > > > >>> employeeInfo and
> >> > > > > > >>> > > > > > > >>>>>>>>>> employeePayroll and Result records
> >> all have
> >> > > > > > >>> serdes, we need
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>>> "name" field (presumably String) and
> >> the
> >> > > > > "salary"
> >> > > > > > >>> field
> >> > > > > > >>> > > > > > > >>>>>>>>>> (presumably a Double) to have serdes
> >> as well
> >> > > > > in
> >> > > > > > >>> the naive
> >> > > > > > >>> > > > > > > >>>>>>>>>> implementation. But if we can
> >> leverage
> >> > > > > equals(),
> >> > > > > > >>> then the
> >> > > > > > >>> > > > > > > >>> "right
> >> > > > > > >>> > > > > > > >>>>>>>>>> thing" happens automatically.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> I still don't totally follow why the
> >> individual
> >> > > > > > >>> components
> >> > > > > > >>> > > > > > > >>> (name,
> >> > > > > > >>> > > > > > > >>>>>>>>> salary) would have to have serdes
> >> here. If
> >> > > > > Result
> >> > > > > > >>> has one, we
> >> > > > > > >>> > > > > > > >>>>>>>>> compare bytes, and if Result
> >> additionally has
> >> > > > > an
> >> > > > > > >>> equals()
> >> > > > > > >>> > > > > > method
> >> > > > > > >>> > > > > > > >>>>>>>>> (which presumably includes equals
> >> comparisons
> >> > > > > on
> >> > > > > > >>> the
> >> > > > > > >>> > > > > > constituent
> >> > > > > > >>> > > > > > > >>>>>>>>> fields), have we not covered our
> >> bases?
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> This dovetails in with my primary UX
> >> concern;
> >> > > > > > >>> where would the
> >> > > > > > >>> > > > > > > >>>>>>>>>> ChangeDetector actually be
> >> registered? None of
> >> > > > > > >>> the operators
> >> > > > > > >>> > > > > > in
> >> > > > > > >>> > > > > > > >>>>>>>>>> my example have names or topics or
> >> any other
> >> > > > > > >>> identifiable
> >> > > > > > >>> > > > > > > >>>>>>>>>> characteristic that could be passed
> >> to a
> >> > > > > > >>> ChangeDetector class
> >> > > > > > >>> > > > > > > >>>>>>>>>> registered via config. You could say
> >> that we
> >> > > > > make
> >> > > > > > >>> > > > > > > >>> ChangeDetector
> >> > > > > > >>> > > > > > > >>>>>>>>>> an optional parameter to every
> >> operation in
> >> > > > > > >>> Streams, but this
> >> > > > > > >>> > > > > > > >>>>>>>>>> seems to carry quite a bit of mental
> >> burden
> >> > > > > with
> >> > > > > > >>> it. People
> >> > > > > > >>> > > > > > > >>> will
> >> > > > > > >>> > > > > > > >>>>>>>>>> wonder what it's for and whether or
> >> not they
> >> > > > > > >>> should be using
> >> > > > > > >>> > > > > > > >>> it.
> >> > > > > > >>> > > > > > > >>>>>>>>>> There would almost certainly be a
> >> > > > > misconception
> >> > > > > > >>> that it's
> >> > > > > > >>> > > > > > > >>>>>>>>>> preferable to implement it always,
> >> which
> >> > > > > would be
> >> > > > > > >>> unfortunate.
> >> > > > > > >>> > > > > > > >>>>>>>>>> Plus, to actually implement metadata
> >> flowing
> >> > > > > > >>> through the
> >> > > > > > >>> > > > > > > >>> topology
> >> > > > > > >>> > > > > > > >>>>>>>>>> as in your use case, you'd have to
> >> do two
> >> > > > > things:
> >> > > > > > >>> 1. make sure
> >> > > > > > >>> > > > > > > >>>>>>>>>> that all operations actually
> >> preserve the
> >> > > > > > >>> metadata alongside
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > >>>>>>>>>> data (e.g., don't accidentally add a
> >> mapValues
> >> > > > > > >>> like I did, or
> >> > > > > > >>> > > > > > > >>> you
> >> > > > > > >>> > > > > > > >>>>>>>>>> drop the metadata). 2. implement a
> >> > > > > ChangeDetector
> >> > > > > > >>> for every
> >> > > > > > >>> > > > > > > >>>>>>>>>> single operation in the topology, or
> >> you don't
> >> > > > > > >>> get the benefit
> >> > > > > > >>> > > > > > > >>> of
> >> > > > > > >>> > > > > > > >>>>>>>>>> dropping non-changes internally 2b.
> >> > > > > > >>> Alternatively, you could
> >> > > > > > >>> > > > > > > >>> just
> >> > > > > > >>> > > > > > > >>>>>>>>>> add the ChangeDetector to one
> >> operation toward
> >> > > > > > >>> the end of the
> >> > > > > > >>> > > > > > > >>>>>>>>>> topology. This would not drop
> >> redundant
> >> > > > > > >>> computation
> >> > > > > > >>> > > > > > internally,
> >> > > > > > >>> > > > > > > >>>>>>>>>> but only drop redundant _outputs_.
> >> But this is
> >> > > > > > >>> just about the
> >> > > > > > >>> > > > > > > >>>>>>>>>> same as your current solution.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> I definitely see your point regarding
> >> > > > > > >>> configuration. I was
> >> > > > > > >>> > > > > > > >>>>>>>>> originally thinking about this when
> >> the
> >> > > > > > >>> deduplication was going
> >> > > > > > >>> > > > > > > >>> to
> >> > > > > > >>> > > > > > > >>>>>>>>> be opt-in, and it seemed very natural
> >> to say
> >> > > > > > >>> something like:
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> employeeInfo.join(employeePayroll,
> >> (info,
> >> > > > > payroll)
> >> > > > > > >>> -> new
> >> > > > > > >>> > > > > > > >>>>>>>>> Result(info.name(),
> >> payroll.salary()))
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> .suppress(duplicatesAccordingTo(someChangeDetector))
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> Alternatively you can imagine a
> >> similar method
> >> > > > > > >>> being on
> >> > > > > > >>> > > > > > > >>>>>>>>> Materialized, though obviously this
> >> makes less
> >> > > > > > >>> sense if we
> >> > > > > > >>> > > > > > don't
> >> > > > > > >>> > > > > > > >>>>>>>>> want to require materialization. If
> >> we're now
> >> > > > > > >>> talking about
> >> > > > > > >>> > > > > > > >>>>>>>>> changing the default behavior and not
> >> having
> >> > > > > any
> >> > > > > > >>> configuration
> >> > > > > > >>> > > > > > > >>>>>>>>> options, it's harder to find a place
> >> for this.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> A final thought; if it really is a
> >> metadata
> >> > > > > > >>> question, can we
> >> > > > > > >>> > > > > > > >>> just
> >> > > > > > >>> > > > > > > >>>>>>>>>> plan to finish up the support for
> >> headers in
> >> > > > > > >>> Streams? I.e.,
> >> > > > > > >>> > > > > > > >>> give
> >> > > > > > >>> > > > > > > >>>>>>>>>> you a way to control the way that
> >> headers flow
> >> > > > > > >>> through the
> >> > > > > > >>> > > > > > > >>>>>>>>>> topology? Then, we could treat
> >> headers the
> >> > > > > same
> >> > > > > > >>> way we treat
> >> > > > > > >>> > > > > > > >>>>>>>>>> timestamps in the no-op checking...
> >> We
> >> > > > > completely
> >> > > > > > >>> ignore them
> >> > > > > > >>> > > > > > > >>>>>>>>>> for the sake of comparison. Thus,
> >> neither the
> >> > > > > > >>> timestamp nor
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>>> headers would get updated in
> >> internal state
> >> > > > > or in
> >> > > > > > >>> downstream
> >> > > > > > >>> > > > > > > >>>>>>>>>> views as long as the value itself
> >> doesn't
> >> > > > > change.
> >> > > > > > >>> This seems
> >> > > > > > >>> > > > > > to
> >> > > > > > >>> > > > > > > >>>>>>>>>> give us a way to support your use
> >> case without
> >> > > > > > >>> adding to the
> >> > > > > > >>> > > > > > > >>>>>>>>>> mental overhead of using Streams for
> >> simple
> >> > > > > > >>> things.
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> Agree headers could be a decent fit
> >> for this
> >> > > > > > >>> particular case
> >> > > > > > >>> > > > > > > >>>>>>>>> because it's mostly metadata, though
> >> to be
> >> > > > > honest
> >> > > > > > >>> we haven't
> >> > > > > > >>> > > > > > > >>> looked
> >> > > > > > >>> > > > > > > >>>>>>>>> at headers much (mostly because, and
> >> to your
> >> > > > > > >>> point, support
> >> > > > > > >>> > > > > > > >>> seems
> >> > > > > > >>> > > > > > > >>>>>>>>> to be lacking). I feel like there
> >> would be
> >> > > > > other
> >> > > > > > >>> cases where
> >> > > > > > >>> > > > > > > >>> this
> >> > > > > > >>> > > > > > > >>>>>>>>> feature could be valuable, but I
> >> admit I can't
> >> > > > > > >>> come up with
> >> > > > > > >>> > > > > > > >>>>>>>>> anything right this second. Perhaps
> >> yuzhihong
> >> > > > > had
> >> > > > > > >>> an example in
> >> > > > > > >>> > > > > > > >>>>>>>>> mind?
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> I.e., simple things should be easy,
> >> and
> >> > > > > complex
> >> > > > > > >>> things should
> >> > > > > > >>> > > > > > > >>> be
> >> > > > > > >>> > > > > > > >>>>>>>>>> possible.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks, -John
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19,
> >> Thomas Becker
> >> > > > > > >>> wrote:
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> Hi John, Can you describe how you'd
> >> use
> >> > > > > > >>> filtering/mapping to
> >> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate records? To give some
> >> background
> >> > > > > on
> >> > > > > > >>> my suggestion
> >> > > > > > >>> > > > > > > >>> we
> >> > > > > > >>> > > > > > > >>>>>>>>>> currently have a small stream
> >> processor that
> >> > > > > > >>> exists solely to
> >> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate, which we do using a
> >> process that
> >> > > > > I
> >> > > > > > >>> assume would
> >> > > > > > >>> > > > > > be
> >> > > > > > >>> > > > > > > >>>>>>>>>> similar to what would be done here
> >> (with a
> >> > > > > store
> >> > > > > > >>> of keys and
> >> > > > > > >>> > > > > > > >>> hash
> >> > > > > > >>> > > > > > > >>>>>>>>>> values). But the records we are
> >> deduplicating
> >> > > > > > >>> have some
> >> > > > > > >>> > > > > > > >>> metadata
> >> > > > > > >>> > > > > > > >>>>>>>>>> fields (such as timestamps of when
> >> the record
> >> > > > > was
> >> > > > > > >>> posted) that
> >> > > > > > >>> > > > > > > >>> we
> >> > > > > > >>> > > > > > > >>>>>>>>>> don't consider semantically
> >> meaningful for
> >> > > > > > >>> downstream
> >> > > > > > >>> > > > > > > >>> consumers,
> >> > > > > > >>> > > > > > > >>>>>>>>>> and therefore we also suppress
> >> updates that
> >> > > > > only
> >> > > > > > >>> touch those
> >> > > > > > >>> > > > > > > >>>>>>>>>> fields.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> -Tommy
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30 -0600,
> >> John
> >> > > > > Roesler
> >> > > > > > >>> wrote:
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong, That’s an
> >> interesting
> >> > > > > > >>> idea. Can you
> >> > > > > > >>> > > > > > > >>> help
> >> > > > > > >>> > > > > > > >>>>>>>>>> think of a use case that isn’t also
> >> served by
> >> > > > > > >>> filtering or
> >> > > > > > >>> > > > > > > >>>>>>>>>> mapping beforehand? Thanks for
> >> helping to
> >> > > > > design
> >> > > > > > >>> this feature!
> >> > > > > > >>> > > > > > > >>>>>>>>>> -John On Fri, Jan 31, 2020, at 18:56,
> >> > > > > > >>> yuzhih...@gmail.com
> >> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:yuzhih...@gmail.com> wrote:
> >> I think
> >> > > > > this
> >> > > > > > >>> is a good
> >> > > > > > >>> > > > > > > >>> idea. On
> >> > > > > > >>> > > > > > > >>>>>>>>>> Jan 31, 2020, at 4:49 PM, Thomas
> >> Becker <
> >> > > > > > >>> > > > > > > >>> thomas.bec...@tivo.com
> >> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:thomas.bec...@tivo.com>>
> >> wrote: How
> >> > > > > do
> >> > > > > > >>> folks feel
> >> > > > > > >>> > > > > > > >>> about
> >> > > > > > >>> > > > > > > >>>>>>>>>> allowing the mechanism by which
> >> no-ops are
> >> > > > > > >>> detected to be
> >> > > > > > >>> > > > > > > >>>>>>>>>> pluggable? Meaning use something
> >> like a hash
> >> > > > > by
> >> > > > > > >>> default, but
> >> > > > > > >>> > > > > > > >>> you
> >> > > > > > >>> > > > > > > >>>>>>>>>> could optionally provide an
> >> implementation of
> >> > > > > > >>> something to use
> >> > > > > > >>> > > > > > > >>>>>>>>>> instead, like a ChangeDetector. This
> >> could be
> >> > > > > > >>> useful for
> >> > > > > > >>> > > > > > > >>> example
> >> > > > > > >>> > > > > > > >>>>>>>>>> to ignore changes to certain fields,
> >> which may
> >> > > > > > >>> not be relevant
> >> > > > > > >>> > > > > > > >>> to
> >> > > > > > >>> > > > > > > >>>>>>>>>> the operation being performed.
> >> > > > > > >>> > > > > > ________________________________
> >> > > > > > >>> > > > > > > >>>>>>>>>> From: John Roesler <
> >> vvcep...@apache.org
> >> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:vvcep...@apache.org>> Sent:
> >> Friday,
> >> > > > > > >>> January 31, 2020
> >> > > > > > >>> > > > > > > >>> 4:51
> >> > > > > > >>> > > > > > > >>>>>>>>>> PM To: dev@kafka.apache.org <mailto:
> >> > > > > > >>> dev@kafka.apache.org>
> >> > > > > > >>> > > > > > > >>>>>>>>>> <dev@kafka.apache.org <mailto:
> >> > > > > > >>> dev@kafka.apache.org>> Subject:
> >> > > > > > >>> > > > > > > >>> Re:
> >> > > > > > >>> > > > > > > >>>>>>>>>> [KAFKA-557] Add emit on change
> >> support for
> >> > > > > Kafka
> >> > > > > > >>> Streams
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> Hello all, Sorry for my silence. It
> >> seems
> >> > > > > like we
> >> > > > > > >>> are getting
> >> > > > > > >>> > > > > > > >>>>>>>>>> close to consensus. Hopefully, we
> >> could move
> >> > > > > to a
> >> > > > > > >>> vote soon!
> >> > > > > > >>> > > > > > > >>> All
> >> > > > > > >>> > > > > > > >>>>>>>>>> of the reasoning from Matthias and
> >> Bruno
> >> > > > > around
> >> > > > > > >>> timestamp is
> >> > > > > > >>> > > > > > > >>>>>>>>>> compelling. I would be strongly in
> >> favor of
> >> > > > > > >>> stating a few
> >> > > > > > >>> > > > > > > >>> things
> >> > > > > > >>> > > > > > > >>>>>>>>>> very clearly in the KIP: 1. Streams
> >> will drop
> >> > > > > > >>> no-op updates
> >> > > > > > >>> > > > > > > >>> only
> >> > > > > > >>> > > > > > > >>>>>>>>>> for KTable operations. That is, we
> >> won't make
> >> > > > > any
> >> > > > > > >>> changes to
> >> > > > > > >>> > > > > > > >>>>>>>>>> KStream aggregations at the moment.
> >> It does
> >> > > > > seem
> >> > > > > > >>> like we can
> >> > > > > > >>> > > > > > > >>>>>>>>>> potentially revisit the time
> >> semantics of that
> >> > > > > > >>> operation in
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>>> future, but we don't need to do it
> >> now. On the
> >> > > > > > >>> other hand, the
> >> > > > > > >>> > > > > > > >>>>>>>>>> proposed semantics for KTable
> >> timestamps
> >> > > > > (marking
> >> > > > > > >>> the
> >> > > > > > >>> > > > > > beginning
> >> > > > > > >>> > > > > > > >>>>>>>>>> of the validity of that record)
> >> makes sense to
> >> > > > > > >>> me. 2. Streams
> >> > > > > > >>> > > > > > > >>>>>>>>>> will only drop no-op updates for
> >> _stateful_
> >> > > > > > >>> KTable operations.
> >> > > > > > >>> > > > > > > >>> We
> >> > > > > > >>> > > > > > > >>>>>>>>>> don't want to add a hard guarantee
> >> that
> >> > > > > Streams
> >> > > > > > >>> will _never_
> >> > > > > > >>> > > > > > > >>>>>>>>>> emit a no-op table update because it
> >> would
> >> > > > > > >>> require adding
> >> > > > > > >>> > > > > > state
> >> > > > > > >>> > > > > > > >>>>>>>>>> to otherwise stateless operations.
> >> If someone
> >> > > > > is
> >> > > > > > >>> really
> >> > > > > > >>> > > > > > > >>> concerned
> >> > > > > > >>> > > > > > > >>>>>>>>>> about a particular stateless
> >> operation
> >> > > > > producing
> >> > > > > > >>> a lot of
> >> > > > > > >>> > > > > > no-op
> >> > > > > > >>> > > > > > > >>>>>>>>>> results, all they have to do is
> >> materialize
> >> > > > > it,
> >> > > > > > >>> and Streams
> >> > > > > > >>> > > > > > > >>> would
> >> > > > > > >>> > > > > > > >>>>>>>>>> automatically drop the no-ops.
> >> Additionally,
> >> > > > > I'm
> >> > > > > > >>> +1 on not
> >> > > > > > >>> > > > > > > >>> adding
> >> > > > > > >>> > > > > > > >>>>>>>>>> an opt-out at this time. Regarding
> >> the KIP
> >> > > > > > >>> itself, I would
> >> > > > > > >>> > > > > > > >>> clean
> >> > > > > > >>> > > > > > > >>>>>>>>>> it up a bit before calling for a
> >> vote. There
> >> > > > > is a
> >> > > > > > >>> lot of
> >> > > > > > >>> > > > > > > >>>>>>>>>> "discussion"-type language there,
> >> which is
> >> > > > > very
> >> > > > > > >>> natural to
> >> > > > > > >>> > > > > > > >>> read,
> >> > > > > > >>> > > > > > > >>>>>>>>>> but makes it a bit hard to see what
> >> _exactly_
> >> > > > > the
> >> > > > > > >>> KIP is
> >> > > > > > >>> > > > > > > >>>>>>>>>> proposing. Richard, would you mind
> >> just making
> >> > > > > > >>> the "proposed
> >> > > > > > >>> > > > > > > >>>>>>>>>> behavior change" a simple and
> >> succinct list of
> >> > > > > > >>> bullet points?
> >> > > > > > >>> > > > > > > >>>>>>>>>> I.e., please drop glue phrases like
> >> "there has
> >> > > > > > >>> been some
> >> > > > > > >>> > > > > > > >>>>>>>>>> discussion" or "possibly we could do
> >> X". For
> >> > > > > the
> >> > > > > > >>> final version
> >> > > > > > >>> > > > > > > >>> of
> >> > > > > > >>> > > > > > > >>>>>>>>>> the KIP, it should just say,
> >> "Streams will do
> >> > > > > X,
> >> > > > > > >>> Streams will
> >> > > > > > >>> > > > > > > >>> do
> >> > > > > > >>> > > > > > > >>>>>>>>>> Y". Feel free to add an elaboration
> >> section to
> >> > > > > > >>> explain more
> >> > > > > > >>> > > > > > > >>> about
> >> > > > > > >>> > > > > > > >>>>>>>>>> what X and Y mean, but we don't need
> >> to talk
> >> > > > > about
> >> > > > > > >>> > > > > > > >>> possibilities
> >> > > > > > >>> > > > > > > >>>>>>>>>> or alternatives except in the
> >> "rejected
> >> > > > > > >>> alternatives" section.
> >> > > > > > >>> > > > > > > >>>>>>>>>> Accordingly, can you also move the
> >> options you
> >> > > > > > >>> presented in
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>>> intro to the "rejected alternatives"
> >> section
> >> > > > > and
> >> > > > > > >>> only mention
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > >>>>>>>>>> final proposal itself? This just
> >> really helps
> >> > > > > > >>> reviewers to
> >> > > > > > >>> > > > > > know
> >> > > > > > >>> > > > > > > >>>>>>>>>> what they are voting for, and it
> >> helps
> >> > > > > everyone
> >> > > > > > >>> after the fact
> >> > > > > > >>> > > > > > > >>>>>>>>>> when they are trying to get clarity
> >> on what
> >> > > > > > >>> exactly the
> >> > > > > > >>> > > > > > > >>> proposal
> >> > > > > > >>> > > > > > > >>>>>>>>>> is, versus all the things it could
> >> have been.
> >> > > > > > >>> Thanks, -John
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14,
> >> Richard Yu
> >> > > > > wrote:
> >> > > > > > >>> Hello to
> >> > > > > > >>> > > > > > all,
> >> > > > > > >>> > > > > > > >>>>>>>>>> I've finished making some initial
> >> > > > > modifications
> >> > > > > > >>> to the KIP. I
> >> > > > > > >>> > > > > > > >>>>>>>>>> have decided to keep the
> >> implementation
> >> > > > > section
> >> > > > > > >>> in the KIP for
> >> > > > > > >>> > > > > > > >>>>>>>>>> record-keeping purposes. For now, we
> >> should
> >> > > > > focus
> >> > > > > > >>> on only the
> >> > > > > > >>> > > > > > > >>>>>>>>>> proposed behavior changes instead.
> >> See if you
> >> > > > > > >>> have any
> >> > > > > > >>> > > > > > > >>> comments!
> >> > > > > > >>> > > > > > > >>>>>>>>>> Cheers, Richard On Sat, Jan 25, 2020
> >> at 11:12
> >> > > > > AM
> >> > > > > > >>> Richard Yu
> >> > > > > > >>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com <mailto:
> >> > > > > > >>> > > > > > yohan.richard...@gmail.com
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> wrote: Hi all, Thanks for all the
> >> discussion!
> >> > > > > > >>> @John and @Bruno
> >> > > > > > >>> > > > > > > >>> I
> >> > > > > > >>> > > > > > > >>>>>>>>>> will survey other possible systems
> >> and see
> >> > > > > what I
> >> > > > > > >>> can do. Just
> >> > > > > > >>> > > > > > > >>> a
> >> > > > > > >>> > > > > > > >>>>>>>>>> question, by systems, I suppose you
> >> would mean
> >> > > > > > >>> the pros and
> >> > > > > > >>> > > > > > > >>> cons
> >> > > > > > >>> > > > > > > >>>>>>>>>> of different reporting strategies?
> >> I'm not
> >> > > > > > >>> completely certain
> >> > > > > > >>> > > > > > > >>> on
> >> > > > > > >>> > > > > > > >>>>>>>>>> this point, so it would be great if
> >> you can
> >> > > > > > >>> clarify on that.
> >> > > > > > >>> > > > > > So
> >> > > > > > >>> > > > > > > >>>>>>>>>> here's what I got from all the
> >> discussion so
> >> > > > > far:
> >> > > > > > >>> - Since both
> >> > > > > > >>> > > > > > > >>>>>>>>>> Matthias and John seem to have come
> >> to a
> >> > > > > > >>> consensus on this,
> >> > > > > > >>> > > > > > > >>> then
> >> > > > > > >>> > > > > > > >>>>>>>>>> we will go for an all-round
> >> behavioral change
> >> > > > > for
> >> > > > > > >>> KTables.
> >> > > > > > >>> > > > > > > >>> After
> >> > > > > > >>> > > > > > > >>>>>>>>>> some thought, I decided that for
> >> now, an
> >> > > > > opt-out
> >> > > > > > >>> config will
> >> > > > > > >>> > > > > > > >>> not
> >> > > > > > >>> > > > > > > >>>>>>>>>> be added. As John has pointed out,
> >> no-op
> >> > > > > changes
> >> > > > > > >>> tend to
> >> > > > > > >>> > > > > > > >>>>>>>>>> explode further down the topology as
> >> they are
> >> > > > > > >>> forwarded to
> >> > > > > > >>> > > > > > more
> >> > > > > > >>> > > > > > > >>>>>>>>>> and more processor nodes downstream.
> >> - About
> >> > > > > > >>> using hash codes,
> >> > > > > > >>> > > > > > > >>>>>>>>>> after some explanation from John, it
> >> looks
> >> > > > > like
> >> > > > > > >>> hash codes
> >> > > > > > >>> > > > > > > >>> might
> >> > > > > > >>> > > > > > > >>>>>>>>>> not be as ideal (for
> >> implementation). For
> >> > > > > now, we
> >> > > > > > >>> will omit
> >> > > > > > >>> > > > > > > >>> that
> >> > > > > > >>> > > > > > > >>>>>>>>>> detail, and save it for the PR. -
> >> @Bruno You
> >> > > > > do
> >> > > > > > >>> have valid
> >> > > > > > >>> > > > > > > >>>>>>>>>> concerns. Though, I am not
> >> completely certain
> >> > > > > if
> >> > > > > > >>> we want to do
> >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change only for materialized
> >> KTables.
> >> > > > > I
> >> > > > > > >>> will put it
> >> > > > > > >>> > > > > > > >>> down
> >> > > > > > >>> > > > > > > >>>>>>>>>> in the KIP regardless. I will do my
> >> best to
> >> > > > > > >>> address all points
> >> > > > > > >>> > > > > > > >>>>>>>>>> raised so far on the discussion.
> >> Hope we could
> >> > > > > > >>> keep this
> >> > > > > > >>> > > > > > going!
> >> > > > > > >>> > > > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24, 2020
> >> at 6:07 PM
> >> > > > > > >>> Bruno Cadonna
> >> > > > > > >>> > > > > > > >>>>>>>>>> <br...@confluent.io <mailto:
> >> > > > > br...@confluent.io>>
> >> > > > > > >>> wrote: Thank
> >> > > > > > >>> > > > > > > >>> you
> >> > > > > > >>> > > > > > > >>>>>>>>>> Matthias for the use cases! Looking
> >> at both
> >> > > > > use
> >> > > > > > >>> cases, I think
> >> > > > > > >>> > > > > > > >>>>>>>>>> you need to elaborate on them in the
> >> KIP,
> >> > > > > > >>> Richard. Emit from
> >> > > > > > >>> > > > > > > >>>>>>>>>> plain KTable: I agree with Matthias
> >> that the
> >> > > > > > >>> lower timestamp
> >> > > > > > >>> > > > > > > >>>>>>>>>> makes sense because it marks the
> >> start of the
> >> > > > > > >>> validity of the
> >> > > > > > >>> > > > > > > >>>>>>>>>> record. Idempotent records with a
> >> higher
> >> > > > > > >>> timestamp can be
> >> > > > > > >>> > > > > > > >>> safely
> >> > > > > > >>> > > > > > > >>>>>>>>>> ignored. A corner case that I
> >> discussed with
> >> > > > > > >>> Matthias offline
> >> > > > > > >>> > > > > > > >>> is
> >> > > > > > >>> > > > > > > >>>>>>>>>> when we do not materialize a KTable
> >> due to
> >> > > > > > >>> optimization. Then
> >> > > > > > >>> > > > > > > >>> we
> >> > > > > > >>> > > > > > > >>>>>>>>>> cannot avoid the idempotent records
> >> because
> >> > > > > we do
> >> > > > > > >>> not keep the
> >> > > > > > >>> > > > > > > >>>>>>>>>> first record with the lower
> >> timestamp to
> >> > > > > compare
> >> > > > > > >>> to. Emit from
> >> > > > > > >>> > > > > > > >>>>>>>>>> KTable with aggregations: If we
> >> specify that
> >> > > > > an
> >> > > > > > >>> aggregation
> >> > > > > > >>> > > > > > > >>>>>>>>>> result should have the highest
> >> timestamp of
> >> > > > > the
> >> > > > > > >>> records that
> >> > > > > > >>> > > > > > > >>>>>>>>>> participated in the aggregation, we
> >> cannot
> >> > > > > ignore
> >> > > > > > >>> any
> >> > > > > > >>> > > > > > > >>> idempotent
> >> > > > > > >>> > > > > > > >>>>>>>>>> records. Admittedly, the result of an
> >> > > > > aggregation
> >> > > > > > >>> usually
> >> > > > > > >>> > > > > > > >>>>>>>>>> changes, but there are aggregations
> >> where the
> >> > > > > > >>> result may not
> >> > > > > > >>> > > > > > > >>>>>>>>>> change like min and max, or sum when
> >> the
> >> > > > > incoming
> >> > > > > > >>> records have
> >> > > > > > >>> > > > > > > >>> a
> >> > > > > > >>> > > > > > > >>>>>>>>>> value of zero. In those cases, we
> >> could
> >> > > > > benefit
> >> > > > > > >>> from the emit on
> >> > > > > > >>> > > > > > > >>>>>>>>>> change, but only if we define the
> >> semantics
> >> > > > > of the
> >> > > > > > >>> > > > > > aggregations
> >> > > > > > >>> > > > > > > >>>>>>>>>> to not use the highest timestamp of
> >> the
> >> > > > > > >>> participating records
> >> > > > > > >>> > > > > > > >>> for
> >> > > > > > >>> > > > > > > >>>>>>>>>> the result. In Kafka Streams, we do
> >> not have
> >> > > > > min,
> >> > > > > > >>> max, and sum
> >> > > > > > >>> > > > > > > >>> as
> >> > > > > > >>> > > > > > > >>>>>>>>>> explicit aggregations, but we need
> >> to provide
> >> > > > > an
> >> > > > > > >>> API to define
> >> > > > > > >>> > > > > > > >>>>>>>>>> what timestamp should be used for
> >> the result
> >> > > > > of
> >> > > > > > >>> an aggregation
> >> > > > > > >>> > > > > > > >>> if
> >> > > > > > >>> > > > > > > >>>>>>>>>> we want to go down this path. All of
> >> this does
> >> > > > > > >>> not block this
> >> > > > > > >>> > > > > > > >>> KIP
> >> > > > > > >>> > > > > > > >>>>>>>>>> and I just wanted to put these
> >> aspects up for
> >> > > > > > >>> discussion. The
> >> > > > > > >>> > > > > > > >>> KIP
> >> > > > > > >>> > > > > > > >>>>>>>>>> can limit itself to emit from
> >> materialized
> >> > > > > > >>> KTables. However,
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > >>>>>>>>>> limits should be explicitly stated
> >> in the KIP.
> >> > > > > > >>> Best, Bruno
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM
> >> Matthias J.
> >> > > > > Sax
> >> > > > > > >>> > > > > > > >>>>>>>>>> <matth...@confluent.io <mailto:
> >> > > > > > >>> matth...@confluent.io>> wrote:
> >> > > > > > >>> > > > > > > >>>>>>>>>> IMHO, the question about semantics
> >> depends on
> >> > > > > the
> >> > > > > > >>> use case, in
> >> > > > > > >>> > > > > > > >>>>>>>>>> particular on the origin of a
> >> KTable. If
> >> > > > > there is
> >> > > > > > >>> a changelog
> >> > > > > > >>> > > > > > > >>>>>>>>>> topic that one reads directly into a
> >> KTable,
> >> > > > > > >>> emit-on-change
> >> > > > > > >>> > > > > > > >>> does
> >> > > > > > >>> > > > > > > >>>>>>>>>> actually make sense, because the
> >> timestamp
> >> > > > > > >>> indicates _when_
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>>> update was _effective_. For this
> >> case, it is
> >> > > > > > >>> semantically
> >> > > > > > >>> > > > > > sound
> >> > > > > > >>> > > > > > > >>>>>>>>>> to _not_ update the timestamp in the
> >> store,
> >> > > > > > >>> because the second
> >> > > > > > >>> > > > > > > >>>>>>>>>> update is actually idempotent and
> >> advancing
> >> > > > > the
> >> > > > > > >>> timestamp is
> >> > > > > > >>> > > > > > > >>> not
> >> > > > > > >>> > > > > > > >>>>>>>>>> ideal (one could even consider it to
> >> be wrong
> >> > > > > to
> >> > > > > > >>> advance the
> >> > > > > > >>> > > > > > > >>>>>>>>>> timestamp) because the "valid time"
> >> of the
> >> > > > > record
> >> > > > > > >>> pair did not
> >> > > > > > >>> > > > > > > >>>>>>>>>> change. This reasoning also applies
> >> to
> >> > > > > > >>> KTable-KTable joins.
> >> > > > > > >>> > > > > > > >>>>>>>>>> However, if the KTable is the result
> >> of an
> >> > > > > > >>> aggregation, I
> >> > > > > > >>> > > > > > think
> >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-update is more natural,
> >> because the
> >> > > > > > >>> timestamp reflects
> >> > > > > > >>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest
> >> timestamp) of all
> >> > > > > > >>> input records
> >> > > > > > >>> > > > > > > >>> that
> >> > > > > > >>> > > > > > > >>>>>>>>>> contributed to the result. Hence,
> >> updating the
> >> > > > > > >>> timestamp and
> >> > > > > > >>> > > > > > > >>>>>>>>>> emitting a new record actually
> >> sounds correct
> >> > > > > to
> >> > > > > > >>> me. This
> >> > > > > > >>> > > > > > > >>> applies
> >> > > > > > >>> > > > > > > >>>>>>>>>> to windowed and non-windowed
> >> aggregations
> >> > > > > IMHO.
> >> > > > > > >>> However,
> >> > > > > > >>> > > > > > > >>>>>>>>>> considering the argument that the
> >> timestamp
> >> > > > > > >>> should not be
> >> > > > > > >>> > > > > > > >>> updated
> >> > > > > > >>> > > > > > > >>>>>>>>>> in the first case in the store to
> >> begin with,
> >> > > > > > >>> both cases are
> >> > > > > > >>> > > > > > > >>>>>>>>>> actually the same, and both can be
> >> modeled as
> >> > > > > > >>> emit-on-change:
> >> > > > > > >>> > > > > > > >>> if
> >> > > > > > >>> > > > > > > >>>>>>>>>> a `table()` operator does not update
> >> the
> >> > > > > > >>> timestamp if the
> >> > > > > > >>> > > > > > value
> >> > > > > > >>> > > > > > > >>>>>>>>>> does not change, there is _no_
> >> change and thus
> >> > > > > > >>> nothing is
> >> > > > > > >>> > > > > > > >>>>>>>>>> emitted. At the same time, if an
> >> aggregation
> >> > > > > > >>> operator does
> >> > > > > > >>> > > > > > > >>> update
> >> > > > > > >>> > > > > > > >>>>>>>>>> the timestamp (even if the value
> >> does not
> >> > > > > change)
> >> > > > > > >>> there _is_ a
> >> > > > > > >>> > > > > > > >>>>>>>>>> change and we emit. Note that
> >> handling
> >> > > > > > >>> out-of-order data for
> >> > > > > > >>> > > > > > > >>>>>>>>>> aggregations would also work
> >> seamlessly with
> >> > > > > this
> >> > > > > > >>> approach --
> >> > > > > > >>> > > > > > > >>> for
> >> > > > > > >>> > > > > > > >>>>>>>>>> out-of-order records, the timestamp
> >> never
> >> > > > > > >>> changes, and
> >> > > > > > >>> > > > > > > >>> thus,
> >> > > > > > >>> > > > > > > >>>>>>>>>> we only emit if the result itself
> >> changes.
> >> > > > > > >>> Therefore, I would
> >> > > > > > >>> > > > > > > >>>>>>>>>> argue that we might not even need
> >> any config,
> >> > > > > > >>> because the
> >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change behavior is just
> >> correct and
> >> > > > > > >>> reduces the
> >> > > > > > >>> > > > > > > >>>>>>>>>> downstream load, while our current
> >> behavior is
> >> > > > > > >>> not ideal (even
> >> > > > > > >>> > > > > > > >>> if
> >> > > > > > >>> > > > > > > >>>>>>>>>> it's also correct). Thoughts?
> >> -Matthias On
> >> > > > > > >>> 1/24/20 9:37 AM,
> >> > > > > > >>> > > > > > > >>> John
> >> > > > > > >>> > > > > > > >>>>>>>>>> Roesler wrote: Hi Bruno, Thanks for
> >> that
> >> > > > > idea. I
> >> > > > > > >>> hadn't
> >> > > > > > >>> > > > > > > >>>>>>>>>> considered that option before, and
> >> it does
> >> > > > > seem
> >> > > > > > >>> like that
> >> > > > > > >>> > > > > > would
> >> > > > > > >>> > > > > > > >>>>>>>>>> be the right place to put it if we
> >> think it
> >> > > > > might
> >> > > > > > >>> be
> >> > > > > > >>> > > > > > > >>> semantically
> >> > > > > > >>> > > > > > > >>>>>>>>>> important to control on a
> >> table-by-table
> >> > > > > basis. I
> >> > > > > > >>> had been
> >> > > > > > >>> > > > > > > >>>>>>>>>> thinking of it less semantically and
> >> more
> >> > > > > > >>> practically. In the
> >> > > > > > >>> > > > > > > >>>>>>>>>> context of a large topology, or more
> >> > > > > generally, a
> >> > > > > > >>> large
> >> > > > > > >>> > > > > > > >>> software
> >> > > > > > >>> > > > > > > >>>>>>>>>> system that contains many topologies
> >> and other
> >> > > > > > >>> event-driven
> >> > > > > > >>> > > > > > > >>>>>>>>>> systems, each no-op result becomes
> >> an input
> >> > > > > that
> >> > > > > > >>> is destined
> >> > > > > > >>> > > > > > to
> >> > > > > > >>> > > > > > > >>>>>>>>>> itself become a no-op result, and so
> >> on, all
> >> > > > > the
> >> > > > > > >>> way through
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > >>>>>>>>>> system. Thus, a single pointless
> >> processing
> >> > > > > > >>> result becomes
> >> > > > > > >>> > > > > > > >>>>>>>>>> amplified into a large number of
> >> pointless
> >> > > > > > >>> computations, cache
> >> > > > > > >>> > > > > > > >>>>>>>>>> perturbations, and network and disk
> >> I/O
> >> > > > > > >>> operations. If you
> >> > > > > > >>> > > > > > also
> >> > > > > > >>> > > > > > > >>>>>>>>>> consider operations with fan-out
> >> implications,
> >> > > > > > >>> like branching
> >> > > > > > >>> > > > > > > >>> or
> >> > > > > > >>> > > > > > > >>>>>>>>>> foreign-key joins, the wasted
> >> resources are
> >> > > > > > >>> amplified not just
> >> > > > > > >>> > > > > > > >>> in
> >> > > > > > >>> > > > > > > >>>>>>>>>> proportion to the size of the
> >> system, but the
> >> > > > > > >>> size of the
> >> > > > > > >>> > > > > > > >>> system
> >> > > > > > >>> > > > > > > >>>>>>>>>> times the average fan-out (to the
> >> power of the
> >> > > > > > >>> number of
> >> > > > > > >>> > > > > > > >>> fan-out
> >> > > > > > >>> > > > > > > >>>>>>>>>> operations on the path(s) through the
> >> > > > > system). In
> >> > > > > > >>> my time
> >> > > > > > >>> > > > > > > >>>>>>>>>> operating such systems, I've
> >> observed these
> >> > > > > > >>> effects to be very
> >> > > > > > >>> > > > > > > >>>>>>>>>> real, and actually, the system and
> >> use case
> >> > > > > > >>> doesn't have to be
> >> > > > > > >>> > > > > > > >>>>>>>>>> very large before the amplification
> >> poses an
> >> > > > > > >>> existential
> >> > > > > > >>> > > > > > threat
> >> > > > > > >>> > > > > > > >>>>>>>>>> to the system as a whole. This is
> >> the basis
> >> > > > > of my
> >> > > > > > >>> advocating
> >> > > > > > >>> > > > > > > >>> for
> >> > > > > > >>> > > > > > > >>>>>>>>>> a simple behavior change, rather
> >> than an
> >> > > > > opt-in
> >> > > > > > >>> config of any
> >> > > > > > >>> > > > > > > >>>>>>>>>> kind. It seems like Streams should
> >> "do the
> >> > > > > right
> >> > > > > > >>> thing" for
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>>> majority use case. My theory (which
> >> may be
> >> > > > > wrong)
> >> > > > > > >>> is that the
> >> > > > > > >>> > > > > > > >>>>>>>>>> majority use case is more like
> >> "relational
> >> > > > > > >>> queries" than "CEP
> >> > > > > > >>> > > > > > > >>>>>>>>>> queries". Even if you were doing some
> >> > > > > > >>> event-sensitive
> >> > > > > > >>> > > > > > > >>>>>>>>>> computation, wouldn't you do them as
> >> Stream
> >> > > > > > >>> operations (where
> >> > > > > > >>> > > > > > > >>>>>>>>>> this feature is inapplicable
> >> anyway)? In
> >> > > > > keeping
> >> > > > > > >>> with the
> >> > > > > > >>> > > > > > > >>>>>>>>>> "practical" perspective, I suggested
> >> the
> >> > > > > opt-out
> >> > > > > > >>> config only
> >> > > > > > >>> > > > > > in
> >> > > > > > >>> > > > > > > >>>>>>>>>> the (I think unlikely) event that
> >> filtering
> >> > > > > out
> >> > > > > > >>> pointless
> >> > > > > > >>> > > > > > > >>> updates
> >> > > > > > >>> > > > > > > >>>>>>>>>> actually harms performance. I'd also
> >> be
> >> > > > > perfectly
> >> > > > > > >>> fine without
> >> > > > > > >>> > > > > > > >>>>>>>>>> the opt-out config. I really think
> >> that
> >> > > > > (because
> >> > > > > > >>> of the
> >> > > > > > >>> > > > > > > >>> timestamp
> >> > > > > > >>> > > > > > > >>>>>>>>>> semantics work already underway),
> >> we're
> >> > > > > already
> >> > > > > > >>> pre-fetching
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > >>>>>>>>>> prior result most of the time, so
> >> there would
> >> > > > > > >>> actually be very
> >> > > > > > >>> > > > > > > >>>>>>>>>> little extra I/O involved in
> >> implementing
> >> > > > > > >>> emit-on-change.
> >> > > > > > >>> > > > > > > >>>>>>>>>> However, we should consider whether
> >> my
> >> > > > > experience
> >> > > > > > >>> is likely to
> >> > > > > > >>> > > > > > > >>>>>>>>>> be general. Do you have some use
> >> case in mind
> >> > > > > for
> >> > > > > > >>> which you'd
> >> > > > > > >>> > > > > > > >>>>>>>>>> actually want some KTable results to
> >> be
> >> > > > > > >>> emit-on-update for
> >> > > > > > >>> > > > > > > >>>>>>>>>> semantic reasons? Thanks, -John
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02,
> >> Bruno Cadonna
> >> > > > > > >>> wrote: Hi
> >> > > > > > >>> > > > > > > >>> Richard,
> >> > > > > > >>> > > > > > > >>>>>>>>>> Thank you for the KIP. I agree with
> >> John that
> >> > > > > we
> >> > > > > > >>> should focus
> >> > > > > > >>> > > > > > > >>> on
> >> > > > > > >>> > > > > > > >>>>>>>>>> the interface and behavior change in
> >> a KIP. We
> >> > > > > > >>> can discuss the
> >> > > > > > >>> > > > > > > >>>>>>>>>> implementation later. I am also +1
> >> for the
> >> > > > > > >>> survey. I had a
> >> > > > > > >>> > > > > > > >>>>>>>>>> thought about this. Couldn't we
> >> consider
> >> > > > > > >>> emit-on-change to be
> >> > > > > > >>> > > > > > > >>> one
> >> > > > > > >>> > > > > > > >>>>>>>>>> config of suppress (like
> >> `untilWindowCloses`)?
> >> > > > > > >>> What you
> >> > > > > > >>> > > > > > > >>>>>>>>>> basically propose is to suppress
> >> updates if
> >> > > > > they
> >> > > > > > >>> do not change
> >> > > > > > >>> > > > > > > >>>>>>>>>> the result. Considering emit on
> >> change as a
> >> > > > > > >>> flavour of
> >> > > > > > >>> > > > > > suppress
> >> > > > > > >>> > > > > > > >>>>>>>>>> would be more flexible because it
> >> would
> >> > > > > specify
> >> > > > > > >>> the behavior
> >> > > > > > >>> > > > > > > >>>>>>>>>> locally for a KTable instead of
> >> globally for
> >> > > > > all
> >> > > > > > >>> KTables.
> >> > > > > > >>> > > > > > > >>>>>>>>>> Additionally, specifying the
> >> behavior in one
> >> > > > > > >>> place instead of
> >> > > > > > >>> > > > > > > >>>>>>>>>> multiple places feels more intuitive
> >> and
> >> > > > > > >>> consistent to me.
> >> > > > > > >>> > > > > > > >>> Best,
> >> > > > > > >>> > > > > > > >>>>>>>>>> Bruno On Fri, Jan 24, 2020 at 7:49
> >> AM John
> >> > > > > Roesler
> >> > > > > > >>> > > > > > > >>>>>>>>>> <vvcep...@apache.org <mailto:
> >> > > > > vvcep...@apache.org>>
> >> > > > > > >>> wrote: Hi
> >> > > > > > >>> > > > > > > >>>>>>>>>> Richard, Thanks for picking this up!
> >> I know
> >> > > > > of at
> >> > > > > > >>> least one
> >> > > > > > >>> > > > > > > >>> large
> >> > > > > > >>> > > > > > > >>>>>>>>>> community member for which this
> >> feature is
> >> > > > > > >>> absolutely
> >> > > > > > >>> > > > > > > >>> essential.
> >> > > > > > >>> > > > > > > >>>>>>>>>> If I understand your two options, it
> >> seems
> >> > > > > like
> >> > > > > > >>> the proposal
> >> > > > > > >>> > > > > > is
> >> > > > > > >>> > > > > > > >>>>>>>>>> to implement it as a behavior change
> >> > > > > regardless,
> >> > > > > > >>> and the
> >> > > > > > >>> > > > > > > >>> question
> >> > > > > > >>> > > > > > > >>>>>>>>>> is whether to provide an opt-out
> >> config or
> >> > > > > not.
> >> > > > > > >>> Given that any
> >> > > > > > >>> > > > > > > >>>>>>>>>> implementation of this feature would
> >> have some
> >> > > > > > >>> performance
> >> > > > > > >>> > > > > > > >>> impact
> >> > > > > > >>> > > > > > > >>>>>>>>>> under some workloads, and also that
> >> we don't
> >> > > > > know
> >> > > > > > >>> if anyone
> >> > > > > > >>> > > > > > > >>>>>>>>>> really depends on emit-on-update time
> >> > > > > semantics,
> >> > > > > > >>> it seems like
> >> > > > > > >>> > > > > > > >>> we
> >> > > > > > >>> > > > > > > >>>>>>>>>> should propose to add an opt-out
> >> config. Can
> >> > > > > you
> >> > > > > > >>> update the
> >> > > > > > >>> > > > > > KIP
> >> > > > > > >>> > > > > > > >>>>>>>>>> to mention the exact config key and
> >> value(s)
> >> > > > > > >>> you'd propose?
> >> > > > > > >>> > > > > > > >>> Just
> >> > > > > > >>> > > > > > > >>>>>>>>>> to move the discussion forward, maybe
> >> > > > > something
> >> > > > > > >>> like: emit.on
> >> > > > > > >>> > > > > > > >>> :=
> >> > > > > > >>> > > > > > > >>>>>>>>>> change|update with the new default
> >> being
> >> > > > > "change"
> >> > > > > > >>> Thanks for
> >> > > > > > >>> > > > > > > >>>>>>>>>> pointing out the timestamp issue in
> >> > > > > particular. I
> >> > > > > > >>> agree that
> >> > > > > > >>> > > > > > if
> >> > > > > > >>> > > > > > > >>>>>>>>>> we discard the latter update as a
> >> no-op, then
> >> > > > > we
> >> > > > > > >>> also have to
> >> > > > > > >>> > > > > > > >>>>>>>>>> discard its timestamp (obviously, we
> >> don't
> >> > > > > > >>> forward the
> >> > > > > > >>> > > > > > > >>> timestamp
> >> > > > > > >>> > > > > > > >>>>>>>>>> update, as that's the whole point,
> >> but we also
> >> > > > > > >>> can't update
> >> > > > > > >>> > > > > > the
> >> > > > > > >>> > > > > > > >>>>>>>>>> timestamp in the store, as the store
> >> must
> >> > > > > remain
> >> > > > > > >>> consistent
> >> > > > > > >>> > > > > > > >>> with
> >> > > > > > >>> > > > > > > >>>>>>>>>> what has been emitted). I have to
> >> confess
> >> > > > > that I
> >> > > > > > >>> disagree with
> >> > > > > > >>> > > > > > > >>>>>>>>>> your implementation proposal, but
> >> it's also
> >> > > > > not
> >> > > > > > >>> necessary to
> >> > > > > > >>> > > > > > > >>>>>>>>>> discuss implementation in the KIP.
> >> Maybe it
> >> > > > > would
> >> > > > > > >>> be less
> >> > > > > > >>> > > > > > > >>>>>>>>>> controversial if you just drop that
> >> section
> >> > > > > for
> >> > > > > > >>> now, so that
> >> > > > > > >>> > > > > > > >>> the
> >> > > > > > >>> > > > > > > >>>>>>>>>> KIP discussion can focus on the
> >> behavior
> >> > > > > change
> >> > > > > > >>> and config.
> >> > > > > > >>> > > > > > > >>> Just
> >> > > > > > >>> > > > > > > >>>>>>>>>> for reference, there is some
> >> research into
> >> > > > > this
> >> > > > > > >>> domain. For
> >> > > > > > >>> > > > > > > >>>>>>>>>> example, see the "Report" section
> >> (3.2.3) of
> >> > > > > the
> >> > > > > > >>> SECRET paper:
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > > It might help to round out the proposal if
> >> you take a
> >> > > > > brief
> >> > > > > > >>> > > > > > > >>> survey of
> >> > > > > > >>> > > > > > > >>>>>>>>>> the behaviors of other systems,
> >> along with
> >> > > > > pros
> >> > > > > > >>> and cons if
> >> > > > > > >>> > > > > > any
> >> > > > > > >>> > > > > > > >>>>>>>>>> are reported. Thanks, -John
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27,
> >> Richard Yu
> >> > > > > wrote:
> >> > > > > > >>> Hi
> >> > > > > > >>> > > > > > everybody!
> >> > > > > > >>> > > > > > > >>>>>>>>>> I'd like to propose a change that we
> >> probably
> >> > > > > > >>> should've added
> >> > > > > > >>> > > > > > > >>> for
> >> > > > > > >>> > > > > > > >>>>>>>>>> a long time now. The key benefit of
> >> this KIP
> >> > > > > > >>> would be reduced
> >> > > > > > >>> > > > > > > >>>>>>>>>> traffic in Kafka Streams since a lot
> >> of no-op
> >> > > > > > >>> results would no
> >> > > > > > >>> > > > > > > >>>>>>>>>> longer be sent downstream. Here is
> >> the KIP for
> >> > > > > > >>> reference.
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > > Currently, I seek to formalize our approach
> >> for this
> >> > > > > KIP
> >> > > > > > >>> first
> >> > > > > > >>> > > > > > > >>> before
> >> > > > > > >>> > > > > > > >>>>>>>>>> we determine concrete API additions /
> >> > > > > > >>> configurations. Some
> >> > > > > > >>> > > > > > > >>>>>>>>>> configs might warrant adding, while
> >> others
> >> > > > > are
> >> > > > > > >>> not necessary
> >> > > > > > >>> > > > > > > >>>>>>>>>> since adding them would only increase
> >> > > > > complexity
> >> > > > > > >>> of Kafka
> >> > > > > > >>> > > > > > > >>>>>>>>>> Streams. Cheers, Richard
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> ________________________________
> >> This email
> >> > > > > and
> >> > > > > > >>> any
> >> > > > > > >>> > > > > > attachments
> >> > > > > > >>> > > > > > > >>>>>>>>>> may contain confidential and
> >> privileged
> >> > > > > material
> >> > > > > > >>> for the sole
> >> > > > > > >>> > > > > > > >>> use
> >> > > > > > >>> > > > > > > >>>>>>>>>> of the intended recipient. Any
> >> review,
> >> > > > > copying, or
> >> > > > > > >>> > > > > > distribution
> >> > > > > > >>> > > > > > > >>>>>>>>>> of this email (or any attachments)
> >> by others
> >> > > > > is
> >> > > > > > >>> prohibited. If
> >> > > > > > >>> > > > > > > >>>>>>>>>> you are not the intended recipient,
> >> please
> >> > > > > > >>> contact the sender
> >> > > > > > >>> > > > > > > >>>>>>>>>> immediately and permanently delete
> >> this email
> >> > > > > and
> >> > > > > > >>> any
> >> > > > > > >>> > > > > > > >>>>>>>>>> attachments. No employee or agent of
> >> TiVo is
> >> > > > > > >>> authorized to
> >> > > > > > >>> > > > > > > >>>>>>>>>> conclude any binding agreement on
> >> behalf of
> >> > > > > TiVo
> >> > > > > > >>> by email.
> >> > > > > > >>> > > > > > > >>>>>>>>>> Binding agreements with TiVo may
> >> only be made
> >> > > > > by
> >> > > > > > >>> a signed
> >> > > > > > >>> > > > > > > >>> written
> >> > > > > > >>> > > > > > > >>>>>>>>>> agreement. -- *Tommy Becker*
> >> *Principal
> >> > > > > Engineer *
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> *Personalized Content Discovery*
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
> >> > > > > > >>> http://www.tivo.com/>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> --
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> *Tommy Becker* /Principal Engineer /
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> /Personalized Content Discovery/
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
> >> > > > > > >>> http://www.tivo.com/>
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > >
> >> > > > > > >>>
> >> > > > >
> >> ----------------------------------------------------------------------
> >> > > > > > >>> > > > > > > >>>>>>>
> >> > > > > > >>> > > > > > > >>>>>>
> >> > > > > > >>> > > > > > > >>>>>
> >> > > > > > >>> > > > > > > >>>>
> >> > > > > > >>> > > > > > > >>>
> >> > > > > > >>> > > > > > > >>
> >> > > > > > >>> > > > > > > >
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > > >
> >> > > > > > >>> > > > > >
> >> > > > > > >>> > > >
> >> > > > > > >>> >
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > >
> >> > >
> >>
> >
>
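
To make the timestamp semantics from the quoted thread concrete, here is a
minimal Java sketch. It is only an illustration under my own assumptions, not
how Streams implements (or would implement) this: the class
EmitOnChangeSketch, its methods, and the in-memory map standing in for a state
store are all hypothetical names. It shows the two cases Matthias described: a
source table drops an unchanged value and keeps the earlier timestamp, while
an aggregation takes the highest input timestamp, so a later input with the
same result still counts as a change.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical illustration; none of these names exist in Kafka Streams. */
final class EmitOnChangeSketch {

    /** A stored value plus the timestamp at which it became valid. */
    static final class ValueAndTime {
        final String value;
        final long timestamp;

        ValueAndTime(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Plain map standing in for a materialized state store (assumption).
    private final Map<String, ValueAndTime> store = new HashMap<>();

    /**
     * Source-table case: an update whose value is unchanged is dropped,
     * and the store keeps the earlier timestamp.
     */
    Optional<ValueAndTime> tableUpdate(String key, String value, long timestamp) {
        ValueAndTime old = store.get(key);
        if (old != null && Objects.equals(old.value, value)) {
            return Optional.empty(); // idempotent: no store write, nothing emitted
        }
        ValueAndTime updated = new ValueAndTime(value, timestamp);
        store.put(key, updated);
        return Optional.of(updated);
    }

    /**
     * Aggregation case: the result timestamp is the highest timestamp seen so
     * far, so an in-order input changes the record even if the value is equal.
     * An out-of-order input leaves the timestamp alone and is emitted only if
     * the result value itself changes.
     */
    Optional<ValueAndTime> aggregateUpdate(String key, String newResult, long inputTimestamp) {
        ValueAndTime old = store.get(key);
        long resultTime = (old == null) ? inputTimestamp : Math.max(old.timestamp, inputTimestamp);
        if (old != null && Objects.equals(old.value, newResult) && old.timestamp == resultTime) {
            return Optional.empty(); // neither value nor timestamp changed
        }
        ValueAndTime updated = new ValueAndTime(newResult, resultTime);
        store.put(key, updated);
        return Optional.of(updated);
    }
}
```

With the key=A inputs from the example at the top of this mail,
tableUpdate("A", "X", 0) emits and tableUpdate("A", "X", 11) emits nothing,
which is exactly the situation in which a downstream operator never observes
timestamp 11.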
