Hi John,

Sounds good. It looks like we are close to wrapping things up. If there
are any other revisions which need to be made, please comment in the
thread. Otherwise, I will start the voting process this Thursday (Pacific
Standard Time).

Cheers,
Richard
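
For reference, here is a rough sketch of the check as I understand the
consensus in the thread below: compare the already-serialized old and new
results (plus timestamps) and skip forwarding when they match. The class
and method names are hypothetical and not part of the Kafka Streams API,
and the string "formats" are only stand-ins for real serde output.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class IdempotentUpdateSketch {

    /**
     * Returns true when the update should be forwarded downstream.
     * Assumption: oldBytes is only non-null when the prior value was
     * already fetched for other reasons (no extra lookup is performed).
     */
    static boolean shouldForward(byte[] oldBytes, byte[] newBytes,
                                 long oldTimestamp, long newTimestamp) {
        if (oldBytes == null) {
            return true; // nothing to compare against
        }
        boolean sameValue = Arrays.equals(oldBytes, newBytes);
        return !(sameValue && oldTimestamp == newTimestamp);
    }

    public static void main(String[] args) {
        // Hypothetical "old" and "new" binary formats for the same logical value.
        byte[] storedV1 = "A:1".getBytes(StandardCharsets.UTF_8);
        byte[] resultV2 = "A|1".getBytes(StandardCharsets.UTF_8);

        // First update after a serde upgrade: bytes differ, so it is forwarded once.
        System.out.println(shouldForward(storedV1, resultV2, 10L, 10L));

        // The store now holds the new format, so a repeat of the same result is dropped.
        System.out.println(shouldForward(resultV2, resultV2.clone(), 10L, 10L));
    }
}
```

The first call illustrates John's point about a serde upgrade: the
idempotent update is missed exactly once, and detected from then on.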

On Tue, Feb 25, 2020 at 11:59 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Richard,
>
> Sorry for the slow reply. I actually think we should avoid checking
> equals() for now. Your reasoning is good, but the truth is that
> depending on the implementation of equals() is non-trivial,
> semantically, and (though I proposed it before), I'm not convinced
> it's worth the risk. Much better to start with exactly one kind of
> "idempotence detection".
>
> Even if someone does update their serdes, we know that the new
> serde would still be able to _de_serialize the old format, or the whole
> app would break. The situation is that the new result gets encoded
> in the new binary format, which means we don't detect an idempotent
> update for what it is. In this case, we'd write the new binary format to
> disk and the changelog, and forward it downstream. However, we only
> do this once. Now that the binary format for that record has been updated,
> we would correctly detect idempotence of any subsequent updates.
>
> Plus, we would still be able to filter out idempotent updates in
> repartition sinks, since for those, we use the new serde to serialize
> both the "old" and "new" result.
>
> It's certainly a good observation, but I think we can just make a note
> of it in "rejected alternatives" for now, and plan to refine it later,
> if it does pose a big performance problem.
>
> Thanks!
> -John
>
> On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
> > Hi all,
> >
> > Updated the KIP.
> >
> > Just a question: do you think it would be a good idea if we check for
> > both Object#equals() and binary equality?
> > Because there might be some subtle changes in the serialization (for
> > example, if the user decides to upgrade their serialization procedure
> > to a new one), but the underlying values of the result might be the
> > same. (therefore equals() might return true)
> >
> > Do you think this would be plausible?
> >
> > Cheers,
> > Richard
> >
> > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <yohan.richard...@gmail.com>
> > wrote:
> >
> > > Hello,
> > >
> > > Just to make some updates. I changed the name of the metric so that
> > > it was more in line with usual Kafka naming conventions for metrics /
> > > sensors.
> > > Below is the updated description of the metric:
> > >
> > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)
> > >
> > > Description: This metric will record the number of updates that have
> > > been dropped since they are essentially re-performing an earlier
> > > operation.
> > >
> > > Note:
> > >
> > >    - The rate option indicates the ratio of records dropped to actual
> > >    volume of records passing through the task.
> > >    - The total option will just give a raw count of the number of
> > >    records dropped.
> > >
> > >
> > > I hope that this is more on point.
> > >
> > > Best,
> > > Richard
> > >
> > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <yohan.richard...@gmail.com>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for the clarification. I was just confused a little on what was
> > >> going on.
> > >>
> > >> So I guess that for the actual proposal, we have the following:
> > >>
> > >> 1. We check for binary equality, and perform no extra lookups.
> > >> 2. Emphasize that this applies only to materialized tables.
> > >> 3. We drop aggregation updates if key, value, and timestamp are the
> > >> same.
> > >>
> > >> Then that settles the behavior changes. So it looks like the metric
> > >> is the only thing that is left. In this case, I think the metric
> > >> would be named the following: IdempotentUpdateMetric. This is mostly
> > >> off the top of my head, so if you think that we should change it,
> > >> feel free to say so.
> > >> The metric will report the number of dropped operations.
> > >>
> > >> It will probably be added as a Sensor, similar to the dropped records
> > >> sensor we already have.
> > >>
> > >> If there isn't anything else, I will probably start the voting process
> > >> next week!
> > >>
> > >> Cheers,
> > >> Richard
> > >>
> > >>
> > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <vvcep...@apache.org>
> > >> wrote:
> > >>
> > >>> Hi Bruno,
> > >>>
> > >>> Thanks for the clarification. Indeed, I was thinking two things:
> > >>> 1. For the initial implementation, we can just avoid adding any extra
> > >>> lookups, but only do the comparison when we already happen to have
> > >>> the prior value.
> > >>> 2. I think, as a result of the timestamp semantics, we actually _do_
> > >>> look up the prior value approximately all the time, so the
> > >>> idempotence check should be quite effective.
> > >>>
> > >>> I think that second point is the same thing you're referring to
> > >>> potentially being unnecessary. It does mean that we do fetch the
> > >>> whole value in a lot of cases where we really only need the
> > >>> timestamp, so it could certainly be optimized in the future. In that
> > >>> future, we would need to weigh that optimization against losing the
> > >>> idempotence check. But, that's a problem for tomorrow :)
> > >>>
> > >>> I'm 100% on board with scrutinizing the performance as we implement
> > >>> this feature.
> > >>>
> > >>> Thanks again,
> > >>> -John
> > >>>
> > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
> > >>> > Hi John,
> > >>> >
> > >>> > I am glad to help you with your imagination. With overhead, I
> > >>> > mainly meant the additional lookup into the state store to get the
> > >>> > current value, but I see now in the code that we do that lookup
> > >>> > anyways (although I think we could avoid that in the cases where we
> > >>> > do not need the old value). With or without config, we need to
> > >>> > evaluate the performance benefits of this change, in any case.
> > >>> >
> > >>> > Best,
> > >>> > Bruno
> > >>> >
> > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <vvcep...@apache.org>
> > >>> > wrote:
> > >>> > >
> > >>> > > Thanks for your remarks, Bruno!
> > >>> > >
> > >>> > > I'm in favor of standardizing on terminology like "not forwarding
> > >>> > > idempotent updates" or "dropping idempotent updates". Maybe we
> > >>> > > should make a pass on the KIP and just convert everything to this
> > >>> > > phrasing. In retrospect, even the term "emit-on-change" has too
> > >>> > > much semantic baggage, since it implies the semantics from the
> > >>> > > SECRET paper, which we don't really want to imply here.
> > >>> > >
> > >>> > > I'm also in favor of the metric as you propose.
> > >>> > >
> > >>> > > Likewise with stream aggregations, I was also under the
> > >>> > > impression that we agreed on dropping idempotent updates to the
> > >>> > > aggregation result, any time we find that our "new" (key, value,
> > >>> > > timestamp) result is identical to the prior one.
> > >>> > >
> > >>> > > Also, I'm +1 on all your recommendations for updating the KIP
> > >>> > > document for clarity.
> > >>> > >
> > >>> > > Regarding the opt-out config. Perhaps I'm suffering from a
> > >>> > > failure of imagination, but I don't see how the current proposal
> > >>> > > could really have a measurable impact on latency. If all we do is
> > >>> > > make a single extra pass to compare two byte arrays for equality,
> > >>> > > only in the cases where we already have the byte arrays
> > >>> > > available, it seems unlikely to measurably affect the processing
> > >>> > > of non-idempotent updates. It seems guaranteed to _decrease_ the
> > >>> > > latency of processing idempotent updates, since we get to skip a
> > >>> > > store#put, at least one producer#send, and also all downstream
> > >>> > > processing, including all the disk and network operations
> > >>> > > associated with downstream operations.
> > >>> > >
> > >>> > > It seems like if we're pretty sure this change would only _help_,
> > >>> > > we shouldn't introduce the operational burden of an extra
> > >>> > > configuration. If we want to be more aggressive about dropping
> > >>> > > idempotent operations in the future, such as depending on
> > >>> > > equals() or adding a ChangeDetector interface, then we should
> > >>> > > consider adding a configuration as part of that future work. In
> > >>> > > fact, if we add a simple "opt-in/opt-out" switch right now, we
> > >>> > > might find that it's actually insufficient for whatever future
> > >>> > > feature we might propose, then we have a mess of deprecating the
> > >>> > > opt-out config and replacing it.
> > >>> > >
> > >>> > > What do you think?
> > >>> > > -John
> > >>> > >
> > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna wrote:
> > >>> > > > Hi all,
> > >>> > > >
> > >>> > > > Sorry for the late reply!
> > >>> > > >
> > >>> > > > I am also in favour of baby steps.
> > >>> > > >
> > >>> > > > I am undecided whether the KIP should contain an opt-out config
> > >>> > > > or not. The overhead of emit-on-change might affect latency. For
> > >>> > > > applications where low latency is crucial and there are not too
> > >>> > > > many idempotent updates, it would be better to fall back to
> > >>> > > > emit-on-update. However, we do not know how much emit-on-change
> > >>> > > > impacts latency. We would first need to benchmark that before we
> > >>> > > > can decide about the opt-out config.
> > >>> > > >
> > >>> > > > A metric of dropped idempotent updates seems useful to me to be
> > >>> > > > informed about potential upstream applications or upstream
> > >>> > > > operators that produce too many idempotent updates. The KIP
> > >>> > > > should state the name of the metric, its group, its tags, and
> > >>> > > > its recording level (see KIP-444 or KIP-471 for examples). I
> > >>> > > > propose DEBUG as reporting level.
> > >>> > > >
> > >>> > > > Richard, what competing proposals for emit-on-change for
> > >>> > > > aggregations do you mean? I have the feeling that we agreed to
> > >>> > > > get rid of idempotent updates if the aggregate is updated with
> > >>> > > > the same key, value, AND timestamp. I am also fine if we do not
> > >>> > > > include this into this KIP (remember: baby steps).
> > >>> > > >
> > >>> > > > You write that "emit-on-change is more correct". Since we agreed
> > >>> > > > that this is an optimization, IMO you cannot argue this way.
> > >>> > > >
> > >>> > > > Please put "Alternative Approaches" under "Rejected
> > >>> > > > Alternatives", so that it becomes clear that we are not going to
> > >>> > > > implement them. In general, I think the KIP needs a bit of
> > >>> > > > clean-up (probably, you already planned for it). "Design
> > >>> > > > Reasoning" mixes behavior changes and rejected alternatives, and
> > >>> > > > partly duplicates the content of those sections.
> > >>> > > >
> > >>> > > > I do not like the name "no-op operations" or "no-ops", because
> > >>> > > > they are rather generic. I like "idempotent updates" more.
> > >>> > > >
> > >>> > > > Best,
> > >>> > > > Bruno
> > >>> > > >
> > >>> > > >
> > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu
> > >>> > > > <yohan.richard...@gmail.com> wrote:
> > >>> > > > >
> > >>> > > > > Hi all,
> > >>> > > > >
> > >>> > > > > We are definitely making progress!
> > >>> > > > >
> > >>> > > > > @John should I emphasize in the proposed behavior changes that
> > >>> > > > > we are only doing binary equality checks for stateful
> > >>> > > > > operators?
> > >>> > > > > It looks like we have come close to finalizing this part of
> > >>> > > > > the KIP. (I will note in the KIP that this proposal is intended
> > >>> > > > > for optimization, not semantic correctness.)
> > >>> > > > >
> > >>> > > > > I do think maybe we still have one other detail we need to
> > >>> > > > > discuss. So far, there has been quite a bit of back and forth
> > >>> > > > > about what the behavior of aggregations should look like in
> > >>> > > > > emit-on-change. I have seen multiple competing proposals, so I
> > >>> > > > > am not completely certain which one we should go with, or how
> > >>> > > > > we will be able to compromise between them.
> > >>> > > > >
> > >>> > > > > Let me know what your thoughts are on this matter, since we
> are
> > >>> probably
> > >>> > > > > close to wrapping up most other stuff.
> > >>> > > > > @Matthias J. Sax <matth...@confluent.io>  and @Bruno, see
> what
> > >>> you think
> > >>> > > > > about this.
> > >>> > > > >
> > >>> > > > > Best,
> > >>> > > > > Richard
> > >>> > > > >
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler
> > >>> > > > > <vvcep...@apache.org> wrote:
> > >>> > > > >
> > >>> > > > > > Thanks, Matthias!
> > >>> > > > > >
> > >>> > > > > > Regarding numbers, it would be hard to know how many
> > >>> applications
> > >>> > > > > > would benefit, since we don't know how many applications
> there
> > >>> are,
> > >>> > > > > > or anything about their data sets or topologies. We could
> do a
> > >>> survey,
> > >>> > > > > > but it seems overkill if we take the conservative approach.
> > >>> > > > > >
> > >>> > > > > > I have my own practical stream processing experience that
> > >>> tells me this
> > >>> > > > > > is absolutely critical for any moderate-to-large relational
> > >>> stream
> > >>> > > > > > processing use cases. I'll leave it to you to decide if you
> > >>> find that
> > >>> > > > > > convincing, but it's definitely not an _assumption_. I've
> also
> > >>> heard from
> > >>> > > > > > a few Streams users who have already had to implement
> their own
> > >>> > > > > > noop-suppression transformers in order to get to production
> > >>> scale.
> > >>> > > > > >
> > >>> > > > > > Regardless, it sounds like we can agree on taking an
> > >>> > > > > > opportunistic approach and targeting the optimization just to
> > >>> > > > > > use a binary-equality check at stateful operators. (I'd also
> > >>> > > > > > suggest in sink nodes, when we are about to send old and new
> > >>> > > > > > values, since they are also already present and serialized at
> > >>> > > > > > that point.) We could make the KIP even more vague, and just
> > >>> > > > > > say that we'll drop no-op updates "when possible".
> > >>> > > > > >
> > >>> > > > > > I'm curious what Bruno and the others think about this. If
> it
> > >>> seems like
> > >>> > > > > > a good starting point, perhaps we could move to a vote soon
> > >>> and get to
> > >>> > > > > > work on the implementation!
> > >>> > > > > >
> > >>> > > > > > Thanks,
> > >>> > > > > > -John
> > >>> > > > > >
> > >>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J. Sax wrote:
> > >>> > > > > > > Talking about optimizations and reducing downstream load:
> > >>> > > > > > >
> > >>> > > > > > > Do we actually have any numbers? I have the impression
> > >>> > > > > > > that this KIP is more or less built on the _assumption_
> > >>> > > > > > > that there is a problem. Yes, there are some use cases
> > >>> > > > > > > that would benefit from this. But how many applications
> > >>> > > > > > > would actually benefit? And how much load reduction would
> > >>> > > > > > > they get?
> > >>> > > > > > >
> > >>> > > > > > > The simplest approach (following John's idea to make baby
> > >>> > > > > > > steps) would be to apply the emit-on-change pattern only
> > >>> > > > > > > if there is a store. For this case we need to serialize
> > >>> > > > > > > old and new result anyway and thus a simple byte-array
> > >>> > > > > > > comparison is no overhead.
> > >>> > > > > > >
> > >>> > > > > > > Sending `oldValues` by default would become expensive
> > >>> > > > > > > because we would need to serialize the recomputed old
> > >>> > > > > > > result, as well as the new result, to make the comparison
> > >>> > > > > > > (and we know the serialization is not cheap). We are
> > >>> > > > > > > facing a trade-off between CPU overhead and downstream
> > >>> > > > > > > load and I am not sure if we should hard code this. My
> > >>> > > > > > > original argument for sending `oldValues` was about
> > >>> > > > > > > semantics; but for an optimization, I am not sure if this
> > >>> > > > > > > would be the right choice.
> > >>> > > > > > >
> > >>> > > > > > > For now, users who want to opt-in can force a
> > >>> materialization. A
> > >>> > > > > > > materialization may be expensive and if we see future
> > >>> demand, we could
> > >>> > > > > > > still add an option to send `oldValues` instead of
> > >>> materialization (this
> > >>> > > > > > > would at least save the store overhead). As we consider
> the
> > >>> KIP an
> > >>> > > > > > > optimization, a "config" seems to make sense.
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > -Matthias
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
> > >>> > > > > > > > Hi John!
> > >>> > > > > > > >
> > >>> > > > > > > > Thanks for the reply.
> > >>> > > > > > > >
> > >>> > > > > > > > About the changes we have discussed so far. I think upon
> > >>> > > > > > > > further consideration, we have been mostly talking about
> > >>> > > > > > > > this from the perspective that no stop-gap effort is
> > >>> > > > > > > > acceptable. However, in recent discussion, if we
> > >>> > > > > > > > consider optimization, then it appears that the
> > >>> > > > > > > > perspective I mentioned no longer applies. After all, we
> > >>> > > > > > > > are no longer concerned so much about semantic
> > >>> > > > > > > > correctness as about reducing traffic as much as
> > >>> > > > > > > > possible without performance tradeoffs.
> > >>> > > > > > > >
> > >>> > > > > > > > In this case, I think a cache would be a good idea for
> > >>> stateless
> > >>> > > > > > > > operations. This cache will not be backed by a store
> > >>> obviously. We can
> > >>> > > > > > > > probably use Kafka's ThreadCache. We should be able to
> > >>> catch a large
> > >>> > > > > > > > portion of the no-ops if we at least store some
> results in
> > >>> the cache.
> > >>> > > > > > Not
> > >>> > > > > > > > all will be caught, but I think the impact will be
> > >>> significant.
> > >>> > > > > > > >
> > >>> > > > > > > > On another note, I think that we should implement
> > >>> competing proposals
> > >>> > > > > > i.e.
> > >>> > > > > > > > one where we forward both old and new values with a
> > >>> reasonable
> > >>> > > > > > proportion
> > >>> > > > > > > > of artificial no-ops (we do not necessarily have to
> rely
> > >>> on equals so
> > >>> > > > > > much
> > >>> > > > > > > > as comparing the serialized binary data after the
> > >>> operation), and in
> > >>> > > > > > > > another scenario, the cache for stateless ops. It
> would be
> > >>> > > > > > unreasonable if
> > >>> > > > > > > > we completely disregard either approach, since they
> both
> > >>> have merit.
> > >>> > > > > > The
> > >>> > > > > > > > reason for implementing both is to perform benchmark
> tests
> > >>> on them, and
> > >>> > > > > > > > compare them with the original. This way, we can more
> > >>> clearly see what
> > >>> > > > > > is
> > >>> > > > > > > > the drawbacks and the gains. So far, we have been
> > >>> discussing only
> > >>> > > > > > > > hypotheticals, and if we continue to do so, I think it
> is
> > >>> likely no
> > >>> > > > > > ground
> > >>> > > > > > > > will be gained.
> > >>> > > > > > > >
> > >>> > > > > > > > After all, what we seek is optimization, and
> performance
> > >>> benchmarks
> > >>> > > > > > > will be
> > >>> > > > > > > > mandatory for a KIP of this nature.
> > >>> > > > > > > >
> > >>> > > > > > > > Hope this helps,
> > >>> > > > > > > > Richard
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John Roesler
> > >>> > > > > > > > <vvcep...@apache.org> wrote:
> > >>> > > > > > > >
> > >>> > > > > > > >> Hi again, all,
> > >>> > > > > > > >>
> > >>> > > > > > > >> Sorry on my part for my silence.
> > >>> > > > > > > >>
> > >>> > > > > > > >> I've just taken another look over the recent history of
> > >>> > > > > > > >> this discussion. It seems like the #1 point to clarify
> > >>> > > > > > > >> (because it affects everything else) is that, yes, I was
> > >>> > > > > > > >> 100% envisioning this as an _optimization_.
> > >>> > > > > > > >>
> > >>> > > > > > > >> As a consequence, I don't think it's critical to make
> any
> > >>> hard
> > >>> > > > > > guarantees
> > >>> > > > > > > >> about
> > >>> > > > > > > >> what results get forwarded and what (no-op updates)
> get
> > >>> dropped. I'd
> > >>> > > > > > > >> initially
> > >>> > > > > > > >> just been thinking about doing this opportunistically
> in
> > >>> cases where
> > >>> > > > > > we
> > >>> > > > > > > >> already
> > >>> > > > > > > >> had the "old" and "new" result in memory, thanks to a
> > >>> request to
> > >>> > > > > > > "emit old
> > >>> > > > > > > >> values", or to the implementation of timestamp
> semantics.
> > >>> > > > > > > >>
> > >>> > > > > > > >> However, whether or not it's semantically critical, I
> do
> > >>> think that
> > >>> > > > > > > >> Matthias's
> > >>> > > > > > > >> idea to use the change-forwarding mechanism to check
> for
> > >>> no-ops even
> > >>> > > > > > on
> > >>> > > > > > > >> stateless operations is pretty interesting.
> Specifically,
> > >>> this would
> > >>> > > > > > > >> _really_
> > >>> > > > > > > >> let you pare down useless updates by using mapValues
> to
> > >>> strip down
> > >>> > > > > > > records
> > >>> > > > > > > >> only
> > >>> > > > > > > >> to what you really need. However, the dependence on
> the
> > >>> > > > > > implementation of
> > >>> > > > > > > >> equals() is troubling.
> > >>> > > > > > > >>
> > >>> > > > > > > >> It might make sense to table this idea, as well as my
> > >>> complicated
> > >>> > > > > > no-op
> > >>> > > > > > > >> detection algorithm, and initially propose just a
> > >>> nonconfigurable
> > >>> > > > > > feature
> > >>> > > > > > > >> to
> > >>> > > > > > > >> check "old" and "new" results for binary equality
> before
> > >>> forwarding.
> > >>> > > > > > > I.e.,
> > >>> > > > > > > >> if
> > >>> > > > > > > >> any operation determines that the old and new results
> are
> > >>> > > > > > > >> binary-identical, we
> > >>> > > > > > > >> would not forward.
> > >>> > > > > > > >>
> > >>> > > > > > > >> I'll admit that this doesn't serve Tommy's use case
> very
> > >>> well, but it
> > >>> > > > > > > >> might be
> > >>> > > > > > > >> better to take baby steps with an optimization like
> this
> > >>> and not risk
> > >>> > > > > > > >> over-reaching in a way that actually harms
> performance or
> > >>> > > > > > correctness. We
> > >>> > > > > > > >> could
> > >>> > > > > > > >> always expand the feature to use equals() or some
> kind of
> > >>> > > > > > ChangeDetector
> > >>> > > > > > > >> later
> > >>> > > > > > > >> on, in a more focused discussion.
> > >>> > > > > > > >>
> > >>> > > > > > > >> Regarding metrics or debug logs, I guess I don't feel
> > >>> strongly, but it
> > >>> > > > > > > >> feels
> > >>> > > > > > > >> like two things will happen that make it nicer to add
> > >>> them:
> > >>> > > > > > > >>
> > >>> > > > > > > >> 1. This feature is going to surprise/annoy _somebody_,
> > >>> and it would be
> > >>> > > > > > > >> nice to
> > >>> > > > > > > >> be able to definitively say the reason that updates
> are
> > >>> dropped is
> > >>> > > > > > that
> > >>> > > > > > > >> they
> > >>> > > > > > > >> were no-ops. The easiest smoking gun is if there are
> > >>> debug-logs that
> > >>> > > > > > > can be
> > >>> > > > > > > >> enabled. This person might just be looking at the
> > >>> dashboards,
> > >>> > > > > > > wondering why
> > >>> > > > > > > >> there are 100K updates per second going into their
> app,
> > >>> but only 1K
> > >>> > > > > > > >> results per
> > >>> > > > > > > >> second coming out. Having the metric there makes the
> > >>> accounting
> > >>> > > > > > easier.
> > >>> > > > > > > >>
> > >>> > > > > > > >> 2. Somebody is going to struggle with high-volume
> > >>> updates, and it
> > >>> > > > > > > would be
> > >>> > > > > > > >> nice
> > >>> > > > > > > >> for them to know that this feature is saving them
> > >>> X-thousand updates
> > >>> > > > > > per
> > >>> > > > > > > >> second,
> > >>> > > > > > > >> etc.
> > >>> > > > > > > >>
> > >>> > > > > > > >> What does everyone think about this? Note, as I read
> it,
> > >>> what I've
> > >>> > > > > > said
> > >>> > > > > > > >> above is
> > >>> > > > > > > >> already reflected in the text of the KIP.
> > >>> > > > > > > >>
> > >>> > > > > > > >> Thanks,
> > >>> > > > > > > >> -John
> > >>> > > > > > > >>
> > >>> > > > > > > >>
> > >>> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard Yu wrote:
> > >>> > > > > > > >>> Hi all,
> > >>> > > > > > > >>>
> > >>> > > > > > > >>> Bumping this. If you feel that this KIP is not too
> > >>> > > > > > > >>> urgent, then let me know. :)
> > >>> > > > > > > >>>
> > >>> > > > > > > >>> Cheers,
> > >>> > > > > > > >>> Richard
> > >>> > > > > > > >>>
> > >>> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <
> > >>> > > > > > yohan.richard...@gmail.com>
> > >>> > > > > > > >>> wrote:
> > >>> > > > > > > >>>
> > >>> > > > > > > >>>> Hi all,
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>> I've had just a few thoughts regarding the
> forwarding
> > >>> of <key,
> > >>> > > > > > > >>>> change<old_value, new_value>>. As Matthias already
> > >>> mentioned, there
> > >>> > > > > > > >> are two
> > >>> > > > > > > >>>> separate priorities by which we can judge this KIP:
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>> 1. An optimization perspective: In this case, the user
> > >>> > > > > > > >>>> would prefer the impact of this KIP to be as minimal as
> > >>> > > > > > > >>>> possible. By such logic, if stateless operations are
> > >>> > > > > > > >>>> performed twice, that could prove unacceptable for them
> > >>> > > > > > > >>>> (since operations can prove expensive).
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>> 2. Semantic correctness perspective: Unlike the
> > >>> > > > > > > >>>> optimization approach, we are more concerned with all
> > >>> > > > > > > >>>> KTable operations obeying the same emission policy,
> > >>> > > > > > > >>>> i.e. emit-on-change. In this case, a discrepancy would
> > >>> > > > > > > >>>> not be tolerated, even though an extra performance cost
> > >>> > > > > > > >>>> will be incurred. Therefore, we will follow Matthias's
> > >>> > > > > > > >>>> approach, and then perform the operation once on the
> > >>> > > > > > > >>>> old value, and once on the new.
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>> The issue here I think is more black and white than
> in
> > >>> between. The
> > >>> > > > > > > >> second
> > >>> > > > > > > >>>> option in particular would be favorable for users
> with
> > >>> inexpensive
> > >>> > > > > > > >>>> stateless operations, while for the former option,
> we
> > >>> are probably
> > >>> > > > > > > >> dealing
> > >>> > > > > > > >>>> with more expensive ones. So the simplest solution
> is
> > >>> probably to
> > >>> > > > > > > >> allow the
> > >>> > > > > > > >>>> user to choose one of the behaviors, and have a
> config
> > >>> which can
> > >>> > > > > > > >> switch in
> > >>> > > > > > > >>>> between them.
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>> It's the simplest compromise I can come up with at the
> > >>> > > > > > > >>>> moment, but if you think you have a better plan which
> > >>> > > > > > > >>>> could better balance tradeoffs, then please let us
> > >>> > > > > > > >>>> know. :)
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>> Best,
> > >>> > > > > > > >>>> Richard
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John Roesler <
> > >>> vvcep...@apache.org>
> > >>> > > > > > > >> wrote:
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>>> Hi all,
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> Thanks for the thoughtful comments!
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> I need more time to reflect on your thoughts, but
> just
> > >>> wanted to
> > >>> > > > > > offer
> > >>> > > > > > > >>>>> a quick clarification about equals().
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> I only meant that we can't be sure if a class's
> > >>> > > > > > > >>>>> equals() implementation returns true for two
> > >>> > > > > > > >>>>> semantically identical instances. I.e., if a class
> > >>> > > > > > > >>>>> doesn't override the default equals() implementation,
> > >>> > > > > > > >>>>> then we would see behavior like:
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> new MyPair("A", 1).equals(new MyPair("A", 1)) returns false
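
To make the point above concrete, here is a small sketch. MyPair is
the hypothetical class from the example, and serialize() is an assumed
stand-in for a real serde, not an actual Kafka API. Because MyPair does
not override equals(), the object comparison falls back to reference
identity, while comparing the serialized bytes still detects the no-op.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DefaultEqualsSketch {

    // Hypothetical value class that does NOT override equals()/hashCode().
    static final class MyPair {
        final String key;
        final int value;

        MyPair(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Assumed stand-in for a serde: a deterministic encoding of the fields.
    static byte[] serialize(MyPair pair) {
        return (pair.key + ":" + pair.value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        MyPair oldResult = new MyPair("A", 1);
        MyPair newResult = new MyPair("A", 1);

        // Object#equals compares references here, so this prints false.
        System.out.println(oldResult.equals(newResult));

        // The serialized forms are byte-for-byte identical, so this prints true.
        System.out.println(Arrays.equals(serialize(oldResult), serialize(newResult)));
    }
}
```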
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> In that case, I would still like to catch no-op
> > >>> updates by
> > >>> > > > > > comparing
> > >>> > > > > > > >> the
> > >>> > > > > > > >>>>> serialized form of the records when we happen to
> have
> > >>> it serialized
> > >>> > > > > > > >> anyway
> > >>> > > > > > > >>>>> (such as when the operation is stateful, or when
> we're
> > >>> sending to a
> > >>> > > > > > > >>>>> repartition topic and we have both the "new" and
> "old"
> > >>> value from
> > >>> > > > > > > >>>>> upstream).
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> I didn't mean to suggest we'd try to use reflection to
> > >>> > > > > > > >>>>> detect whether equals is implemented, although that is
> > >>> > > > > > > >>>>> a neat trick. I was thinking more of a
> > >>> > > > > > > >>>>> belt-and-suspenders algorithm where we do the check for
> > >>> > > > > > > >>>>> no-ops based on equals() and then _also_ check the
> > >>> > > > > > > >>>>> serialized bytes for equality.
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> Thanks,
> > >>> > > > > > > >>>>> -John
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
> > >>> > > > > > > >>>>>> Thanks for the comments, Matthias.
> > >>> > > > > > > >>>>>>
> > >>> > > > > > > >>>>>> w.r.t. requirement of an `equals()` implementation,
> > >>> > > > > > > >>>>>> each template type would have an equals() method. We
> > >>> > > > > > > >>>>>> can use the following code to know whether it is
> > >>> > > > > > > >>>>>> provided by JVM or provided by user.
> > >>> > > > > > > >>>>>>
> > >>> > > > > > > >>>>>> boolean customEquals = false;
> > >>> > > > > > > >>>>>> try {
> > >>> > > > > > > >>>>>>     Class<?> cls = value.getClass()
> > >>> > > > > > > >>>>>>         .getMethod("equals", Object.class)
> > >>> > > > > > > >>>>>>         .getDeclaringClass();
> > >>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
> > >>> > > > > > > >>>>>>         customEquals = true;
> > >>> > > > > > > >>>>>>     }
> > >>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
> > >>> > > > > > > >>>>>>     // equals is always defined, this wouldn't hit
> > >>> > > > > > > >>>>>> }
> > >>> > > > > > > >>>>>>
> > >>> > > > > > > >>>>>> The next question is: what if the user doesn't
> > >>> provide equals()
> > >>> > > > > > > >> method ?
> > >>> > > > > > > >>>>>> Would we automatically fall back to
> emit-on-update ?
> > >>> > > > > > > >>>>>>
> > >>> > > > > > > >>>>>> Cheers
> > >>> > > > > > > >>>>>>
> > >>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <
> > >>> mj...@apache.org>
> > >>> > > > > > > >>>>> wrote:
> > >>> > > > > > > >>>>>>
> > >>> > > > > > > > First a high level comment:
> > >>> > > > > > > >
> > >>> > > > > > > > Overall, I would like to make one step back, and make
> sure
> > >>> we are
> > >>> > > > > > > > discussing on the same level. Originally, I understood
> > >>> this KIP
> > >>> > > > > > > >>> as a
> > >>> > > > > > > > proposed change of _semantics_, however, given the
> latest
> > >>> > > > > > > >>> discussion
> > >>> > > > > > > > it seems it's actually not -- it's more an
> _optimization_
> > >>> > > > > > > >>> proposal.
> > >>> > > > > > > > Hence, we only need to make sure that this optimization
> > >>> does not
> > >>> > > > > > > >>> break
> > >>> > > > > > > > existing semantics. Is this the right way to think
> about
> > >>> it?
> > >>> > > > > > > >
> > >>> > > > > > > > If yes, then it might actually be ok to have different
> > >>> behavior
> > >>> > > > > > > > depending if there is a materialized KTable or not. So
> > >>> far, we
> > >>> > > > > > > >>> never
> > >>> > > > > > > > defined a public contract about our emit strategy and
> it
> > >>> seems
> > >>> > > > > > > >>> this
> > >>> > > > > > > > KIP does not define one either.
> > >>> > > > > > > >
> > >>> > > > > > > > Hence, I don't have as strong of an opinion about
> sending
> > >>> > > > > > > >>> oldValues
> > >>> > > > > > > > for example any longer. I guess the question is really,
> > >>> what can
> > >>> > > > > > > >>> we
> > >>> > > > > > > > implement in a reasonable way.
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > Other comments:
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > @Richard:
> > >>> > > > > > > >
> > >>> > > > > > > > Can you please add the KIP to the KIP overview table: It's missing
> > >>> > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > @Bruno:
> > >>> > > > > > > >
> > >>> > > > > > > > You mentioned caching. I think it's irrelevant
> > >>> (orthogonal) and
> > >>> > > > > > > >>> we can
> > >>> > > > > > > > discuss this KIP without considering it.
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > @John:
> > >>> > > > > > > >
> > >>> > > > > > > >>>>>>>>> Even in the source table, we forward the
> updated
> > >>> record with
> > >>> > > > > > the
> > >>> > > > > > > >>>>>>>>> higher of the two timestamps. So the example is
> > >>> more like:
> > >>> > > > > > > >
> > >>> > > > > > > > That is not correct. Currently, we forward with the
> smaller
> > >>> > > > > > > > out-of-order timestamp (changing the timestamp would
> > >>> corrupt the
> > >>> > > > > > > >>> data
> > >>> > > > > > > > -- we don't know, because we don't check, if the value
> is
> > >>> the
> > >>> > > > > > > >>> same
> > >>> > > > > > > >>>>>> or
> > >>> > > > > > > > a different one, hence, we must emit the out-of-order
> > >>> record
> > >>> > > > > > > >>> as-is).
> > >>> > > > > > > >
> > >>> > > > > > > > If we start to do emit-on-change, we also need to emit
> a
> > >>> new
> > >>> > > > > > > >>> record if
> > >>> > > > > > > > the timestamp changes due to out-of-order data, hence,
> we
> > >>> would
> > >>> > > > > > > >>> still
> > >>> > > > > > > > need to emit <K,V,T1> because that gives us correct
> > >>> semantics:
> > >>> > > > > > > >>> assume
> > >>> > > > > > > > you have a filter() and afterward use the filter
> KTable in
> > >>> a
> > >>> > > > > > > > stream-table join -- the lower T1 timestamp must be
> > >>> propagated to
> > >>> > > > > > > >>> the
> > >>> > > > > > > > filtered KTable to ensure that the stream-table
> join
> > >>> computes
> > >>> > > > > > > >>> the
> > >>> > > > > > > > correct result.
> > >>> > > > > > > >
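[Editorial sketch, not part of the original message.] The rule described
above might be expressed roughly as follows, assuming values are compared
in their serialized form. The names EmitDecisionSketch and shouldEmit are
hypothetical and are not part of the Kafka Streams API.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not Kafka Streams code.
final class EmitDecisionSketch {

    // Decide whether an incoming update for a key should be forwarded.
    static boolean shouldEmit(byte[] oldValue, long oldTimestamp,
                              byte[] newValue, long newTimestamp) {
        // A changed value is always a real update.
        if (!Arrays.equals(oldValue, newValue)) {
            return true;
        }
        // Same value: an out-of-order record with a lower timestamp must
        // still be forwarded, because it moves the start of the record's
        // validity earlier. A same-or-higher timestamp is a no-op.
        return newTimestamp < oldTimestamp;
    }
}
```

With this rule, an identical value arriving with a later timestamp is
dropped, while an identical value arriving with an earlier timestamp is
still emitted so that a downstream stream-table join sees the earlier T1.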
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > Your point about requiring an `equals()`
> implementation is
> > >>> > > > > > > >>> actually a
> > >>> > > > > > > > quite interesting one and boils down to my statement
> from
> > >>> above
> > >>> > > > > > > >>> about
> > >>> > > > > > > > "what can we actually implement". What I don't
> understand
> > >>> is:
> > >>> > > > > > > >
> > >>> > > > > > > >>>>>>>>> This way, we still don't have to rely on the
> > >>> existence of an
> > >>> > > > > > > >>>>>>>>> equals() method, but if it is there, we can
> > >>> benefit from it.
> > >>> > > > > > > >
> > >>> > > > > > > > Your bullet point (2) says it uses `equals()` --
> hence, it
> > >>> seems
> > >>> > > > > > > >>> we
> > >>> > > > > > > > actually do rely on it? Also, how can we detect if
> there
> > >>> is an
> > >>> > > > > > > > `equals()` method to do the comparison? Would we fail
> if
> > >>> we don't
> > >>> > > > > > > >>> have
> > >>> > > > > > > > `equals()` nor corresponding serializers to do the
> > >>> comparison?
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we absolutely need
> > >>> metrics and
> > >>> > > > > > > >>> logs if
> > >>> > > > > > > >>>>>>>>> we're going to drop any records. And, yes, we
> > >>> should propose
> > >>> > > > > > > >>>>>>>>> metrics and logs that are similar to the
> existing
> > >>> ones when we
> > >>> > > > > > > >>> drop
> > >>> > > > > > > >>>>>>>>> records for other reasons.
> > >>> > > > > > > >
> > >>> > > > > > > > I am not sure about this point. In fact, we have
> already
> > >>> some
> > >>> > > > > > > >>> no-ops
> > >>> > > > > > > > in Kafka Streams in our join-operators and don't report
> > >>> any of
> > >>> > > > > > > >>> those
> > >>> > > > > > > > either. Emit-on-change is operator semantics and I
> don't
> > >>> see why
> > >>> > > > > > > >>> we
> > >>> > > > > > > > would need to have a metric for it? It seems to be
> quite
> > >>> different
> > >>> > > > > > > > compared to dropping late or malformed records.
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > -Matthias
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > >
> > >>> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
> > >>> > > > > > > >>>>>>>>> Thanks John for your thoughtful reply. Some
> > >>> comments inline.
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51 -0600, John Roesler
> > >>> wrote:
> > >>> > > > > > > >>>>>>>>>> [EXTERNAL EMAIL] Attention: This email was
> sent
> > >>> from outside
> > >>> > > > > > > >>>>>>>>>> TiVo. DO NOT CLICK any links or attachments
> > >>> unless you
> > >>> > > > > > expected
> > >>> > > > > > > >>>>>>>>>> them. ________________________________
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Hi Tommy,
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Thanks for the context. I can see the
> attraction
> > >>> of
> > >>> > > > > > considering
> > >>> > > > > > > >>>>>>>>>> these use cases together.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> To answer your question, if a part of the
> record
> > >>> is not
> > >>> > > > > > > >>> relevant
> > >>> > > > > > > >>>>>>>>>> to downstream consumers, I was thinking you
> could
> > >>> just use a
> > >>> > > > > > > >>>>>>>>>> mapValues() to remove it.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a join between
> two
> > >>> tables.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> employeeInfo.join(employeePayroll, (info, payroll) ->
> > >>> > > > > > > >>>>>>>>>>     new Result(info.name(), payroll.salary()))
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> We only care about one attribute from the Info
> > >>> table (name),
> > >>> > > > > > > >>> and
> > >>> > > > > > > >>>>>>>>>> one from the Payroll table (salary), and these
> > >>> attributes
> > >>> > > > > > > >>> change
> > >>> > > > > > > >>>>>>>>>> rarely. On the other hand, there might be many
> > >>> other
> > >>> > > > > > attributes
> > >>> > > > > > > >>>>>>>>>> that change frequently of these tables. We can
> > >>> avoid
> > >>> > > > > > triggering
> > >>> > > > > > > >>>>>>>>>> the join unnecessarily by mapping the input
> > >>> tables to drop the
> > >>> > > > > > > >>>>>>>>>> unnecessary information before the join:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> names = employeeInfo.mapValues(info -> info.name())
> > >>> > > > > > > >>>>>>>>>> salaries = employeePayroll.mapValues(payroll -> payroll.salary())
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> names.join(salaries, (name, salary) -> new Result(name, salary))
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> Ahh yes I see. This works, but in the case
> where
> > >>> you're using
> > >>> > > > > > > >>>>>>>>> schemas as we are (e.g. Avro), it seems like
> this
> > >>> approach
> > >>> > > > > > could
> > >>> > > > > > > >>>>>>>>> lead to a proliferation of "skinny" record
> types
> > >>> that just drop
> > >>> > > > > > > >>>>>>>>> various fields.
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Especially if we take Matthias's idea to drop
> > >>> non-changes even
> > >>> > > > > > > >>>>>>>>>> for stateless operations, this would be quite
> > >>> efficient and is
> > >>> > > > > > > >>>>>>>>>> also a very straightforward optimization to
> > >>> understand once
> > >>> > > > > > you
> > >>> > > > > > > >>>>>>>>>> know that Streams provides emit-on-change.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> From the context that you provided, it seems
> like
> > >>> a slightly
> > >>> > > > > > > >>>>>>>>>> different situation, though. Reading between
> the
> > >>> lines a
> > >>> > > > > > > >>> little,
> > >>> > > > > > > >>>>>>>>>> it sounds like: in contrast to the example
> above,
> > >>> in which we
> > >>> > > > > > > >>> are
> > >>> > > > > > > >>>>>>>>>> filtering out extra _data_, you have some
> extra
> > >>> _metadata_
> > >>> > > > > > that
> > >>> > > > > > > >>>>>>>>>> you still wish to pass down with the data when
> > >>> there is a
> > >>> > > > > > > >>> "real"
> > >>> > > > > > > >>>>>>>>>> update, but you don't want the metadata
> itself to
> > >>> cause an
> > >>> > > > > > > >>>>>>>>>> update.
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes you've got it
> > >>> right ;) This
> > >>> > > > > > > >>>>>>>>> particular processor is the first stop for this
> > >>> data after
> > >>> > > > > > > >>> coming
> > >>> > > > > > > >>>>>>>>> in from external users, who often simply post
> the
> > >>> same content
> > >>> > > > > > > >>> each
> > >>> > > > > > > >>>>>>>>> time and we're trying to shield downstream
> > >>> consumers from
> > >>> > > > > > > >>>>>>>>> unnecessary churn.
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> It does seem handy to be able to plug in a
> custom
> > >>> > > > > > > >>> ChangeDetector
> > >>> > > > > > > >>>>>>>>>> for this purpose, but I worry about the API
> > >>> complexity. Maybe
> > >>> > > > > > > >>> you
> > >>> > > > > > > >>>>>>>>>> can help think through how to provide the same
> > >>> benefit while
> > >>> > > > > > > >>>>>>>>>> limiting user-facing complexity.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Here's some extra context to consider:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> We currently don't make any extra requirements
> > >>> about the
> > >>> > > > > > nature
> > >>> > > > > > > >>>>>>>>>> of data that you can use in Streams. For
> example,
> > >>> you don't
> > >>> > > > > > > >>> have
> > >>> > > > > > > >>>>>>>>>> to implement hashCode and equals, or
> compareTo,
> > >>> etc. With the
> > >>> > > > > > > >>>>>>>>>> current proposal, we can do an airtight
> > >>> comparison based only
> > >>> > > > > > > >>> on
> > >>> > > > > > > >>>>>>>>>> the serialized form of the values, and we
> > >>> actually don't have
> > >>> > > > > > > >>> to
> > >>> > > > > > > >>>>>>>>>> deserialize the "prior" value at all for a
> large
> > >>> number of
> > >>> > > > > > > >>>>>>>>>> operations. Admittedly, if we extend the
> proposal
> > >>> to include
> > >>> > > > > > > >>> no-op
> > >>> > > > > > > >>>>>>>>>> detection for stateless operations, we'd
> probably
> > >>> need to rely
> > >>> > > > > > > >>> on
> > >>> > > > > > > >>>>>>>>>> equals() for no-op checking, otherwise we'd
> wind
> > >>> up requiring
> > >>> > > > > > > >>>>>>>>>> serdes for stateless operations as well.
> > >>> Actually, I'd
> > >>> > > > > > probably
> > >>> > > > > > > >>>>>>>>>> argue for doing exactly that:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop if the
> serialized
> > >>> byte[]s are
> > >>> > > > > > > >>> the
> > >>> > > > > > > >>>>>>>>>> same. After deserializing, also drop if the
> > >>> objects are equal
> > >>> > > > > > > >>>>>>>>>> according to Object#equals().
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> 2. In stateless operations, compare the "new"
> and
> > >>> "old" values
> > >>> > > > > > > >>>>>>>>>> (if "old" is available) based on
> Object#equals().
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> 3. As a final optimization, after serializing
> and
> > >>> before
> > >>> > > > > > > >>> sending
> > >>> > > > > > > >>>>>>>>>> repartition records, compare the serialized
> data
> > >>> and drop
> > >>> > > > > > > >>>>>>>>>> no-ops.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> This way, we still don't have to rely on the
> > >>> existence of an
> > >>> > > > > > > >>>>>>>>>> equals() method, but if it is there, we can
> > >>> benefit from it.
> > >>> > > > > > > >>>>>>>>>> Also, we don't require a serde in any new
> > >>> situations, but we
> > >>> > > > > > > >>> can
> > >>> > > > > > > >>>>>>>>>> still leverage it when it is available.
> > >>> > > > > > > >>>>>>>>>>
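[Editorial sketch, not part of the original message.] The three checks
listed above could look roughly like this. All names here
(NoOpCheckSketch and its methods) are hypothetical, and the sketch assumes
a missing "old" value always counts as a real change.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not Kafka Streams code.
final class NoOpCheckSketch {

    // 1. Stateful operations: drop if the serialized bytes match; if they
    //    differ, also drop when the deserialized objects are equal.
    static boolean isNoOpStateful(byte[] oldBytes, byte[] newBytes,
                                  Object oldValue, Object newValue) {
        if (oldBytes != null && Arrays.equals(oldBytes, newBytes)) {
            return true;
        }
        return oldValue != null && oldValue.equals(newValue);
    }

    // 2. Stateless operations: no serde is assumed, so compare objects only.
    static boolean isNoOpStateless(Object oldValue, Object newValue) {
        return oldValue != null && oldValue.equals(newValue);
    }

    // 3. Before sending to a repartition topic: compare serialized data.
    static boolean isNoOpBeforeRepartition(byte[] oldBytes, byte[] newBytes) {
        return oldBytes != null && Arrays.equals(oldBytes, newBytes);
    }
}
```

A type that does not override equals() falls back to reference equality
in checks 1 and 2, so such values are simply never dropped by the
equals() path; nothing breaks if equals() is absent.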
> > >>> > > > > > > >>>>>>>>>> For clarity, in my example above, even if the
> > >>> employeeInfo and
> > >>> > > > > > > >>>>>>>>>> employeePayroll and Result records all have
> > >>> serdes, we need
> > >>> > > > > > the
> > >>> > > > > > > >>>>>>>>>> "name" field (presumably String) and the
> "salary"
> > >>> field
> > >>> > > > > > > >>>>>>>>>> (presumably a Double) to have serdes as well
> in
> > >>> the naive
> > >>> > > > > > > >>>>>>>>>> implementation. But if we can leverage
> equals(),
> > >>> then the
> > >>> > > > > > > >>> "right
> > >>> > > > > > > >>>>>>>>>> thing" happens automatically.
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> I still don't totally follow why the individual
> > >>> components
> > >>> > > > > > > >>> (name,
> > >>> > > > > > > >>>>>>>>> salary) would have to have serdes here. If
> Result
> > >>> has one, we
> > >>> > > > > > > >>>>>>>>> compare bytes, and if Result additionally has
> an
> > >>> equals()
> > >>> > > > > > method
> > >>> > > > > > > >>>>>>>>> (which presumably includes equals comparisons
> on
> > >>> the
> > >>> > > > > > constituent
> > >>> > > > > > > >>>>>>>>> fields), have we not covered our bases?
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> This dovetails in with my primary UX concern;
> > >>> where would the
> > >>> > > > > > > >>>>>>>>>> ChangeDetector actually be registered? None of
> > >>> the operators
> > >>> > > > > > in
> > >>> > > > > > > >>>>>>>>>> my example have names or topics or any other
> > >>> identifiable
> > >>> > > > > > > >>>>>>>>>> characteristic that could be passed to a
> > >>> ChangeDetector class
> > >>> > > > > > > >>>>>>>>>> registered via config. You could say that we
> make
> > >>> > > > > > > >>> ChangeDetector
> > >>> > > > > > > >>>>>>>>>> an optional parameter to every operation in
> > >>> Streams, but this
> > >>> > > > > > > >>>>>>>>>> seems to carry quite a bit of mental burden
> with
> > >>> it. People
> > >>> > > > > > > >>> will
> > >>> > > > > > > >>>>>>>>>> wonder what it's for and whether or not they
> > >>> should be using
> > >>> > > > > > > >>> it.
> > >>> > > > > > > >>>>>>>>>> There would almost certainly be a
> misconception
> > >>> that it's
> > >>> > > > > > > >>>>>>>>>> preferable to implement it always, which
> would be
> > >>> unfortunate.
> > >>> > > > > > > >>>>>>>>>> Plus, to actually implement metadata flowing through the
> > >>> > > > > > > >>>>>>>>>> topology as in your use case, you'd have to do two things:
> > >>> > > > > > > >>>>>>>>>> 1. make sure that all operations actually preserve the
> > >>> > > > > > > >>>>>>>>>>    metadata alongside the data (e.g., don't accidentally
> > >>> > > > > > > >>>>>>>>>>    add a mapValues like I did, or you drop the metadata).
> > >>> > > > > > > >>>>>>>>>> 2. implement a ChangeDetector for every single operation
> > >>> > > > > > > >>>>>>>>>>    in the topology, or you don't get the benefit of
> > >>> > > > > > > >>>>>>>>>>    dropping non-changes internally.
> > >>> > > > > > > >>>>>>>>>> 2b. Alternatively, you could just add the ChangeDetector
> > >>> > > > > > > >>>>>>>>>>    to one operation toward the end of the topology. This
> > >>> > > > > > > >>>>>>>>>>    would not drop redundant computation internally, but
> > >>> > > > > > > >>>>>>>>>>    only drop redundant _outputs_. But this is just about
> > >>> > > > > > > >>>>>>>>>>    the same as your current solution.
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> I definitely see your point regarding
> > >>> configuration. I was
> > >>> > > > > > > >>>>>>>>> originally thinking about this when the
> > >>> deduplication was going
> > >>> > > > > > > >>> to
> > >>> > > > > > > >>>>>>>>> be opt-in, and it seemed very natural to say
> > >>> something like:
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> employeeInfo.join(employeePayroll, (info, payroll) ->
> > >>> > > > > > > >>>>>>>>>     new Result(info.name(), payroll.salary()))
> > >>> > > > > > > >>>>>>>>>     .suppress(duplicatesAccordingTo(someChangeDetector))
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> Alternatively you can imagine a similar method
> > >>> being on
> > >>> > > > > > > >>>>>>>>> Materialized, though obviously this makes less
> > >>> sense if we
> > >>> > > > > > don't
> > >>> > > > > > > >>>>>>>>> want to require materialization. If we're now
> > >>> talking about
> > >>> > > > > > > >>>>>>>>> changing the default behavior and not having
> any
> > >>> configuration
> > >>> > > > > > > >>>>>>>>> options, it's harder to find a place for this.
> > >>> > > > > > > >>>>>>>>>
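[Editorial sketch, not part of the original message.] The thread names a
"ChangeDetector" but never defines it, so the interface below is purely
an assumption about what such a plug-in point could look like. Posting
and ContentOnlyDetector are likewise made-up names for a record with one
metadata field that should not trigger an update.

```java
import java.util.Objects;

// Hypothetical interface; no such type exists in Kafka Streams.
interface ChangeDetector<V> {
    // Returns true when newValue should be treated as a real update.
    boolean isChange(V oldValue, V newValue);
}

// Hypothetical record type: content plus a metadata timestamp.
final class Posting {
    final String content;
    final long postedAtMs; // metadata, ignored for change detection

    Posting(String content, long postedAtMs) {
        this.content = content;
        this.postedAtMs = postedAtMs;
    }
}

// Treats two postings as unchanged when only the metadata differs.
final class ContentOnlyDetector implements ChangeDetector<Posting> {
    @Override
    public boolean isChange(Posting oldValue, Posting newValue) {
        if (oldValue == null || newValue == null) {
            // A first value or a delete counts as a change.
            return oldValue != newValue;
        }
        return !Objects.equals(oldValue.content, newValue.content);
    }
}
```

The open question raised in the thread is where such an object would be
registered, not how it would be written.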
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> A final thought; if it really is a metadata
> > >>> question, can we
> > >>> > > > > > > >>> just
> > >>> > > > > > > >>>>>>>>>> plan to finish up the support for headers in
> > >>> Streams? I.e.,
> > >>> > > > > > > >>> give
> > >>> > > > > > > >>>>>>>>>> you a way to control the way that headers flow
> > >>> through the
> > >>> > > > > > > >>>>>>>>>> topology? Then, we could treat headers the
> same
> > >>> way we treat
> > >>> > > > > > > >>>>>>>>>> timestamps in the no-op checking... We
> completely
> > >>> ignore them
> > >>> > > > > > > >>>>>>>>>> for the sake of comparison. Thus, neither the
> > >>> timestamp nor
> > >>> > > > > > the
> > >>> > > > > > > >>>>>>>>>> headers would get updated in internal state
> or in
> > >>> downstream
> > >>> > > > > > > >>>>>>>>>> views as long as the value itself doesn't
> change.
> > >>> This seems
> > >>> > > > > > to
> > >>> > > > > > > >>>>>>>>>> give us a way to support your use case without
> > >>> adding to the
> > >>> > > > > > > >>>>>>>>>> mental overhead of using Streams for simple
> > >>> things.
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>> Agree headers could be a decent fit for this
> > >>> particular case
> > >>> > > > > > > >>>>>>>>> because it's mostly metadata, though to be
> honest
> > >>> we haven't
> > >>> > > > > > > >>> looked
> > >>> > > > > > > >>>>>>>>> at headers much (mostly because, and to your
> > >>> point, support
> > >>> > > > > > > >>> seems
> > >>> > > > > > > >>>>>>>>> to be lacking). I feel like there would be
> other
> > >>> cases where
> > >>> > > > > > > >>> this
> > >>> > > > > > > >>>>>>>>> feature could be valuable, but I admit I can't
> > >>> come up with
> > >>> > > > > > > >>>>>>>>> anything right this second. Perhaps yuzhihong
> had
> > >>> an example in
> > >>> > > > > > > >>>>>>>>> mind?
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> I.e., simple things should be easy, and
> complex
> > >>> things should
> > >>> > > > > > > >>> be
> > >>> > > > > > > >>>>>>>>>> possible.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks, -John
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19, Thomas Becker
> > >>> wrote:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Hi John, Can you describe how you'd use
> > >>> filtering/mapping to
> > >>> > > > > > > >>>>>>>>>> deduplicate records? To give some background
> on
> > >>> my suggestion
> > >>> > > > > > > >>> we
> > >>> > > > > > > >>>>>>>>>> currently have a small stream processor that
> > >>> exists solely to
> > >>> > > > > > > >>>>>>>>>> deduplicate, which we do using a process that
> I
> > >>> assume would
> > >>> > > > > > be
> > >>> > > > > > > >>>>>>>>>> similar to what would be done here (with a
> store
> > >>> of keys and
> > >>> > > > > > > >>> hash
> > >>> > > > > > > >>>>>>>>>> values). But the records we are deduplicating
> > >>> have some
> > >>> > > > > > > >>> metadata
> > >>> > > > > > > >>>>>>>>>> fields (such as timestamps of when the record
> was
> > >>> posted) that
> > >>> > > > > > > >>> we
> > >>> > > > > > > >>>>>>>>>> don't consider semantically meaningful for
> > >>> downstream
> > >>> > > > > > > >>> consumers,
> > >>> > > > > > > >>>>>>>>>> and therefore we also suppress updates that
> only
> > >>> touch those
> > >>> > > > > > > >>>>>>>>>> fields.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> -Tommy
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30 -0600, John
> Roesler
> > >>> wrote:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong,
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> That’s an interesting idea. Can you help think of a use case
> > >>> > > > > > > >>>>>>>>>> that isn’t also served by filtering or mapping beforehand?
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Thanks for helping to design this feature!
> > >>> > > > > > > >>>>>>>>>> -John
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> I think this is a good idea.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Jan 31, 2020, at 4:49 PM, Thomas Becker
> > >>> > > > > > > >>>>>>>>>> <thomas.bec...@tivo.com> wrote:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> How do folks feel about allowing the mechanism by which
> > >>> > > > > > > >>>>>>>>>> no-ops are detected to be pluggable? Meaning use something
> > >>> > > > > > > >>>>>>>>>> like a hash by default, but you could optionally provide an
> > >>> > > > > > > >>>>>>>>>> implementation of something to use instead, like a
> > >>> > > > > > > >>>>>>>>>> ChangeDetector. This could be useful for example to ignore
> > >>> > > > > > > >>>>>>>>>> changes to certain fields, which may not be relevant to the
> > >>> > > > > > > >>>>>>>>>> operation being performed.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> ________________________________
> > >>> > > > > > > >>>>>>>>>> From: John Roesler <vvcep...@apache.org>
> > >>> > > > > > > >>>>>>>>>> Sent: Friday, January 31, 2020 4:51 PM
> > >>> > > > > > > >>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > >>> > > > > > > >>>>>>>>>> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Hello all,
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Sorry for my silence. It seems like we are getting close to
> > >>> > > > > > > >>>>>>>>>> consensus. Hopefully, we could move to a vote soon!
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> All of the reasoning from Matthias and Bruno around timestamp
> > >>> > > > > > > >>>>>>>>>> is compelling. I would be strongly in favor of stating a few
> > >>> > > > > > > >>>>>>>>>> things very clearly in the KIP:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> 1. Streams will drop no-op updates only for KTable
> > >>> > > > > > > >>>>>>>>>>    operations. That is, we won't make any changes to KStream
> > >>> > > > > > > >>>>>>>>>>    aggregations at the moment. It does seem like we can
> > >>> > > > > > > >>>>>>>>>>    potentially revisit the time semantics of that operation
> > >>> > > > > > > >>>>>>>>>>    in the future, but we don't need to do it now. On the
> > >>> > > > > > > >>>>>>>>>>    other hand, the proposed semantics for KTable timestamps
> > >>> > > > > > > >>>>>>>>>>    (marking the beginning of the validity of that record)
> > >>> > > > > > > >>>>>>>>>>    makes sense to me.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> 2. Streams will only drop no-op updates for _stateful_
> > >>> > > > > > > >>>>>>>>>>    KTable operations. We don't want to add a hard guarantee
> > >>> > > > > > > >>>>>>>>>>    that Streams will _never_ emit a no-op table update
> > >>> > > > > > > >>>>>>>>>>    because it would require adding state to otherwise
> > >>> > > > > > > >>>>>>>>>>    stateless operations. If someone is really concerned
> > >>> > > > > > > >>>>>>>>>>    about a particular stateless operation producing a lot of
> > >>> > > > > > > >>>>>>>>>>    no-op results, all they have to do is materialize it, and
> > >>> > > > > > > >>>>>>>>>>    Streams would automatically drop the no-ops.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Additionally, I'm +1 on not adding an opt-out at this time.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Regarding the KIP itself, I would clean it up a bit before
> > >>> > > > > > > >>>>>>>>>> calling for a vote. There is a lot of "discussion"-type
> > >>> > > > > > > >>>>>>>>>> language there, which is very natural to read, but makes it
> > >>> > > > > > > >>>>>>>>>> a bit hard to see what _exactly_ the KIP is proposing.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Richard, would you mind just making the "proposed behavior
> > >>> > > > > > > >>>>>>>>>> change" a simple and succinct list of bullet points? I.e.,
> > >>> > > > > > > >>>>>>>>>> please drop glue phrases like "there has been some
> > >>> > > > > > > >>>>>>>>>> discussion" or "possibly we could do X". For the final
> > >>> > > > > > > >>>>>>>>>> version of the KIP, it should just say, "Streams will do X,
> > >>> > > > > > > >>>>>>>>>> Streams will do Y". Feel free to add an elaboration section
> > >>> > > > > > > >>>>>>>>>> to explain more about what X and Y mean, but we don't need
> > >>> > > > > > > >>>>>>>>>> to talk about possibilities or alternatives except in the
> > >>> > > > > > > >>>>>>>>>> "rejected alternatives" section.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Accordingly, can you also move the options you presented in
> > >>> > > > > > > >>>>>>>>>> the intro to the "rejected alternatives" section and only
> > >>> > > > > > > >>>>>>>>>> mention the final proposal itself?
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> This just really helps reviewers to know what they are
> > >>> > > > > > > >>>>>>>>>> voting for, and it helps everyone after the fact when they
> > >>> > > > > > > >>>>>>>>>> are trying to get clarity on what exactly the proposal is,
> > >>> > > > > > > >>>>>>>>>> versus all the things it could have been.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> Thanks,
> > >>> > > > > > > >>>>>>>>>> -John
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14, Richard Yu
> wrote:
> > >>> Hello to
> > >>> > > > > > all,
> > >>> > > > > > > >>>>>>>>>> I've finished making some initial
> modifications
> > >>> to the KIP. I
> > >>> > > > > > > >>>>>>>>>> have decided to keep the implementation
> section
> > >>> in the KIP for
> > >>> > > > > > > >>>>>>>>>> record-keeping purposes. For now, we should
> focus
> > >>> on only the
> > >>> > > > > > > >>>>>>>>>> proposed behavior changes instead. See if you
> > >>> have any
> > >>> > > > > > > >>> comments!
> > >>> > > > > > > >>>>>>>>>> Cheers, Richard On Sat, Jan 25, 2020 at 11:12
> AM
> > >>> Richard Yu
> > >>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com <mailto:
> > >>> > > > > > yohan.richard...@gmail.com
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>>>>>>> wrote: Hi all, Thanks for all the discussion!
> > >>> @John and @Bruno
> > >>> > > > > > > >>> I
> > >>> > > > > > > >>>>>>>>>> will survey other possible systems and see
> what I
> > >>> can do. Just
> > >>> > > > > > > >>> a
> > >>> > > > > > > >>>>>>>>>> question, by systems, I suppose you would mean
> > >>> the pros and
> > >>> > > > > > > >>> cons
> > >>> > > > > > > >>>>>>>>>> of different reporting strategies? I'm not
> > >>> completely certain
> > >>> > > > > > > >>> on
> > >>> > > > > > > >>>>>>>>>> this point, so it would be great if you can
> > >>> clarify on that.
> > >>> > > > > > So
> > >>> > > > > > > >>>>>>>>>> here's what I got from all the discussion so
> far:
> > >>> - Since both
> > >>> > > > > > > >>>>>>>>>> Matthias and John seems to have come to a
> > >>> consensus on this,
> > >>> > > > > > > >>> then
> > >>> > > > > > > >>>>>>>>>> we will go for an all-round behavorial change
> for
> > >>> KTables.
> > >>> > > > > > > >>> After
> > >>> > > > > > > >>>>>>>>>> some thought, I decided that for now, an
> opt-out
> > >>> config will
> > >>> > > > > > > >>> not
> > >>> > > > > > > >>>>>>>>>> be added. As John has pointed out, no-op
> changes
> > >>> tend to
> > >>> > > > > > > >>>>>>>>>> explode further down the topology as they are
> > >>> forwarded to
> > >>> > > > > > more
> > >>> > > > > > > >>>>>>>>>> and more processor nodes downstream. - About
> > >>> using hash codes,
> > >>> > > > > > > >>>>>>>>>> after some explanation from John, it looks
> like
> > >>> hash codes
> > >>> > > > > > > >>> might
> > >>> > > > > > > >>>>>>>>>> not be as ideal (for implementation). For
> now, we
> > >>> will omit
> > >>> > > > > > > >>> that
> > >>> > > > > > > >>>>>>>>>> detail, and save it for the PR. - @Bruno You
> do
> > >>> have valid
> > >>> > > > > > > >>>>>>>>>> concerns. Though, I am not completely certain
> if
> > >>> we want to do
> > >>> > > > > > > >>>>>>>>>> emit-on-change only for materialized KTables.
> I
> > >>> will put it
> > >>> > > > > > > >>> down
> > >>> > > > > > > >>>>>>>>>> in the KIP regardless. I will do my best to
> > >>> address all points
> > >>> > > > > > > >>>>>>>>>> raised so far on the discussion. Hope we could
> > >>> keep this
> > >>> > > > > > going!
> > >>> > > > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24, 2020 at 6:07 PM
> > >>> Bruno Cadonna
> > >>> > > > > > > >>>>>>>>>> <br...@confluent.io <mailto:
> br...@confluent.io>>
> > >>> wrote: Thank
> > >>> > > > > > > >>> you
> > >>> > > > > > > >>>>>>>>>> Matthias for the use cases! Looking at both
> use
> > >>> cases, I think
> > >>> > > > > > > >>>>>>>>>> you need to elaborate on them in the KIP,
> > >>> Richard. Emit from
> > >>> > > > > > > >>>>>>>>>> plain KTable: I agree with Matthias that the
> > >>> lower timestamp
> > >>> > > > > > > >>>>>>>>>> makes sense because it marks the start of the
> > >>> validity of the
> > >>> > > > > > > >>>>>>>>>> record. Idempotent records with a higher
> > >>> timestamp can be
> > >>> > > > > > > >>> safely
> > >>> > > > > > > >>>>>>>>>> ignored. A corner case that I discussed with
> > >>> Matthias offline
> > >>> > > > > > > >>> is
> > >>> > > > > > > >>>>>>>>>> when we do not materialize a KTable due to
> > >>> optimization. Then
> > >>> > > > > > > >>> we
> > >>> > > > > > > >>>>>>>>>> cannot avoid the idempotent records because
> we do
> > >>> not keep the
> > >>> > > > > > > >>>>>>>>>> first record with the lower timestamp to
> compare
> > >>> to. Emit from
> > >>> > > > > > > >>>>>>>>>> KTable with aggregations: If we specify that
> an
> > >>> aggregation
> > >>> > > > > > > >>>>>>>>>> result should have the highest timestamp of
> the
> > >>> records that
> > >>> > > > > > > >>>>>>>>>> participated in the aggregation, we cannot
> ignore
> > >>> any
> > >>> > > > > > > >>> idempotent
> > >>> > > > > > > >>>>>>>>>> records. Admittedly, the result of an
> aggregation
> > >>> usually
> > >>> > > > > > > >>>>>>>>>> changes, but there are aggregations where the
> > >>> result may not
> > >>> > > > > > > >>>>>>>>>> change like min and max, or sum when the
> incoming
> > >>> records have
> > >>> > > > > > > >>> a
> > >>> > > > > > > >>>>>>>>>> value of zero. In those cases, we could
> benefit
> > >>> from the emit on
> > >>> > > > > > > >>>>>>>>>> change, but only if we define the semantics
> of the
> > >>> > > > > > aggregations
> > >>> > > > > > > >>>>>>>>>> to not use the highest timestamp of the
> > >>> participating records
> > >>> > > > > > > >>> for
> > >>> > > > > > > >>>>>>>>>> the result. In Kafka Streams, we do not have
> min,
> > >>> max, and sum
> > >>> > > > > > > >>> as
> > >>> > > > > > > >>>>>>>>>> explicit aggregations, but we need to provide
> an
> > >>> API to define
> > >>> > > > > > > >>>>>>>>>> what timestamp should be used for the result
> of
> > >>> an aggregation
> > >>> > > > > > > >>> if
> > >>> > > > > > > >>>>>>>>>> we want to go down this path. All of this does
> > >>> not block this
> > >>> > > > > > > >>> KIP
> > >>> > > > > > > >>>>>>>>>> and I just wanted to put these aspects up for
> > >>> discussion. The
> > >>> > > > > > > >>> KIP
> > >>> > > > > > > >>>>>>>>>> can limit itself to emit from materialized
> > >>> KTables. However,
> > >>> > > > > > > >>> the
> > >>> > > > > > > >>>>>>>>>> limits should be explicitly stated in the KIP.
> > >>> Best, Bruno
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J.
> Sax
> > >>> > > > > > > >>>>>>>>>> <matth...@confluent.io <mailto:
> > >>> matth...@confluent.io>> wrote:
> > >>> > > > > > > >>>>>>>>>> IMHO, the question about semantics depends on
> the
> > >>> use case, in
> > >>> > > > > > > >>>>>>>>>> particular on the origin of a KTable. If
> there is
> > >>> a changelog
> > >>> > > > > > > >>>>>>>>>> topic that one reads directly into a KTable,
> > >>> emit-on-change
> > >>> > > > > > > >>> does
> > >>> > > > > > > >>>>>>>>>> actually make sense, because the timestamp
> > >>> indicates _when_
> > >>> > > > > > the
> > >>> > > > > > > >>>>>>>>>> update was _effective_. For this case, it is
> > >>> semantically
> > >>> > > > > > sound
> > >>> > > > > > > >>>>>>>>>> to _not_ update the timestamp in the store,
> > >>> because the second
> > >>> > > > > > > >>>>>>>>>> update is actually idempotent and advancing
> the
> > >>> timestamp is
> > >>> > > > > > > >>> not
> > >>> > > > > > > >>>>>>>>>> ideal (one could even consider it to be wrong
> to
> > >>> advance the
> > >>> > > > > > > >>>>>>>>>> timestamp) because the "valid time" of the
> record
> > >>> pair did not
> > >>> > > > > > > >>>>>>>>>> change. This reasoning also applies to
> > >>> KTable-KTable joins.
> > >>> > > > > > > >>>>>>>>>> However, if the KTable is the result of an
> > >>> aggregation, I
> > >>> > > > > > think
> > >>> > > > > > > >>>>>>>>>> emit-on-update is more natural, because the
> > >>> timestamp reflects
> > >>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest timestamp) of all
> > >>> input records
> > >>> > > > > > > >>> that
> > >>> > > > > > > >>>>>>>>>> contributed to the result. Hence, updating the
> > >>> timestamp and
> > >>> > > > > > > >>>>>>>>>> emitting a new record actually sounds correct
> to
> > >>> me. This
> > >>> > > > > > > >>> applies
> > >>> > > > > > > >>>>>>>>>> to windowed and non-windowed aggregations
> IMHO.
> > >>> However,
> > >>> > > > > > > >>>>>>>>>> considering the argument that the timestamp
> > >>> should not be
> > >>> > > > > > > >>> updated
> > >>> > > > > > > >>>>>>>>>> in the first case in the store to begin with,
> > >>> both cases are
> > >>> > > > > > > >>>>>>>>>> actually the same, and both can be modeled as
> > >>> emit-on-change:
> > >>> > > > > > > >>> if
> > >>> > > > > > > >>>>>>>>>> a `table()` operator does not update the
> > >>> timestamp if the
> > >>> > > > > > value
> > >>> > > > > > > >>>>>>>>>> does not change, there is _no_ change and thus
> > >>> nothing is
> > >>> > > > > > > >>>>>>>>>> emitted. At the same time, if an aggregation
> > >>> operator does
> > >>> > > > > > > >>> update
> > >>> > > > > > > >>>>>>>>>> the timestamp (even if the value does not
> change)
> > >>> there _is_ a
> > >>> > > > > > > >>>>>>>>>> change and we emit. Note that handling
> > >>> out-of-order data for
> > >>> > > > > > > >>>>>>>>>> aggregations would also work seamlessly with
> this
> > >>> approach --
> > >>> > > > > > > >>> for
> > >>> > > > > > > >>>>>>>>>> out-of-order records, the timestamp never
> > >>> changes, and
> > >>> > > > > > > >>> thus,
> > >>> > > > > > > >>>>>>>>>> we only emit if the result itself changes.
> > >>> Therefore, I would
> > >>> > > > > > > >>>>>>>>>> argue that we might not even need any config,
> > >>> because the
> > >>> > > > > > > >>>>>>>>>> emit-on-change behavior is just correct and
> > >>> reduces the
> > >>> > > > > > > >>>>>>>>>> downstream load, while our current behavior is
> > >>> not ideal (even
> > >>> > > > > > > >>> if
> > >>> > > > > > > >>>>>>>>>> it's also correct). Thoughts? -Matthias On
> > >>> 1/24/20 9:37 AM,
> > >>> > > > > > > >>> John
> > >>> > > > > > > >>>>>>>>>> Roesler wrote: Hi Bruno, Thanks for that
> idea. I
> > >>> hadn't
> > >>> > > > > > > >>>>>>>>>> considered that option before, and it does
> seem
> > >>> like that
> > >>> > > > > > would
> > >>> > > > > > > >>>>>>>>>> be the right place to put it if we think it
> might
> > >>> be
> > >>> > > > > > > >>> semantically
> > >>> > > > > > > >>>>>>>>>> important to control on a table-by-table
> basis. I
> > >>> had been
> > >>> > > > > > > >>>>>>>>>> thinking of it less semantically and more
> > >>> practically. In the
> > >>> > > > > > > >>>>>>>>>> context of a large topology, or more
> generally, a
> > >>> large
> > >>> > > > > > > >>> software
> > >>> > > > > > > >>>>>>>>>> system that contains many topologies and other
> > >>> event-driven
> > >>> > > > > > > >>>>>>>>>> systems, each no-op result becomes an input
> that
> > >>> is destined
> > >>> > > > > > to
> > >>> > > > > > > >>>>>>>>>> itself become a no-op result, and so on, all
> the
> > >>> way through
> > >>> > > > > > > >>> the
> > >>> > > > > > > >>>>>>>>>> system. Thus, a single pointless processing
> > >>> result becomes
> > >>> > > > > > > >>>>>>>>>> amplified into a large number of pointless
> > >>> computations, cache
> > >>> > > > > > > >>>>>>>>>> perturbations, and network and disk I/O
> > >>> operations. If you
> > >>> > > > > > also
> > >>> > > > > > > >>>>>>>>>> consider operations with fan-out implications,
> > >>> like branching
> > >>> > > > > > > >>> or
> > >>> > > > > > > >>>>>>>>>> foreign-key joins, the wasted resources are
> > >>> amplified not just
> > >>> > > > > > > >>> in
> > >>> > > > > > > >>>>>>>>>> proportion to the size of the system, but the
> > >>> size of the
> > >>> > > > > > > >>> system
> > >>> > > > > > > >>>>>>>>>> times the average fan-out (to the power of the
> > >>> number of
> > >>> > > > > > > >>> fan-out
> > >>> > > > > > > >>>>>>>>>> operations on the path(s) through the
> system). In
> > >>> my time
> > >>> > > > > > > >>>>>>>>>> operating such systems, I've observed these
> > >>> effects to be very
> > >>> > > > > > > >>>>>>>>>> real, and actually, the system and use case
> > >>> don't have to be
> > >>> > > > > > > >>>>>>>>>> very large before the amplification poses an
> > >>> existential
> > >>> > > > > > threat
> > >>> > > > > > > >>>>>>>>>> to the system as a whole. This is the basis
> of my
> > >>> advocating
> > >>> > > > > > > >>> for
> > >>> > > > > > > >>>>>>>>>> a simple behavior change, rather than an
> opt-in
> > >>> config of any
> > >>> > > > > > > >>>>>>>>>> kind. It seems like Streams should "do the
> right
> > >>> thing" for
> > >>> > > > > > the
> > >>> > > > > > > >>>>>>>>>> majority use case. My theory (which may be
> wrong)
> > >>> is that the
> > >>> > > > > > > >>>>>>>>>> majority use case is more like "relational
> > >>> queries" than "CEP
> > >>> > > > > > > >>>>>>>>>> queries". Even if you were doing some
> > >>> event-sensitive
> > >>> > > > > > > >>>>>>>>>> computation, wouldn't you do them as Stream
> > >>> operations (where
> > >>> > > > > > > >>>>>>>>>> this feature is inapplicable anyway)? In
> keeping
> > >>> with the
> > >>> > > > > > > >>>>>>>>>> "practical" perspective, I suggested the
> opt-out
> > >>> config only
> > >>> > > > > > in
> > >>> > > > > > > >>>>>>>>>> the (I think unlikely) event that filtering
> out
> > >>> pointless
> > >>> > > > > > > >>> updates
> > >>> > > > > > > >>>>>>>>>> actually harms performance. I'd also be
> perfectly
> > >>> fine without
> > >>> > > > > > > >>>>>>>>>> the opt-out config. I really think that
> (because
> > >>> of the
> > >>> > > > > > > >>> timestamp
> > >>> > > > > > > >>>>>>>>>> semantics work already underway), we're
> already
> > >>> pre-fetching
> > >>> > > > > > > >>> the
> > >>> > > > > > > >>>>>>>>>> prior result most of the time, so there would
> > >>> actually be very
> > >>> > > > > > > >>>>>>>>>> little extra I/O involved in implementing
> > >>> emit-on-change.
> > >>> > > > > > > >>>>>>>>>> However, we should consider whether my
> experience
> > >>> is likely to
> > >>> > > > > > > >>>>>>>>>> be general. Do you have some use case in mind
> for
> > >>> which you'd
> > >>> > > > > > > >>>>>>>>>> actually want some KTable results to be
> > >>> emit-on-update for
> > >>> > > > > > > >>>>>>>>>> semantic reasons? Thanks, -John
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna
> > >>> wrote: Hi
> > >>> > > > > > > >>> Richard,
> > >>> > > > > > > >>>>>>>>>> Thank you for the KIP. I agree with John that
> we
> > >>> should focus
> > >>> > > > > > > >>> on
> > >>> > > > > > > >>>>>>>>>> the interface and behavior change in a KIP. We
> > >>> can discuss the
> > >>> > > > > > > >>>>>>>>>> implementation later. I am also +1 for the
> > >>> survey. I had a
> > >>> > > > > > > >>>>>>>>>> thought about this. Couldn't we consider
> > >>> emit-on-change to be
> > >>> > > > > > > >>> one
> > >>> > > > > > > >>>>>>>>>> config of suppress (like `untilWindowCloses`)?
> > >>> What you
> > >>> > > > > > > >>>>>>>>>> basically propose is to suppress updates if
> they
> > >>> do not change
> > >>> > > > > > > >>>>>>>>>> the result. Considering emit on change as a
> > >>> flavour of
> > >>> > > > > > suppress
> > >>> > > > > > > >>>>>>>>>> would be more flexible because it would
> specify
> > >>> the behavior
> > >>> > > > > > > >>>>>>>>>> locally for a KTable instead of globally for
> all
> > >>> KTables.
> > >>> > > > > > > >>>>>>>>>> Additionally, specifying the behavior in one
> > >>> place instead of
> > >>> > > > > > > >>>>>>>>>> multiple places feels more intuitive and
> > >>> consistent to me.
> > >>> > > > > > > >>> Best,
> > >>> > > > > > > >>>>>>>>>> Bruno On Fri, Jan 24, 2020 at 7:49 AM John
> Roesler
> > >>> > > > > > > >>>>>>>>>> <vvcep...@apache.org <mailto:
> vvcep...@apache.org>>
> > >>> wrote: Hi
> > >>> > > > > > > >>>>>>>>>> Richard, Thanks for picking this up! I know
> of at
> > >>> least one
> > >>> > > > > > > >>> large
> > >>> > > > > > > >>>>>>>>>> community member for which this feature is
> > >>> absolutely
> > >>> > > > > > > >>> essential.
> > >>> > > > > > > >>>>>>>>>> If I understand your two options, it seems
> like
> > >>> the proposal
> > >>> > > > > > is
> > >>> > > > > > > >>>>>>>>>> to implement it as a behavior change
> regardless,
> > >>> and the
> > >>> > > > > > > >>> question
> > >>> > > > > > > >>>>>>>>>> is whether to provide an opt-out config or
> not.
> > >>> Given that any
> > >>> > > > > > > >>>>>>>>>> implementation of this feature would have some
> > >>> performance
> > >>> > > > > > > >>> impact
> > >>> > > > > > > >>>>>>>>>> under some workloads, and also that we don't
> know
> > >>> if anyone
> > >>> > > > > > > >>>>>>>>>> really depends on emit-on-update time
> semantics,
> > >>> it seems like
> > >>> > > > > > > >>> we
> > >>> > > > > > > >>>>>>>>>> should propose to add an opt-out config. Can
> you
> > >>> update the
> > >>> > > > > > KIP
> > >>> > > > > > > >>>>>>>>>> to mention the exact config key and value(s)
> > >>> you'd propose?
> > >>> > > > > > > >>> Just
> > >>> > > > > > > >>>>>>>>>> to move the discussion forward, maybe
> something
> > >>> like: emit.on
> > >>> > > > > > > >>> :=
> > >>> > > > > > > >>>>>>>>>> change|update with the new default being
> "change"
> > >>> Thanks for
> > >>> > > > > > > >>>>>>>>>> pointing out the timestamp issue in
> particular. I
> > >>> agree that
> > >>> > > > > > if
> > >>> > > > > > > >>>>>>>>>> we discard the latter update as a no-op, then
> we
> > >>> also have to
> > >>> > > > > > > >>>>>>>>>> discard its timestamp (obviously, we don't
> > >>> forward the
> > >>> > > > > > > >>> timestamp
> > >>> > > > > > > >>>>>>>>>> update, as that's the whole point, but we also
> > >>> can't update
> > >>> > > > > > the
> > >>> > > > > > > >>>>>>>>>> timestamp in the store, as the store must
> remain
> > >>> consistent
> > >>> > > > > > > >>> with
> > >>> > > > > > > >>>>>>>>>> what has been emitted). I have to confess
> that I
> > >>> disagree with
> > >>> > > > > > > >>>>>>>>>> your implementation proposal, but it's also
> not
> > >>> necessary to
> > >>> > > > > > > >>>>>>>>>> discuss implementation in the KIP. Maybe it
> would
> > >>> be less
> > >>> > > > > > > >>>>>>>>>> controversial if you just drop that section
> for
> > >>> now, so that
> > >>> > > > > > > >>> the
> > >>> > > > > > > >>>>>>>>>> KIP discussion can focus on the behavior
> change
> > >>> and config.
> > >>> > > > > > > >>> Just
> > >>> > > > > > > >>>>>>>>>> for reference, there is some research into
> this
> > >>> domain. For
> > >>> > > > > > > >>>>>>>>>> example, see the "Report" section (3.2.3) of
> the
> > >>> SECRET paper:
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > > It might help to round out the proposal if you take a
> brief
> > >>> > > > > > > >>> survey of
> > >>> > > > > > > >>>>>>>>>> the behaviors of other systems, along with
> pros
> > >>> and cons if
> > >>> > > > > > any
> > >>> > > > > > > >>>>>>>>>> are reported. Thanks, -John
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu
> wrote:
> > >>> Hi
> > >>> > > > > > everybody!
> > >>> > > > > > > >>>>>>>>>> I'd like to propose a change that we probably
> > >>> should've added
> > >>> > > > > > > >>> for
> > >>> > > > > > > >>>>>>>>>> a long time now. The key benefit of this KIP
> > >>> would be reduced
> > >>> > > > > > > >>>>>>>>>> traffic in Kafka Streams since a lot of no-op
> > >>> results would no
> > >>> > > > > > > >>>>>>>>>> longer be sent downstream. Here is the KIP for
> > >>> reference.
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > > Currently, I seek to formalize our approach for this
> KIP
> > >>> first
> > >>> > > > > > > >>> before
> > >>> > > > > > > >>>>>>>>>> we determine concrete API additions /
> > >>> configurations. Some
> > >>> > > > > > > >>>>>>>>>> configs might warrant adding, while others
> are
> > >>> not necessary
> > >>> > > > > > > >>>>>>>>>> since adding them would only increase
> complexity
> > >>> of Kafka
> > >>> > > > > > > >>>>>>>>>> Streams. Cheers, Richard
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> ________________________________ This email
> and
> > >>> any
> > >>> > > > > > attachments
> > >>> > > > > > > >>>>>>>>>> may contain confidential and privileged
> material
> > >>> for the sole
> > >>> > > > > > > >>> use
> > >>> > > > > > > >>>>>>>>>> of the intended recipient. Any review,
> copying, or
> > >>> > > > > > distribution
> > >>> > > > > > > >>>>>>>>>> of this email (or any attachments) by others
> is
> > >>> prohibited. If
> > >>> > > > > > > >>>>>>>>>> you are not the intended recipient, please
> > >>> contact the sender
> > >>> > > > > > > >>>>>>>>>> immediately and permanently delete this email
> and
> > >>> any
> > >>> > > > > > > >>>>>>>>>> attachments. No employee or agent of TiVo is
> > >>> authorized to
> > >>> > > > > > > >>>>>>>>>> conclude any binding agreement on behalf of
> TiVo
> > >>> by email.
> > >>> > > > > > > >>>>>>>>>> Binding agreements with TiVo may only be made
> by
> > >>> a signed
> > >>> > > > > > > >>> written
> > >>> > > > > > > >>>>>>>>>> agreement. -- *Tommy Becker* *Principal
> Engineer *
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> *Personalized Content Discovery*
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>> *O* +1 919.460.4747 *tivo.com* <
> > >>> http://www.tivo.com/>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>>
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>>>>
> > >>> > > > > > > >>>>>>
> > >>> > > > > >
> > >>>
> ----------------------------------------------------------------------
> > >>> > > > > > > >>>>>>>
> > >>> > > > > > > >>>>>>
> > >>> > > > > > > >>>>>
> > >>> > > > > > > >>>>
> > >>> > > > > > > >>>
> > >>> > > > > > > >>
> > >>> > > > > > > >
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > >
> > >>> > > >
> > >>> >
> > >>>
> > >>
> >
>
