Hi all,

Just some updates. Below is the vote thread:
https://sematext.com/opensee/m/Kafka/uyzND1h1NPW1tLVQR?subj=+VOTE+KIP+557+Add+emit+on+change+support+for+Kafka+Streams

It would be great if we could include this change in Kafka. :)

Cheers,
Richard

On Thu, Feb 27, 2020 at 6:45 PM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi all,
>
> @John Will add some notes accordingly.
>
> To all: Thanks for all your input!
> It looks like we can wrap up this discussion thread then.
>
> I've started a vote thread, so please feel free to cast your vote there!
>
> We should be pretty close. :)
>
> Cheers,
> Richard
>
> On Thu, Feb 27, 2020 at 2:34 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi Richard,
>>
>> Thanks for the update!
>>
>> I read it over, and overall it looks good!
>>
>> I have only a minor concern about the rate metric definition:
>> > The rate option indicates the ratio of records dropped to actual volume
>> of records passing through the task
>> That's not the definition of a "rate". It should be something more like
>> "the average number of dropped idempotent updates per second".
>>
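A small sketch of the distinction John draws above between a ratio and a rate. This is illustrative arithmetic only: the class and method names are made up for this example, and it does not reflect how the Kafka metrics library actually samples a rate.

```java
/** Illustrative arithmetic only; not the Kafka metrics implementation. */
public class DroppedUpdateRateSketch {

    /** Ratio of dropped records to all records seen (the wording being questioned). */
    static double dropRatio(long dropped, long total) {
        return total == 0 ? 0.0 : (double) dropped / total;
    }

    /** Average number of dropped updates per second over an observation period. */
    static double dropRatePerSecond(long dropped, long elapsedMillis) {
        return elapsedMillis == 0 ? 0.0 : dropped * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // 50 of 200 records dropped during a 10-second observation period.
        System.out.println("ratio = " + dropRatio(50, 200));
        System.out.println("rate  = " + dropRatePerSecond(50, 10_000) + " per second");
    }
}
```

The two numbers answer different questions: the ratio is unitless and depends on total traffic, while the rate is expressed per unit of time.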
>> Incidentally, I mentioned this KIP to Guozhang, and he brought up an
>> interesting concern I'd like to share with the list. He noted that if we
>> filter
>> out idempotent table updates, stream time will not advance with every
>> input event anymore. He asked if this would have a negative impact on
>> operations that depend on stream time.
>>
>> I think this is a valid concern. For example, you can use Suppress to
>> buffer KTable updates until a certain amount of stream time passes.
>> Specifically,
>> the Suppress processor itself advances stream time as it receives new
>> records
>> to its `process` method. In the pathological case, all updates are
>> idempotent,
>> get dropped, and the Suppress operator never emits anything, even though
>> to
>> an outside observer, stream time should have advanced.
>>
>> Example scenario:
>> > inputTable = builder.table(input)
>> > suppressed = inputTable.suppress(untilTimeLimit(10))
>> >
>> > input: key=A, timestamp=0, value=X
>> > inputTable: key=A, timestamp=0, value=X
>> > suppressed buffer: key=A, timestamp=0, value=X (observed stream time =
>> 0)
>> > output: (nothing)
>> >
>> > input: key=A, timestamp=11, value=X
>> > // update is idempotent, so it gets dropped
>> > inputTable: key=A, timestamp=0, value=X
>> > suppressed buffer: key=A, timestamp=0, value=X (observed stream time =
>> 0)
>> > output: (nothing)
>>
>> Note that even though stream time has technically advanced beyond the
>> suppression config of 10, the suppression buffer doesn't see it because
>> the
>> KTable source already dropped the idempotent update.
>>
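The scenario above can be mimicked with a toy model. This is not Kafka Streams code: the `Update` class and the `observedStreamTime` helper are hypothetical, and the model assumes that a downstream operator only advances its view of stream time when a record is actually forwarded to it.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of stream time as seen downstream of a table source. */
public class SuppressStreamTimeSketch {

    /** Hypothetical record type for this sketch. */
    static final class Update {
        final String key;
        final long timestamp;
        final String value;

        Update(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Returns the highest timestamp that reaches the downstream operator,
     * or -1 if nothing is forwarded. When dropIdempotent is true, an update
     * whose value equals the stored value for its key is not forwarded.
     */
    static long observedStreamTime(boolean dropIdempotent, Update... input) {
        Map<String, String> table = new HashMap<>();
        long observed = -1L;
        for (Update u : input) {
            boolean idempotent = u.value.equals(table.get(u.key));
            if (dropIdempotent && idempotent) {
                continue; // dropped at the source; downstream never sees this timestamp
            }
            table.put(u.key, u.value);
            observed = Math.max(observed, u.timestamp);
        }
        return observed;
    }

    public static void main(String[] args) {
        Update first = new Update("A", 0, "X");
        Update second = new Update("A", 11, "X");
        System.out.println("without dropping: " + observedStreamTime(false, first, second));
        System.out.println("with dropping:    " + observedStreamTime(true, first, second));
    }
}
```

With dropping enabled, the downstream operator in this model still sees stream time 0 after both inputs, so a time limit of 10 is never reached.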
>> I'm thinking that this situation is indeed concerning, and we should be
>> aware
>> and make a note of it. However, I don't think that it should change our
>> proposal.
>> To understand why, I'll enumerate all the usages of stream time in Kafka
>> Streams:
>>
>> windowed operations
>> -----------------------------
>> KStreamSessionWindowAggregate & KStreamWindowAggregate:
>>   - used to determine if an incoming record belongs to a window that is
>> already closed
>> Suppressed.untilWindowCloses():
>>   - used to flush out results for windows, once they are closed
>> AbstractRocksDBSegmentedBytesStore & InMemorySessionStore &
>> InMemoryWindowStore:
>>   - used to create new segments and drop old ones that are out of
>> retention
>>
>> non-windowed operations
>> -----------------------------------
>> Suppressed.untilTimeLimit
>>   - used to flush out prior updates that have passed the configured age,
>> in stream time
>>
>> Note that most of the usages are in windowed operations. One interesting
>> thing
>> about this context is that records with higher timestamps belong to
>> different windows.
>> In order to advance stream time far enough to close a window or push it
>> out of retention, the new records must have timestamps that are in later
>> windows, which means that they are updates to _new_ keys, which means
>> they would not be suppressed as idempotent.
>>
>> Updates within a window could still be suppressed, though:
>> For example, if the window size is 10, and the grace period is 5, and we
>> get updates all
>> for the same key with timestamps 0, 11, and 16, today, we would emit the
>> record for
>> the [0,10) window as soon as we got the update at time 16 (since stream
>> time is now
>> past the window end + grace period time of 15). But if we drop the
>> time=16 update, we
>> wouldn't emit the [0,10) window results until we get a record with time
>> >= 20.
>>
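The arithmetic in the window example above can be checked with a short sketch. The helper name is made up for illustration, and the sketch assumes a window counts as closed once the observed stream time is at least the window end plus the grace period.

```java
/** Toy arithmetic for the window-close example; not Kafka Streams code. */
public class WindowCloseSketch {

    /**
     * Returns the timestamp of the first forwarded record that pushes stream
     * time to windowEnd + grace or beyond, or -1 if the window never closes.
     */
    static long firstClosingTimestamp(long windowEnd, long grace, long... forwardedTimestamps) {
        long streamTime = Long.MIN_VALUE;
        for (long ts : forwardedTimestamps) {
            streamTime = Math.max(streamTime, ts);
            if (streamTime >= windowEnd + grace) {
                return ts;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Window [0,10) with grace 5 closes at stream time 15.
        System.out.println(firstClosingTimestamp(10, 5, 0, 11, 16)); // all records forwarded
        System.out.println(firstClosingTimestamp(10, 5, 0, 11, 20)); // time=16 dropped, next record at 20
    }
}
```

If the time=16 update is dropped, the earliest record that cannot be dropped falls in the next window, so the first call closes the window at 16 and the second only at 20.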
>> You can see that the maximum amount of (stream) time that dropping
>> idempotent
>> updates could delay updates is one window. This might be surprising, but
>> it doesn't
>> seem too bad, especially since Suppress explicitly does _not_ promise to
>> emit the
>> results at the earliest possible time, just at some time after the window
>> closes.
>>
>> Even better, though, all the windowed aggregations are _stream_
>> aggregations
>> that _produce_ a KTable, and we already decided that (at least for now),
>> we would
>> include the timestamp in the idempotence check for stream aggregation
>> results.
>> With this strategy, we would actually not suppress _any_ update to a
>> stream aggregation (windowed or otherwise) that advances stream time.
>>
>> So there's no problem at all with windowed operations.
>>
>> This only leaves non-windowed operations, of which there's only one. I
>> have to admit
>> that I regret Suppressed.untilTimeLimit. It turns out that everyone I've
>> heard of who
>> used this API actually wanted it to be wall-clock based, not stream-time
>> based. So,
>> I'm not sure in practice if this sharp edge will actually cut anyone.
>>
>> I think a "proper" fix would be to introduce some kind of control message
>> to advance
>> stream time independently of records. We've already talked about this a
>> little in
>> KAFKA-8769, and it would also be necessary for global consistency
>> markers. Basically,
>> once a record is dropped as an idempotent update, we could still forward
>> a time
>> marker through the topology, which could trigger suppressions, as well as
>> window
>> boundaries, but wouldn't result in any computations.
>>
>> But practically speaking, I'm not confident that anyone is actually using
>> the untilTimeLimit suppression, or, if they are, that they would see
>> consecutive idempotent updates for long enough to have an observable
>> impact on what gets emitted from the suppression buffer. In fact, if some
>> hypothetical person did find themselves on the wrong end of my assumption,
>> they could _remove_ the suppression, and rely instead on
>> idempotence-dropping to get the same level of traffic reduction that they
>> enjoy from the suppression.
>>
>> Anyway, long story short, I'm advocating to continue with the current
>> proposal
>> and just make a note of the situations I've outlined above.
>>
>> Thanks for your time,
>> -John
>>
>>
>> On Thu, Feb 27, 2020, at 14:33, Richard Yu wrote:
>> > Hi all,
>> >
>> > I might've made a minor mistake. The processor node level is level 3,
>> not
>> > level 1.
>> > I will correct the KIP accordingly.
>> >
>> > After looking over things, I decided to start the voting thread this
>> > afternoon.
>> >
>> > Cheers,
>> > Richard
>> >
>> > On Thu, Feb 27, 2020 at 12:29 PM Richard Yu <yohan.richard...@gmail.com
>> >
>> > wrote:
>> >
>> > > Hi Bruno, Hi John,
>> > >
>> > > Thanks for your comments! I updated the KIP accordingly. It looks like,
>> > > for quite a few points, I was beating around the bush in ways that
>> > > could have been avoided.
>> > >
>> > > Looks like we can reduce the metric to Level 1 (per processor node)
>> then.
>> > >
>> > > I've cleaned up most of the unnecessary info, and we should be fairly
>> > > close.
>> > > I will start working on a PR soon for this KIP. (although we might
>> split
>> > > that up into stages)
>> > >
>> > > Cheers,
>> > > Richard
>> > >
>> > > On Thu, Feb 27, 2020 at 6:06 AM Bruno Cadonna <br...@confluent.io>
>> wrote:
>> > >
>> > >> Hi John,
>> > >>
>> > >> I agree with you. It is better to measure the metric on processor
>> node
>> > >> level. The users can do the rollup to task-level by themselves.
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >> On Thu, Feb 27, 2020 at 12:09 AM John Roesler <vvcep...@apache.org>
>> > >> wrote:
>> > >> >
>> > >> > Hi Richard,
>> > >> >
>> > >> > I've been making a final pass over the KIP.
>> > >> >
>> > >> > Re: Proposed Behavior Change:
>> > >> >
>> > >> > I think this point is controversial and probably doesn't need to be
>> > >> there at all:
>> > >> > > 2.b. In certain situations where there is a high volume of
>> idempotent
>> > >> > > updates throughout the Streams DAG, it will be recommended
>> practice
>> > >> > > to materialize all operations to reduce traffic overall across
>> the
>> > >> entire
>> > >> > >  network of nodes.
>> > >> >
>> > >> > Re-reading all the points, it seems like we can sum them up in a
>> way
>> > >> that's
>> > >> > a little more straight to the point, and gives us the right amount
>> of
>> > >> flexibility:
>> > >> >
>> > >> > > Proposed Behavior Changes
>> > >> > >
>> > >> > > Definition: "idempotent update" is one in which the new result
>> and
>> > >> prior
>> > >> > > result,  when serialized, are identical byte arrays
>> > >> > >
>> > >> > > Note: an "update" is a concept that only applies to Table
>> operations,
>> > >> so
>> > >> > > the concept of an "idempotent update" also only applies to Table
>> > >> operations.
>> > >> > > See
>> > >>
>> https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
>> > >> > > for more information.
>> > >> > >
>> > >> > > Given that definition, we propose for Streams to drop idempotent
>> > >> updates
>> > >> > > in any situation where it's possible and convenient to do so. For
>> > >> example,
>> > >> > > any time we already have both the prior and new results
>> serialized, we
>> > >> > > may compare them, and drop the update if it is idempotent.
>> > >> > >
>> > >> > > Note that under this proposal, we can implement idempotence
>> checking
>> > >> > > in the following situations:
>> > >> > > 1. Any aggregation (for example, KGroupedStream, KGroupedTable,
>> > >> > >     TimeWindowedKStream, and SessionWindowedKStream operations)
>> > >> > > 2. Any Materialized KTable operation
>> > >> > > 3. Repartition operations, when we need to send both prior and
>> new
>> > >> results
>> > >> >
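A minimal sketch of the definition above, under the assumption that both the prior and the new result are already available as serialized byte arrays. The class and method names are hypothetical, and treating a missing prior result as "never idempotent" is a simplifying choice for this example rather than something stated in the thread.

```java
import java.util.Arrays;

/** Illustrates the "identical byte arrays" definition; not Kafka Streams code. */
public class IdempotentUpdateSketch {

    /**
     * An update is treated as idempotent when a prior serialized result
     * exists and is byte-for-byte identical to the new serialized result.
     */
    static boolean isIdempotentUpdate(byte[] priorSerialized, byte[] newSerialized) {
        return priorSerialized != null && Arrays.equals(priorSerialized, newSerialized);
    }

    public static void main(String[] args) {
        byte[] prior = {1, 2, 3};
        System.out.println(isIdempotentUpdate(prior, new byte[] {1, 2, 3})); // same bytes
        System.out.println(isIdempotentUpdate(prior, new byte[] {1, 2, 4})); // changed bytes
        System.out.println(isIdempotentUpdate(null, new byte[] {1, 2, 3}));  // no prior result
    }
}
```

Because the comparison works on bytes that are already at hand, it needs no extra deserialization and no call to `equals()` on user types.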
>> > >> > Notice that in my proposed wording, we neither limit ourselves to
>> just
>> > >> the
>> > >> > situations enumerated, nor promise to implement the optimization in
>> > >> every
>> > >> > possible situation. IMHO, this is the best way to propose such a
>> > >> feature.
>> > >> > That way, we have the flexibility to implement it in stages, and
>> also
>> > >> to add
>> > >> > on to the implementation in the future.
>> > >> >
>> > >> >
>> > >> > Re: Metrics
>> > >> >
>> > >> > I agree with Bruno, although, I think it might just be a confusing
>> > >> statement.
>> > >> > It might be clearer to drop all the "discussion", and just say: "We
>> > >> will add a
>> > >> > metric to count the number of idempotent updates that we have
>> dropped".
>> > >> >
>> > >> > Also, with respect to the metric, I'm wondering if the metric
>> should be
>> > >> task-
>> > >> > level or processor-node-level. Since the interesting action takes
>> place
>> > >> inside
>> > >> > individual processor nodes, I _think_ it would be higher leverage
>> to
>> > >> just
>> > >> > measure it at the node level. WDYT?
>> > >> >
>> > >> > Re: Design Reasoning
>> > >> >
>> > >> > This section seems to be a little bit outdated. I also just
>> noticed a
>> > >> "surprise"
>> > >> > configuration "timestamp.aggregation.selection.policy" hidden in
>> point
>> > >> 1.a.
>> > >> > Is that part of the proposal? We haven't discussed it, and I think
>> we
>> > >> were
>> > >> > talking about this KIP being "configuration free".
>> > >> >
>> > >> > There is also some discussion of discarded alternatives in the Design
>> > >> > Reasoning section, which is confusing. Finally, there was a point
>> > >> > there I didn't understand at all, about stateless operators not being
>> > >> > intended to load prior results. This statement doesn't seem to be
>> > >> > true, but it also doesn't seem to be relevant, so maybe we can just
>> > >> > drop it.
>> > >> >
>> > >> > Overall, it might help if you make a pass on this section, and just
>> > >> discuss as
>> > >> > briefly as possible the justification for the proposed behavior
>> change,
>> > >> and
>> > >> > not adding a configuration. Try to avoid talking about things that
>> we
>> > >> are not
>> > >> > proposing, since that will just lead to confusion.
>> > >> >
>> > >> > Similarly, I'd just completely remove the "Implementation
>> [discarded]"
>> > >> section.
>> > >> > It was good to have this as part of the discussion initially, but
>> as we
>> > >> move
>> > >> > toward a vote, it's better to just streamline the KIP document as
>> much
>> > >> as
>> > >> > possible. Keeping a "discarded" section in the document will just
>> make
>> > >> it
>> > >> > harder for new people to understand the proposal. We did the same
>> thing
>> > >> > with KIP-441, where there were two prior drafts included at the
>> end of
>> > >> the
>> > >> > document, and we just deleted them for clarity.
>> > >> >
>> > >> > I liked the "Compatibility" and "Rejected Alternatives" section.
>> Very
>> > >> clear
>> > >> > and to the point.
>> > >> >
>> > >> > Thanks again for the contribution! I think once the KIP document is
>> > >> cleaned
>> > >> > up, we'll be in good shape to finalize the discussion.
>> > >> > -John
>> > >> >
>> > >> >
>> > >> > On Wed, Feb 26, 2020, at 07:27, Bruno Cadonna wrote:
>> > >> > > Hi Richard,
>> > >> > >
>> > >> > > 1. Could you change "idempotent update operations will only be
>> dropped
>> > >> > > from KTables, not from other classes." -> idempotent update
>> operations
>> > >> > > will only be dropped from materialized KTables? For
>> non-materialized
>> > >> > > KTables -- as they can occur after optimization of the topology
>> -- we
>> > >> > > cannot drop idempotent updates.
>> > >> > >
>> > >> > > 2. I cannot completely follow the metrics section. Do you want to
>> > >> > > record all idempotent updates or only the dropped ones? In
>> particular,
>> > >> > > I do not understand the following sentences:
>> > >> > > "For that matter, even if we don't drop idempotent updates, we
>> should
>> > >> > > at the very least record the number of idempotent updates that
>> has
>> > >> > > been seen go through a particular processor."
>> > >> > > "Therefore, we should add some metrics which will count the
>> number of
>> > >> > > idempotent updates that each node has seen."
>> > >> > > I do not see how we can record idempotent updates that we do not
>> drop.
>> > >> > > If we see them, we should drop them. If we do not see them, we
>> cannot
>> > >> > > drop them and we cannot record them.
>> > >> > >
>> > >> > > Best,
>> > >> > > Bruno
>> > >> > >
>> > >> > > On Wed, Feb 26, 2020 at 4:57 AM Richard Yu <
>> > >> yohan.richard...@gmail.com> wrote:
>> > >> > > >
>> > >> > > > Hi John,
>> > >> > > >
>> > >> > > > Sounds good. It looks like we are close to wrapping things up. If
>> > >> > > > there aren't any other revisions that need to be made (if there
>> > >> > > > are, please comment in the thread), I will start the voting
>> > >> > > > process this Thursday (Pacific Standard Time).
>> > >> > > >
>> > >> > > > Cheers,
>> > >> > > > Richard
>> > >> > > >
>> > >> > > > On Tue, Feb 25, 2020 at 11:59 AM John Roesler <
>> vvcep...@apache.org>
>> > >> wrote:
>> > >> > > >
>> > >> > > > > Hi Richard,
>> > >> > > > >
>> > >> > > > > Sorry for the slow reply. I actually think we should avoid
>> > >> checking
>> > >> > > > > equals() for now. Your reasoning is good, but the truth is
>> that
>> > >> > > > > depending on the implementation of equals() is non-trivial,
>> > >> > > > > semantically, and (though I proposed it before), I'm not
>> convinced
>> > >> > > > > it's worth the risk. Much better to start with exactly one
>> kind of
>> > >> > > > > "idempotence detection".
>> > >> > > > >
>> > >> > > > > Even if someone does update their serdes, we know that the
>> new
>> > >> > > > > serde would still be able to _de_serialize the old format,
>> or the
>> > >> whole
>> > >> > > > > app would break. The situation is that the new result gets
>> encoded
>> > >> > > > > in the new binary format, which means we don't detect an
>> > >> idempotent
>> > >> > > > > update for what it is. In this case, we'd write the new
>> binary
>> > >> format to
>> > >> > > > > disk and the changelog, and forward it downstream. However,
>> we
>> > >> only
>> > >> > > > > do this once. Now that the binary format for that record has
>> been
>> > >> updated,
>> > >> > > > > we would correctly detect idempotence of any subsequent
>> updates.
>> > >> > > > >
>> > >> > > > > Plus, we would still be able to filter out idempotent
>> updates in
>> > >> > > > > repartition
>> > >> > > > > sinks, since for those, we use the new serde to serialize
>> both
>> > >> the "old"
>> > >> > > > > and
>> > >> > > > > "new" result.
>> > >> > > > >
>> > >> > > > > It's certainly a good observation, but I think we can just
>> make a
>> > >> note of
>> > >> > > > > it
>> > >> > > > > in "rejected alternatives" for now, and plan to refine it
>> later,
>> > >> if it does
>> > >> > > > > pose a big performance problem.
>> > >> > > > >
>> > >> > > > > Thanks!
>> > >> > > > > -John
>> > >> > > > >
>> > >> > > > > On Sat, Feb 22, 2020, at 18:14, Richard Yu wrote:
>> > >> > > > > > Hi all,
>> > >> > > > > >
>> > >> > > > > > Updated the KIP.
>> > >> > > > > >
>> > >> > > > > > Just a question: do you think it would be a good idea if we
>> > >> check for
>> > >> > > > > both
>> > >> > > > > > Object#equals() and binary equality?
>> > >> > > > > > Because there might be some subtle changes in the
>> serialization
>> > >> (for
>> > >> > > > > > example, if the user decides to upgrade their serialization
>> > >> procedure to
>> > >> > > > > a
>> > >> > > > > > new one), but the underlying values of the result might be
>> the
>> > >> same.
>> > >> > > > > > (therefore equals() might return true)
>> > >> > > > > >
>> > >> > > > > > Do you think this would be plausible?
>> > >> > > > > >
>> > >> > > > > > Cheers,
>> > >> > > > > > Richard
>> > >> > > > > >
>> > >> > > > > > On Fri, Feb 21, 2020 at 2:37 PM Richard Yu <
>> > >> yohan.richard...@gmail.com>
>> > >> > > > > > wrote:
>> > >> > > > > >
>> > >> > > > > > > Hello,
>> > >> > > > > > >
>> > >> > > > > > > Just to make some updates. I changed the name of the
>> metric
>> > >> so that it
>> > >> > > > > was
>> > >> > > > > > > more in line with usual Kafka naming conventions for
>> metrics
>> > >> / sensors.
>> > >> > > > > > > Below is the updated description of the metric:
>> > >> > > > > > >
>> > >> > > > > > > dropped-idempotent-updates : (Level 2 - Per Task) DEBUG
>> (rate
>> > >> | total)
>> > >> > > > > > >
>> > >> > > > > > > Description: This metric will record the number of
>> updates
>> > >> that have
>> > >> > > > > been
>> > >> > > > > > > dropped since they are essentially re-performing an
>> earlier
>> > >> operation.
>> > >> > > > > > >
>> > >> > > > > > > Note:
>> > >> > > > > > >
>> > >> > > > > > >    - The rate option indicates the ratio of records
>> dropped
>> > >> to actual
>> > >> > > > > > >    volume of records passing through the task.
>> > >> > > > > > >    - The total option will just give a raw count of the
>> > >> number of
>> > >> > > > > records
>> > >> > > > > > >    dropped.
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > I hope that this is more on point.
>> > >> > > > > > >
>> > >> > > > > > > Best,
>> > >> > > > > > > Richard
>> > >> > > > > > >
>> > >> > > > > > > On Fri, Feb 21, 2020 at 2:20 PM Richard Yu <
>> > >> yohan.richard...@gmail.com
>> > >> > > > > >
>> > >> > > > > > > wrote:
>> > >> > > > > > >
>> > >> > > > > > >> Hi all,
>> > >> > > > > > >>
>> > >> > > > > > >> Thanks for the clarification. I was just confused a
>> little
>> > >> on what was
>> > >> > > > > > >> going on.
>> > >> > > > > > >>
>> > >> > > > > > >> So I guess that, for the actual proposal, we have the
>> > >> > > > > > >> following:
>> > >> > > > > > >>
>> > >> > > > > > >> 1. We check for binary equality, and perform no extra
>> look
>> > >> ups.
>> > >> > > > > > >> 2. Emphasize that this applies only to materialized
>> tables.
>> > >> > > > > > >> 3. We drop aggregation updates if key, value and timestamp
>> > >> > > > > > >> are the same.
>> > >> > > > > > >>
>> > >> > > > > > >> Then that settles the behavior changes. So it looks like the
>> > >> > > > > > >> metric is the only thing that is left. In this case, I think
>> > >> > > > > > >> the metric would be named the following:
>> > >> > > > > > >> IdempotentUpdateMetric. This is mostly off the top of my
>> > >> > > > > > >> head, so if you think that we should change it, feel free to
>> > >> > > > > > >> say so.
>> > >> > > > > > >> The metric will report the number of dropped operations
>> > >> inherently.
>> > >> > > > > > >>
>> > >> > > > > > >> It will probably be added as a Sensor, similar to the
>> > >> dropped records
>> > >> > > > > > >> sensor we already have.
>> > >> > > > > > >>
>> > >> > > > > > >> If there isn't anything else, I will probably start the
>> > >> voting process
>> > >> > > > > > >> next week!
>> > >> > > > > > >>
>> > >> > > > > > >> Cheers,
>> > >> > > > > > >> Richard
>> > >> > > > > > >>
>> > >> > > > > > >>
>> > >> > > > > > >> On Fri, Feb 21, 2020 at 11:23 AM John Roesler <
>> > >> vvcep...@apache.org>
>> > >> > > > > > >> wrote:
>> > >> > > > > > >>
>> > >> > > > > > >>> Hi Bruno,
>> > >> > > > > > >>>
>> > >> > > > > > >>> Thanks for the clarification. Indeed, I was thinking
>> two
>> > >> things:
>> > >> > > > > > >>> 1. For the initial implementation, we can just avoid
>> adding
>> > >> any extra
>> > >> > > > > > >>> lookups, but only do the comparison when we already
>> happen
>> > >> to have
>> > >> > > > > > >>> the prior value.
>> > >> > > > > > >>> 2. I think, as a result of the timestamp semantics, we
>> > >> actually _do_
>> > >> > > > > look
>> > >> > > > > > >>> up the prior value approximately all the time, so the
>> > >> idempotence
>> > >> > > > > check
>> > >> > > > > > >>> should be quite effective.
>> > >> > > > > > >>>
>> > >> > > > > > >>> I think that second point is the same thing you're
>> > >> referring to
>> > >> > > > > > >>> potentially
>> > >> > > > > > >>> being unnecessary. It does mean that we do fetch the
>> whole
>> > >> value in a
>> > >> > > > > > >>> lot of cases where we really only need the timestamp,
>> so it
>> > >> could
>> > >> > > > > > >>> certainly
>> > >> > > > > > >>> be optimized in the future. In that future, we would
>> need
>> > >> to weigh
>> > >> > > > > that
>> > >> > > > > > >>> optimization against losing the idempotence check. But,
>> > >> that's a
>> > >> > > > > problem
>> > >> > > > > > >>> for tomorrow :)
>> > >> > > > > > >>>
>> > >> > > > > > >>> I'm 100% on board with scrutinizing the performance as
>> we
>> > >> implement
>> > >> > > > > > >>> this feature.
>> > >> > > > > > >>>
>> > >> > > > > > >>> Thanks again,
>> > >> > > > > > >>> -John
>> > >> > > > > > >>>
>> > >> > > > > > >>> On Thu, Feb 20, 2020, at 03:25, Bruno Cadonna wrote:
>> > >> > > > > > >>> > Hi John,
>> > >> > > > > > >>> >
>> > >> > > > > > >>> > I am glad to help you with your imagination. With
>> > >> overhead, I
>> > >> > > > > mainly
>> > >> > > > > > >>> > meant the additional lookup into the state store to
>> get
>> > >> the current
>> > >> > > > > > >>> > value, but I see now in the code that we do that
>> lookup
>> > >> anyways
>> > >> > > > > > >>> > (although I think we could avoid that in the cases
>> where
>> > >> we do not
>> > >> > > > > > >>> > need the old value). With or without config, we need
>> to
>> > >> evaluate
>> > >> > > > > the
>> > >> > > > > > >>> > performance benefits of this change, in any case.
>> > >> > > > > > >>> >
>> > >> > > > > > >>> > Best,
>> > >> > > > > > >>> > Bruno
>> > >> > > > > > >>> >
>> > >> > > > > > >>> > On Wed, Feb 19, 2020 at 7:48 PM John Roesler <
>> > >> vvcep...@apache.org>
>> > >> > > > > > >>> wrote:
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Thanks for your remarks, Bruno!
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > I'm in favor of standardizing on terminology like
>> "not
>> > >> forwarding
>> > >> > > > > > >>> > > idempotent updates" or "dropping idempotent
>> updates".
>> > >> Maybe we
>> > >> > > > > > >>> > > should make a pass on the KIP and just convert
>> > >> everything to this
>> > >> > > > > > >>> > > phrasing. In retrospect, even the term
>> "emit-on-change"
>> > >> has too
>> > >> > > > > much
>> > >> > > > > > >>> > > semantic baggage, since it implies the semantics
>> from
>> > >> the SECRET
>> > >> > > > > > >>> > > paper, which we don't really want to imply here.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > I'm also in favor of the metric as you propose.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Likewise with stream aggregations, I was also
>> under the
>> > >> > > > > impression
>> > >> > > > > > >>> > > that we agreed on dropping idempotent updates to
>> the
>> > >> aggregation
>> > >> > > > > > >>> > > result, any time we find that our "new" (key,
>> value,
>> > >> timestamp)
>> > >> > > > > > >>> result
>> > >> > > > > > >>> > > is identical to the prior one.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Also, I'm +1 on all your recommendations for
>> updating
>> > >> the KIP
>> > >> > > > > > >>> document
>> > >> > > > > > >>> > > for clarity.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > Regarding the opt-out config. Perhaps I'm suffering
>> > >> from a
>> > >> > > > > failure of
>> > >> > > > > > >>> > > imagination, but I don't see how the current
>> proposal
>> > >> could
>> > >> > > > > really
>> > >> > > > > > >>> have
>> > >> > > > > > >>> > > a measurable impact on latency. If all we do is
>> make a
>> > >> single
>> > >> > > > > extra
>> > >> > > > > > >>> pass
>> > >> > > > > > >>> > > to compare two byte arrays for equality, only in
>> the
>> > >> cases where
>> > >> > > > > we
>> > >> > > > > > >>> already
>> > >> > > > > > >>> > > have the byte arrays available, it seems unlikely
>> to
>> > >> measurably
>> > >> > > > > > >>> affect the
>> > >> > > > > > >>> > > processing of non-idempotent updates. It seems
>> > >> guaranteed to
>> > >> > > > > > >>> _decrease_
>> > >> > > > > > >>> > > the latency of processing idempotent updates,
>> since we
>> > >> get to
>> > >> > > > > skip a
>> > >> > > > > > >>> > > store#put, at least one producer#send, and also all
>> > >> downstream
>> > >> > > > > > >>> processing,
>> > >> > > > > > >>> > > including all the disk and network operations
>> > >> associated with
>> > >> > > > > > >>> downstream
>> > >> > > > > > >>> > > operations.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > It seems like if we're pretty sure this change
>> would
>> > >> only
>> > >> > > > > _help_, we
>> > >> > > > > > >>> shouldn't
>> > >> > > > > > >>> > > introduce the operational burden of an extra
>> > >> configuration. If we
>> > >> > > > > > >>> want to
>> > >> > > > > > >>> > > be more aggressive about dropping idempotent
>> operations
>> > >> in the
>> > >> > > > > > >>> future,
>> > >> > > > > > >>> > > such as depending on equals() or adding a
>> ChangeDetector
>> > >> > > > > interface,
>> > >> > > > > > >>> then
>> > >> > > > > > >>> > > we should consider adding a configuration as part
>> of
>> > >> that future
>> > >> > > > > > >>> work. In
>> > >> > > > > > >>> > > fact, if we add a simple "opt-in/opt-out" switch
>> right
>> > >> now, we
>> > >> > > > > might
>> > >> > > > > > >>> find
>> > >> > > > > > >>> > > that it's actually insufficient for whatever future
>> > >> feature we
>> > >> > > > > might
>> > >> > > > > > >>> propose,
>> > >> > > > > > >>> > > then we have a mess of deprecating the opt-out
>> config
>> > >> and
>> > >> > > > > replacing
>> > >> > > > > > >>> it.
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > What do you think?
>> > >> > > > > > >>> > > -John
>> > >> > > > > > >>> > >
>> > >> > > > > > >>> > > On Wed, Feb 19, 2020, at 09:50, Bruno Cadonna
>> wrote:
>> > >> > > > > > >>> > > > Hi all,
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Sorry for the late reply!
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > I am also in favour of baby steps.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > I am undecided whether the KIP should contain an
>> > >> > > > > > >>> > > > opt-out config or not.
>> > >> > > > > > >>> > > > The overhead of emit-on-change might affect
>> latency.
>> > >> For
>> > >> > > > > > >>> applications
>> > >> > > > > > >>> > > > where low latency is crucial and there are not
>> too
>> > >> many
>> > >> > > > > idempotent
>> > >> > > > > > >>> > > > updates, it would be better to fall back to
>> > >> emit-on-update.
>> > >> > > > > > >>> However,
>> > >> > > > > > >>> > > > we do not know how much emit-on-change impacts
>> > >> latency. We
>> > >> > > > > would
>> > >> > > > > > >>> first
>> > >> > > > > > >>> > > > need to benchmark that before we can decide
>> about the
>> > >> > > > > > >>> opt-out-config.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > A metric of dropped idempotent updates seems
>> useful
>> > >> to me to be
>> > >> > > > > > >>> > > > informed about potential upstream applications or
>> > >> upstream
>> > >> > > > > > >>> operators
>> > >> > > > > > >>> > > > that produce too many idempotent updates. The KIP
>> > >> should state
>> > >> > > > > the
>> > >> > > > > > >>> > > > name of the metric, its group, its tags, and its
>> > >> recording
>> > >> > > > > level
>> > >> > > > > > >>> (see
>> > >> > > > > > >>> > > > KIP-444 or KIP-471 for examples). I propose
>> DEBUG as
>> > >> reporting
>> > >> > > > > > >>> level.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Richard, what competing proposals for
>> emit-on-change
>> > >> for
>> > >> > > > > > >>> aggregations
>> > >> > > > > > >>> > > > do you mean? I have the feeling that we agreed
>> to get
>> > >> rid of
>> > >> > > > > > >>> > > > idempotent updates if the aggregate is updated
>> with
>> > >> the same
>> > >> > > > > key,
>> > >> > > > > > >>> > > > value, AND timestamp. I am also fine if we do not
>> > >> include this
>> > >> > > > > into
>> > >> > > > > > >>> > > > this KIP (remember: baby steps).
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > You write that "emit-on-change is more correct".
>> > >> Since we
>> > >> > > > > agreed
>> > >> > > > > > >>> that
>> > >> > > > > > >>> > > > this is an optimization, IMO you cannot argue
>> this
>> > >> way.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Please put "Alternative Approaches" under
>> "Rejected
>> > >> > > > > Alternatives",
>> > >> > > > > > >>> so
>> > >> > > > > > >>> > > > that it becomes clear that we are not going to
>> > >> implement them.
>> > >> > > > > In
>> > >> > > > > > >>> > > > general, I think the KIP needs a bit of clean-up
>> > >> (probably, you
>> > >> > > > > > >>> > > > already planned for it). "Design Reasoning" is a mix
>> > >> > > > > > >>> > > > of behavior changes and rejected alternatives, and it
>> > >> > > > > > >>> > > > duplicates some of the content in those sections.
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > I do not like the name "no-op operations" or
>> > >> "no-ops", because
>> > >> > > > > they
>> > >> > > > > > >>> > > > are rather generic. I like more "idempotent
>> updates".
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > Best,
>> > >> > > > > > >>> > > > Bruno
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > >
>> > >> > > > > > >>> > > > On Tue, Feb 18, 2020 at 7:25 PM Richard Yu <
>> > >> > > > > > >>> yohan.richard...@gmail.com> wrote:
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > Hi all,
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > We are definitely making progress!
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > @John should I emphasize in the proposed
>> behavior
>> > >> changes
>> > >> > > > > that
>> > >> > > > > > >>> we are only
>> > >> > > > > > >>> > > > > doing binary equality checks for stateful
>> operators?
>> > >> > > > > > >>> > > > > It looks like we have come close to finalizing
>> this
>> > >> part of
>> > >> > > > > the
>> > >> > > > > > >>> KIP. (I
>> > >> > > > > > >>> > > > > will note in the KIP that this proposal is
>> intended
>> > >> for
>> > >> > > > > > >>> optimization, not
>> > >> > > > > > >>> > > > > semantics correctness)
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > I do think maybe we still have one other
>> detail we
>> > >> need to
>> > >> > > > > > >>> discuss. So far,
>> > >> > > > > > >>> > > > > there has been quite a bit of back and forth
>> about
>> > >> what the
>> > >> > > > > > >>> behavior of
>> > >> > > > > > >>> > > > > aggregations should look like in emit on
>> change. I
>> > >> have seen
>> > >> > > > > > >>> > > > > multiple competing proposals, so I am not
>> > >> completely certain
>> > >> > > > > > >>> which one we
>> > >> > > > > > >>> > > > > should go with, or how we will be able to
>> > >> compromise in
>> > >> > > > > between
>> > >> > > > > > >>> them.
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > Let me know what your thoughts are on this
>> matter,
>> > >> since we
>> > >> > > > > are
>> > >> > > > > > >>> probably
>> > >> > > > > > >>> > > > > close to wrapping up most other stuff.
>> > >> > > > > > >>> > > > > @Matthias J. Sax <matth...@confluent.io>  and
>> > >> @Bruno, see
>> > >> > > > > what
>> > >> > > > > > >>> you think
>> > >> > > > > > >>> > > > > about this.
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > Best,
>> > >> > > > > > >>> > > > > Richard
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > On Tue, Feb 18, 2020 at 9:06 AM John Roesler <
>> > >> > > > > > >>> vvcep...@apache.org> wrote:
>> > >> > > > > > >>> > > > >
>> > >> > > > > > >>> > > > > > Thanks, Matthias!
>> > >> > > > > > >>> > > > > >
>> > >> > > > > > >>> > > > > > Regarding numbers, it would be hard to know
>> how
>> > >> many
>> > >> > > > > > >>> applications
>> > >> > > > > > >>> > > > > > would benefit, since we don't know how many
>> > >> applications
>> > >> > > > > there
>> > >> > > > > > >>> are,
>> > >> > > > > > >>> > > > > > or anything about their data sets or
>> topologies.
>> > >> We could
>> > >> > > > > do a
>> > >> > > > > > >>> survey,
>> > >> > > > > > >>> > > > > > but it seems overkill if we take the
>> conservative
>> > >> approach.
>> > >> > > > > > >>> > > > > >
>> > >> > > > > > >>> > > > > > I have my own practical stream processing
>> > >> experience that
>> > >> > > > > > >>> tells me this
>> > >> > > > > > >>> > > > > > is absolutely critical for any
>> moderate-to-large
>> > >> relational
>> > >> > > > > > >>> stream
>> > >> > > > > > >>> > > > > > processing use cases. I'll leave it to you to
>> > >> decide if you
>> > >> > > > > > >>> find that
>> > >> > > > > > >>> > > > > > convincing, but it's definitely not an
>> > >> _assumption_. I've
>> > >> > > > > also
>> > >> > > > > > >>> heard from
>> > >> > > > > > >>> > > > > > a few Streams users who have already had to
>> > >> implement
>> > >> > > > > their own
>> > >> > > > > > >>> > > > > > noop-suppression transformers in order to
>> get to
>> > >> production
>> > >> > > > > > >>> scale.
>> > >> > > > > > >>> > > > > >
>> > >> > > > > > >>> > > > > > Regardless, it sounds like we can agree on
>> taking
>> > >> an
>> > >> > > > > > >>> opportunistic approach
>> > >> > > > > > >>> > > > > > and targeting the optimization just to use a
>> > >> > > > > binary-equality
>> > >> > > > > > >>> check at
>> > >> > > > > > >>> > > > > > stateful operators. (I'd also suggest in sink
>> > >> nodes, when
>> > >> > > > > we
>> > >> > > > > > >>> are about to
>> > >> > > > > > >>> > > > > > send old and new values, since they are also
>> > >> already
>> > >> > > > > present
>> > >> > > > > > >>> and serialized
>> > >> > > > > > >>> > > > > > at that point.) We could make the KIP even
>> more
>> > >> vague, and
>> > >> > > > > > >>> just say that
>> > >> > > > > > >>> > > > > > we'll drop no-op updates "when possible".
>> > >> > > > > > >>> > > > > >
>> > >> > > > > > >>> > > > > > I'm curious what Bruno and the others think
>> about
>> > >> this. If
>> > >> > > > > it
>> > >> > > > > > >>> seems like
>> > >> > > > > > >>> > > > > > a good starting point, perhaps we could move
>> to a
>> > >> vote soon
>> > >> > > > > > >>> and get to
>> > >> > > > > > >>> > > > > > work on the implementation!
>> > >> > > > > > >>> > > > > >
>> > >> > > > > > >>> > > > > > Thanks,
>> > >> > > > > > >>> > > > > > -John
>> > >> > > > > > >>> > > > > >
>> > >> > > > > > >>> > > > > > On Mon, Feb 17, 2020, at 20:54, Matthias J.
>> Sax
>> > >> wrote:
>> > >> > > > > > >>> > > > > > > Talking about optimizations and reducing
>> > >> downstream load:
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > > Do we actually have any numbers? I have the
>> > >> impression
>> > >> > > > > that
>> > >> > > > > > >>> this KIP is
>> > >> > > > > > >>> > > > > > > > more or less built on the _assumption_ that
>> > >> there is a
>> > >> > > > > > >>> problem. Yes,
>> > >> > > > > > >>> > > > > > > there are some use cases that would benefit
>> > >> from this;
>> > >> > > > > But
>> > >> > > > > > >>> how many
>> > >> > > > > > >>> > > > > > > applications would actually benefit? And
>> how
>> > >> much load
>> > >> > > > > > >>> reduction would
>> > >> > > > > > >>> > > > > > > they get?
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > > > The simplest approach (following John's idea
>> to
>> > >> make baby
>> > >> > > > > > >>> steps) would be
>> > >> > > > > > >>> > > > > > > to apply the emit-on-change pattern only if
>> > >> there is a
>> > >> > > > > > >>> store. For this
>> > >> > > > > > >>> > > > > > > case we need to serialize old and new
>> result
>> > >> anyway and
>> > >> > > > > thus
>> > >> > > > > > >>> a simple
>> > >> > > > > > >>> > > > > > > byte-array comparison is no overhead.
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > > Sending `oldValues` by default would become
>> > >> expensive
>> > >> > > > > > >>> because we would
>> > >> > > > > > >>> > > > > > > need to serialize the recomputed old
>> result, as
>> > >> well as
>> > >> > > > > the
>> > >> > > > > > >>> new result,
>> > >> > > > > > >>> > > > > > > > to make the comparison (and we know the
>> > >> serialization is
>> > >> > > > > not
>> > >> > > > > > >>> cheap). We
>> > >> > > > > > >>> > > > > > > are facing a trade-off between CPU
>> overhead and
>> > >> > > > > downstream
>> > >> > > > > > >>> load and I am
>> > >> > > > > > >>> > > > > > > not sure if we should hard code this. My
>> > >> original
>> > >> > > > > argument
>> > >> > > > > > >>> for sending
>> > >> > > > > > >>> > > > > > > `oldValues` was about semantics; but for an
>> > >> > > > > optimization, I
>> > >> > > > > > >>> am not sure
>> > >> > > > > > >>> > > > > > > if this would be the right choice.
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > > For now, users who want to opt-in can
>> force a
>> > >> > > > > > >>> materialization. A
>> > >> > > > > > >>> > > > > > > materialization may be expensive and if we
>> see
>> > >> future
>> > >> > > > > > >>> demand, we could
>> > >> > > > > > >>> > > > > > > still add an option to send `oldValues`
>> instead
>> > >> of
>> > >> > > > > > >>> materialization (this
>> > >> > > > > > >>> > > > > > > would at least save the store overhead).
>> As we
>> > >> consider
>> > >> > > > > the
>> > >> > > > > > >>> KIP an
>> > >> > > > > > >>> > > > > > > optimization, a "config" seems to make
>> sense.
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > > -Matthias
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > >
>> > >> > > > > > >>> > > > > > > On 2/17/20 5:21 PM, Richard Yu wrote:
>> > >> > > > > > >>> > > > > > > > Hi John!
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Thanks for the reply.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > About the changes we have discussed so
>> far. I
>> > >> think
>> > >> > > > > upon
>> > >> > > > > > >>> further
>> > >> > > > > > >>> > > > > > > > consideration, we have been mostly
>> talking
>> > >> about this
>> > >> > > > > from
>> > >> > > > > > >>> the
>> > >> > > > > > >>> > > > > > perspective
>> > >> > > > > > >>> > > > > > > > that no stop-gap effort is acceptable.
>> > >> However, in
>> > >> > > > > recent
>> > >> > > > > > >>> discussion,
>> > >> > > > > > >>> > > > > > > if we
>> > >> > > > > > >>> > > > > > > > consider optimization, then it appears
>> that
>> > >> the
>> > >> > > > > > >>> perspective I
>> > >> > > > > > >>> > > > > > mentioned no
>> > >> > > > > > >>> > > > > > > > longer applies. After all, we are no
>> longer
>> > >> concerned
>> > >> > > > > so
>> > >> > > > > > >>> much about
>> > >> > > > > > >>> > > > > > > > > semantics correctness, than reducing
>> traffic
>> > >> as much as
>> > >> > > > > > >>> possible
>> > >> > > > > > >>> > > > > > without
>> > >> > > > > > >>> > > > > > > > performance tradeoffs.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > In this case, I think a cache would be a
>> good
>> > >> idea for
>> > >> > > > > > >>> stateless
>> > >> > > > > > >>> > > > > > > > operations. This cache will not be
>> backed by
>> > >> a store
>> > >> > > > > > >>> obviously. We can
>> > >> > > > > > >>> > > > > > > > probably use Kafka's ThreadCache. We
>> should
>> > >> be able to
>> > >> > > > > > >>> catch a large
>> > >> > > > > > >>> > > > > > > > portion of the no-ops if we at least
>> store
>> > >> some
>> > >> > > > > results in
>> > >> > > > > > >>> the cache.
>> > >> > > > > > >>> > > > > > Not
>> > >> > > > > > >>> > > > > > > > all will be caught, but I think the
>> impact
>> > >> will be
>> > >> > > > > > >>> significant.
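
The cache idea above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the KIP's design: it uses a plain `LinkedHashMap` with LRU eviction as a stand-in for Kafka's ThreadCache, and the class name `LastValueCache` and method `shouldForward` are hypothetical. Because entries can be evicted, some no-ops will slip through, which matches the "not all will be caught" caveat.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: remembers the last forwarded serialized value per key
// in a bounded LRU map (a stand-in for a real cache such as ThreadCache).
final class LastValueCache {
    private final Map<String, byte[]> lastForwarded;

    LastValueCache(final int maxEntries) {
        // accessOrder = true gives least-recently-used eviction order.
        this.lastForwarded = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns false only when the cache still holds an identical value for the key.
    boolean shouldForward(String key, byte[] newBytes) {
        byte[] previous = lastForwarded.get(key);
        if (previous != null && Arrays.equals(previous, newBytes)) {
            return false; // idempotent update caught by the cache
        }
        lastForwarded.put(key, newBytes);
        return true; // changed, first seen, or previously evicted
    }
}
```

The sketch does not treat deletes (null values) specially; a real implementation would have to decide how tombstones interact with the cache.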
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > On another note, I think that we should
>> > >> implement
>> > >> > > > > > >>> competing proposals
>> > >> > > > > > >>> > > > > > i.e.
>> > >> > > > > > >>> > > > > > > > one where we forward both old and new
>> values
>> > >> with a
>> > >> > > > > > >>> reasonable
>> > >> > > > > > >>> > > > > > proportion
>> > >> > > > > > >>> > > > > > > > of artificial no-ops (we do not
>> necessarily
>> > >> have to
>> > >> > > > > rely
>> > >> > > > > > >>> on equals so
>> > >> > > > > > >>> > > > > > much
>> > >> > > > > > >>> > > > > > > > as comparing the serialized binary data
>> after
>> > >> the
>> > >> > > > > > >>> operation), and in
>> > >> > > > > > >>> > > > > > > > another scenario, the cache for stateless
>> > >> ops. It
>> > >> > > > > would be
>> > >> > > > > > >>> > > > > > unreasonable if
>> > >> > > > > > >>> > > > > > > > we completely disregard either approach,
>> > >> since they
>> > >> > > > > both
>> > >> > > > > > >>> have merit.
>> > >> > > > > > >>> > > > > > The
>> > >> > > > > > >>> > > > > > > > reason for implementing both is to
>> perform
>> > >> benchmark
>> > >> > > > > tests
>> > >> > > > > > >>> on them, and
>> > >> > > > > > >>> > > > > > > > compare them with the original. This
>> way, we
>> > >> can more
>> > >> > > > > > >>> clearly see what
>> > >> > > > > > >>> > > > > > are
>> > >> > > > > > >>> > > > > > > > the drawbacks and the gains. So far, we
>> have
>> > >> been
>> > >> > > > > > >>> discussing only
>> > >> > > > > > >>> > > > > > > > hypotheticals, and if we continue to do
>> so, I
>> > >> think it
>> > >> > > > > is
>> > >> > > > > > >>> likely no
>> > >> > > > > > >>> > > > > > ground
>> > >> > > > > > >>> > > > > > > > will be gained.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > After all, what we seek is optimization,
>> and
>> > >> > > > > performance
>> > >> > > > > > >>> benchmarks
>> > >> > > > > > >>> > > > > > > will be
>> > >> > > > > > >>> > > > > > > > mandatory for a KIP of this nature.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Hope this helps,
>> > >> > > > > > >>> > > > > > > > Richard
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > On Mon, Feb 17, 2020 at 2:12 PM John
>> Roesler <
>> > >> > > > > > >>> vvcep...@apache.org>
>> > >> > > > > > >>> > > > > > wrote:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >> Hi again, all,
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> Sorry on my part for my silence.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> I've just taken another look over the
>> recent
>> > >> history
>> > >> > > > > of
>> > >> > > > > > >>> this
>> > >> > > > > > >>> > > > > > > discussion. It
>> > >> > > > > > >>> > > > > > > >> seems like the #1 point to clarify
>> (because
>> > >> it affects
>> > >> > > > > > >>> everything
>> > >> > > > > > >>> > > > > > else) is
>> > >> > > > > > >>> > > > > > > >> that,
>> > >> > > > > > >>> > > > > > > >> yes, I was 100% envisioning this as an
>> > >> _optimization_.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> As a consequence, I don't think it's
>> > >> critical to make
>> > >> > > > > any
>> > >> > > > > > >>> hard
>> > >> > > > > > >>> > > > > > guarantees
>> > >> > > > > > >>> > > > > > > >> about
>> > >> > > > > > >>> > > > > > > >> what results get forwarded and what
>> (no-op
>> > >> updates)
>> > >> > > > > get
>> > >> > > > > > >>> dropped. I'd
>> > >> > > > > > >>> > > > > > > >> initially
>> > >> > > > > > >>> > > > > > > >> just been thinking about doing this
>> > >> opportunistically
>> > >> > > > > in
>> > >> > > > > > >>> cases where
>> > >> > > > > > >>> > > > > > we
>> > >> > > > > > >>> > > > > > > >> already
>> > >> > > > > > >>> > > > > > > >> had the "old" and "new" result in
>> memory,
>> > >> thanks to a
>> > >> > > > > > >>> request to
>> > >> > > > > > >>> > > > > > > "emit old
>> > >> > > > > > >>> > > > > > > >> values", or to the implementation of
>> > >> timestamp
>> > >> > > > > semantics.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> However, whether or not it's
>> semantically
>> > >> critical, I
>> > >> > > > > do
>> > >> > > > > > >>> think that
>> > >> > > > > > >>> > > > > > > >> Matthias's
>> > >> > > > > > >>> > > > > > > >> idea to use the change-forwarding
>> mechanism
>> > >> to check
>> > >> > > > > for
>> > >> > > > > > >>> no-ops even
>> > >> > > > > > >>> > > > > > on
>> > >> > > > > > >>> > > > > > > >> stateless operations is pretty
>> interesting.
>> > >> > > > > Specifically,
>> > >> > > > > > >>> this would
>> > >> > > > > > >>> > > > > > > >> _really_
>> > >> > > > > > >>> > > > > > > >> let you pare down useless updates by
>> using
>> > >> mapValues
>> > >> > > > > to
>> > >> > > > > > >>> strip down
>> > >> > > > > > >>> > > > > > > records
>> > >> > > > > > >>> > > > > > > >> only
>> > >> > > > > > >>> > > > > > > >> to what you really need. However, the
>> > >> dependence on
>> > >> > > > > the
>> > >> > > > > > >>> > > > > > implementation of
>> > >> > > > > > >>> > > > > > > >> equals() is troubling.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> It might make sense to table this idea,
>> as
>> > >> well as my
>> > >> > > > > > >>> complicated
>> > >> > > > > > >>> > > > > > no-op
>> > >> > > > > > >>> > > > > > > >> detection algorithm, and initially
>> propose
>> > >> just a
>> > >> > > > > > >>> nonconfigurable
>> > >> > > > > > >>> > > > > > feature
>> > >> > > > > > >>> > > > > > > >> to
>> > >> > > > > > >>> > > > > > > >> check "old" and "new" results for binary
>> > >> equality
>> > >> > > > > before
>> > >> > > > > > >>> forwarding.
>> > >> > > > > > >>> > > > > > > I.e.,
>> > >> > > > > > >>> > > > > > > >> if
>> > >> > > > > > >>> > > > > > > >> any operation determines that the old
>> and
>> > >> new results
>> > >> > > > > are
>> > >> > > > > > >>> > > > > > > >> binary-identical, we
>> > >> > > > > > >>> > > > > > > >> would not forward.
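
A minimal sketch of the binary-equality check described above, assuming the old and new results are already available as serialized byte arrays. The names `BinaryEqualityCheck` and `shouldForward` are hypothetical and not part of Kafka Streams, and treating a null-to-null transition as a no-op is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not a Kafka Streams API.
final class BinaryEqualityCheck {
    private BinaryEqualityCheck() {}

    // Returns true when the update should be forwarded downstream.
    static boolean shouldForward(byte[] oldBytes, byte[] newBytes) {
        if (oldBytes == null || newBytes == null) {
            // Insert (no old value) or delete (no new value) is a real change;
            // null -> null is treated as a no-op here (an assumption).
            return oldBytes != newBytes;
        }
        // Forward only when the serialized forms differ.
        return !Arrays.equals(oldBytes, newBytes);
    }

    public static void main(String[] args) {
        byte[] oldValue = "A:1".getBytes(StandardCharsets.UTF_8);
        byte[] sameValue = "A:1".getBytes(StandardCharsets.UTF_8);
        byte[] newValue = "A:2".getBytes(StandardCharsets.UTF_8);
        System.out.println(shouldForward(oldValue, sameValue)); // false
        System.out.println(shouldForward(oldValue, newValue));  // true
    }
}
```

Since the comparison is on bytes the operator has already produced, the only added cost is the array comparison itself.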
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> I'll admit that this doesn't serve
>> Tommy's
>> > >> use case
>> > >> > > > > very
>> > >> > > > > > >>> well, but it
>> > >> > > > > > >>> > > > > > > >> might be
>> > >> > > > > > >>> > > > > > > >> better to take baby steps with an
>> > >> optimization like
>> > >> > > > > this
>> > >> > > > > > >>> and not risk
>> > >> > > > > > >>> > > > > > > >> over-reaching in a way that actually
>> harms
>> > >> > > > > performance or
>> > >> > > > > > >>> > > > > > correctness. We
>> > >> > > > > > >>> > > > > > > >> could
>> > >> > > > > > >>> > > > > > > >> always expand the feature to use
>> equals() or
>> > >> some
>> > >> > > > > kind of
>> > >> > > > > > >>> > > > > > ChangeDetector
>> > >> > > > > > >>> > > > > > > >> later
>> > >> > > > > > >>> > > > > > > >> on, in a more focused discussion.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> Regarding metrics or debug logs, I
>> guess I
>> > >> don't feel
>> > >> > > > > > >>> strongly, but it
>> > >> > > > > > >>> > > > > > > >> feels
>> > >> > > > > > >>> > > > > > > >> like two things will happen that make it
>> > >> nicer to add
>> > >> > > > > > >>> them:
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> 1. This feature is going to
>> surprise/annoy
>> > >> _somebody_,
>> > >> > > > > > >>> and it would be
>> > >> > > > > > >>> > > > > > > >> nice to
>> > >> > > > > > >>> > > > > > > >> be able to definitively say the reason
>> that
>> > >> updates
>> > >> > > > > are
>> > >> > > > > > >>> dropped is
>> > >> > > > > > >>> > > > > > that
>> > >> > > > > > >>> > > > > > > >> they
>> > >> > > > > > >>> > > > > > > >> were no-ops. The easiest smoking gun is
>> if
>> > >> there are
>> > >> > > > > > >>> debug-logs that
>> > >> > > > > > >>> > > > > > > can be
>> > >> > > > > > >>> > > > > > > >> enabled. This person might just be
>> looking
>> > >> at the
>> > >> > > > > > >>> dashboards,
>> > >> > > > > > >>> > > > > > > wondering why
>> > >> > > > > > >>> > > > > > > >> there are 100K updates per second going
>> into
>> > >> their
>> > >> > > > > app,
>> > >> > > > > > >>> but only 1K
>> > >> > > > > > >>> > > > > > > >> results per
>> > >> > > > > > >>> > > > > > > >> second coming out. Having the metric
>> there
>> > >> makes the
>> > >> > > > > > >>> accounting
>> > >> > > > > > >>> > > > > > easier.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> 2. Somebody is going to struggle with
>> > >> high-volume
>> > >> > > > > > >>> updates, and it
>> > >> > > > > > >>> > > > > > > would be
>> > >> > > > > > >>> > > > > > > >> nice
>> > >> > > > > > >>> > > > > > > >> for them to know that this feature is
>> saving
>> > >> them
>> > >> > > > > > >>> X-thousand updates
>> > >> > > > > > >>> > > > > > per
>> > >> > > > > > >>> > > > > > > >> second,
>> > >> > > > > > >>> > > > > > > >> etc.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> What does everyone think about this?
>> Note,
>> > >> as I read
>> > >> > > > > it,
>> > >> > > > > > >>> what I've
>> > >> > > > > > >>> > > > > > said
>> > >> > > > > > >>> > > > > > > >> above is
>> > >> > > > > > >>> > > > > > > >> already reflected in the text of the
>> KIP.
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> Thanks,
>> > >> > > > > > >>> > > > > > > >> -John
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >>
>> > >> > > > > > >>> > > > > > > >> On Tue, Feb 11, 2020, at 18:27, Richard
>> Yu
>> > >> wrote:
>> > >> > > > > > >>> > > > > > > >>> Hi all,
>> > >> > > > > > >>> > > > > > > >>>
>> > >> > > > > > >>> > > > > > > >>> Bumping this. If you feel that this
>> KIP is
>> > >> not too
>> > >> > > > > > >>> urgent, then let
>> > >> > > > > > >>> > > > > > me
>> > >> > > > > > >>> > > > > > > >>> know. :)
>> > >> > > > > > >>> > > > > > > >>>
>> > >> > > > > > >>> > > > > > > >>> Cheers,
>> > >> > > > > > >>> > > > > > > >>> Richard
>> > >> > > > > > >>> > > > > > > >>>
>> > >> > > > > > >>> > > > > > > >>> On Thu, Feb 6, 2020 at 4:55 PM Richard
>> Yu <
>> > >> > > > > > >>> > > > > > yohan.richard...@gmail.com>
>> > >> > > > > > >>> > > > > > > >>> wrote:
>> > >> > > > > > >>> > > > > > > >>>
>> > >> > > > > > >>> > > > > > > >>>> Hi all,
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>> I've had just a few thoughts
>> regarding the
>> > >> > > > > forwarding
>> > >> > > > > > >>> of <key,
>> > >> > > > > > >>> > > > > > > >>>> change<old_value, new_value>>. As
>> Matthias
>> > >> already
>> > >> > > > > > >>> mentioned, there
>> > >> > > > > > >>> > > > > > > >> are two
>> > >> > > > > > >>> > > > > > > >>>> separate priorities by which we can
>> judge
>> > >> this KIP:
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>> 1. An optimization perspective: In this
>> > >> case, the
>> > >> > > > > user
>> > >> > > > > > >>> would prefer
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>> impact of this KIP to be as minimal as
>> > >> possible. By
>> > >> > > > > > >>> such logic, if
>> > >> > > > > > >>> > > > > > > >>>> stateless operations are performed
>> twice,
>> > >> that could
>> > >> > > > > > >>> prove
>> > >> > > > > > >>> > > > > > > >> unacceptable for
>> > >> > > > > > >>> > > > > > > >>>> them. (since operations can prove
>> > >> expensive)
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>> 2. Semantics correctness perspective:
>> > >> Unlike the
>> > >> > > > > > >>> optimization
>> > >> > > > > > >>> > > > > > > >> approach, we
>> > >> > > > > > >>> > > > > > > >>>> are more concerned with all KTable
>> > >> operations
>> > >> > > > > obeying
>> > >> > > > > > >>> the same
>> > >> > > > > > >>> > > > > > emission
>> > >> > > > > > >>> > > > > > > >>>> policy. i.e. emit on change. In this
>> case,
>> > >> a
>> > >> > > > > > >>> discrepancy would not
>> > >> > > > > > >>> > > > > > be
>> > >> > > > > > >>> > > > > > > >>>> tolerated, even though an extra
>> > >> performance cost
>> > >> > > > > will
>> > >> > > > > > >>> be incurred.
>> > >> > > > > > >>> > > > > > > >>>> Therefore, we will follow Matthias's
>> > >> approach, and
>> > >> > > > > then
>> > >> > > > > > >>> perform the
>> > >> > > > > > >>> > > > > > > >>>> operation once on the old value, and
>> once
>> > >> on the
>> > >> > > > > new.
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>> The issue here I think is more black
>> and
>> > >> white than
>> > >> > > > > in
>> > >> > > > > > >>> between. The
>> > >> > > > > > >>> > > > > > > >> second
>> > >> > > > > > >>> > > > > > > >>>> option in particular would be
>> favorable
>> > >> for users
>> > >> > > > > with
>> > >> > > > > > >>> inexpensive
>> > >> > > > > > >>> > > > > > > >>>> stateless operations, while for the
>> former
>> > >> option,
>> > >> > > > > we
>> > >> > > > > > >>> are probably
>> > >> > > > > > >>> > > > > > > >> dealing
>> > >> > > > > > >>> > > > > > > >>>> with more expensive ones. So the
>> simplest
>> > >> solution
>> > >> > > > > is
>> > >> > > > > > >>> probably to
>> > >> > > > > > >>> > > > > > > >> allow the
>> > >> > > > > > >>> > > > > > > >>>> user to choose one of the behaviors,
>> and
>> > >> have a
>> > >> > > > > config
>> > >> > > > > > >>> which can
>> > >> > > > > > >>> > > > > > > >> switch in
>> > >> > > > > > >>> > > > > > > >>>> between them.
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>> It's the simplest compromise I can
>> come up
>> > >> with at
>> > >> > > > > the
>> > >> > > > > > >>> moment, but if
>> > >> > > > > > >>> > > > > > > >> you
>> > >> > > > > > >>> > > > > > > >>>> think you have a better plan which
>> could
>> > >> better
>> > >> > > > > balance
>> > >> > > > > > >>> tradeoffs,
>> > >> > > > > > >>> > > > > > then
>> > >> > > > > > >>> > > > > > > >>>> please let us know. :)
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>> Best,
>> > >> > > > > > >>> > > > > > > >>>> Richard
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>> On Wed, Feb 5, 2020 at 5:12 PM John
>> > >> Roesler <
>> > >> > > > > > >>> vvcep...@apache.org>
>> > >> > > > > > >>> > > > > > > >> wrote:
>> > >> > > > > > >>> > > > > > > >>>>
>> > >> > > > > > >>> > > > > > > >>>>> Hi all,
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> Thanks for the thoughtful comments!
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> I need more time to reflect on your
>> > >> thoughts, but
>> > >> > > > > just
>> > >> > > > > > >>> wanted to
>> > >> > > > > > >>> > > > > > offer
>> > >> > > > > > >>> > > > > > > >>>>> a quick clarification about equals().
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> I only meant that we can't be sure
>> if a
>> > >> class's
>> > >> > > > > > >>> equals()
>> > >> > > > > > >>> > > > > > > >> implementation
>> > >> > > > > > >>> > > > > > > >>>>> returns true for two semantically
>> > >> identical
>> > >> > > > > instances.
>> > >> > > > > > >>> I.e., if a
>> > >> > > > > > >>> > > > > > > >> class
>> > >> > > > > > >>> > > > > > > >>>>> doesn't
>> > >> > > > > > >>> > > > > > > >>>>> override the default equals()
>> > >> implementation, then
>> > >> > > > > we
>> > >> > > > > > >>> would see
>> > >> > > > > > >>> > > > > > > >> behavior
>> > >> > > > > > >>> > > > > > > >>>>> like:
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> new MyPair("A", 1).equals(new
>> MyPair("A",
>> > >> 1))
>> > >> > > > > returns
>> > >> > > > > > >>> false
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> In that case, I would still like to
>> catch
>> > >> no-op
>> > >> > > > > > >>> updates by
>> > >> > > > > > >>> > > > > > comparing
>> > >> > > > > > >>> > > > > > > >> the
>> > >> > > > > > >>> > > > > > > >>>>> serialized form of the records when
>> we
>> > >> happen to
>> > >> > > > > have
>> > >> > > > > > >>> it serialized
>> > >> > > > > > >>> > > > > > > >> anyway
>> > >> > > > > > >>> > > > > > > >>>>> (such as when the operation is
>> stateful,
>> > >> or when
>> > >> > > > > we're
>> > >> > > > > > >>> sending to a
>> > >> > > > > > >>> > > > > > > >>>>> repartition topic and we have both
>> the
>> > >> "new" and
>> > >> > > > > "old"
>> > >> > > > > > >>> value from
>> > >> > > > > > >>> > > > > > > >>>>> upstream).
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> I didn't mean to suggest we'd try to
>> use
>> > >> > > > > reflection to
>> > >> > > > > > >>> detect
>> > >> > > > > > >>> > > > > > whether
>> > >> > > > > > >>> > > > > > > >>>>> equals
>> > >> > > > > > >>> > > > > > > >>>>> is implemented, although that is a
>> neat
>> > >> trick. I
>> > >> > > > > was
>> > >> > > > > > >>> thinking more
>> > >> > > > > > >>> > > > > > of
>> > >> > > > > > >>> > > > > > > >> a
>> > >> > > > > > >>> > > > > > > >>>>> belt-and-suspenders algorithm where
>> we do
>> > >> the check
>> > >> > > > > > >>> for no-ops
>> > >> > > > > > >>> > > > > > based
>> > >> > > > > > >>> > > > > > > >> on
>> > >> > > > > > >>> > > > > > > >>>>> equals() and then _also_ check the
>> > >> serialized bytes
>> > >> > > > > > >>> for equality.
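
The belt-and-suspenders idea above could look roughly like this. It is a sketch under assumptions: `NoOpDetector`, `MyPair`, and `serialize` are hypothetical names, and the string-based serializer only stands in for whatever serde the application actually uses. `MyPair` deliberately does not override `equals()`, so the default identity comparison fails and the byte comparison catches the no-op.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical helper, not a Kafka Streams API.
final class NoOpDetector {
    private NoOpDetector() {}

    // Value class that relies on the default Object.equals() (identity).
    static final class MyPair {
        final String name;
        final int count;

        MyPair(String name, int count) {
            this.name = name;
            this.count = count;
        }
    }

    // Stand-in serializer for illustration only.
    static byte[] serialize(MyPair pair) {
        return (pair.name + ":" + pair.count).getBytes(StandardCharsets.UTF_8);
    }

    // First try equals(); if that says "different", also compare serialized bytes.
    static <V> boolean isNoOp(V oldValue, V newValue, Function<V, byte[]> serializer) {
        if (oldValue == null || newValue == null) {
            return oldValue == newValue;
        }
        if (oldValue.equals(newValue)) {
            return true;
        }
        return Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue));
    }

    public static void main(String[] args) {
        MyPair first = new MyPair("A", 1);
        MyPair second = new MyPair("A", 1);
        System.out.println(first.equals(second));                            // false
        System.out.println(isNoOp(first, second, NoOpDetector::serialize));  // true
    }
}
```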
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> Thanks,
>> > >> > > > > > >>> > > > > > > >>>>> -John
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>> On Wed, Feb 5, 2020, at 15:31, Ted Yu
>> > >> wrote:
>> > >> > > > > > >>> > > > > > > >>>>>> Thanks for the comments, Matthias.
>> > >> > > > > > >>> > > > > > > >>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>> w.r.t. requirement of an `equals()`
>> > >> > > > > implementation,
>> > >> > > > > > >>> each template
>> > >> > > > > > >>> > > > > > > >> type
>> > >> > > > > > >>> > > > > > > >>>>>> would have an equals() method. We
>> can
>> > >> use the
>> > >> > > > > > >>> following code to
>> > >> > > > > > >>> > > > > > know
>> > >> > > > > > >>> > > > > > > >>>>>> whether it is provided by JVM or
>> > >> provided by user.
>> > >> > > > > > >>> > > > > > > >>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>> boolean customEquals = false;
>> > >> > > > > > >>> > > > > > > >>>>>> try {
>> > >> > > > > > >>> > > > > > > >>>>>>     Class<?> cls = value.getClass().getMethod("equals", Object.class).getDeclaringClass();
>> > >> > > > > > >>> > > > > > > >>>>>>     if (!Object.class.equals(cls)) {
>> > >> > > > > > >>> > > > > > > >>>>>>         customEquals = true;
>> > >> > > > > > >>> > > > > > > >>>>>>     }
>> > >> > > > > > >>> > > > > > > >>>>>> } catch (NoSuchMethodException nsme) {
>> > >> > > > > > >>> > > > > > > >>>>>>     // equals is always defined, this wouldn't hit
>> > >> > > > > > >>> > > > > > > >>>>>> }
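
For reference, the reflection check above can be wrapped into a self-contained method along these lines. `EqualsInspector`, `hasCustomEquals`, and `Plain` are hypothetical names used only for illustration. Note that the check reports true for any class that inherits an overriding `equals()` from a superclass, and it says nothing about whether that implementation is semantically correct.

```java
// Hypothetical helper, not a Kafka Streams API.
final class EqualsInspector {
    private EqualsInspector() {}

    // A class that does not override equals().
    static final class Plain {}

    // True when equals(Object) is declared somewhere other than java.lang.Object.
    static boolean hasCustomEquals(Object value) {
        try {
            Class<?> declaring =
                value.getClass().getMethod("equals", Object.class).getDeclaringClass();
            return !Object.class.equals(declaring);
        } catch (NoSuchMethodException e) {
            // Every class has a public equals(Object), so this should not happen.
            throw new IllegalStateException("equals(Object) not found", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(hasCustomEquals("text"));      // true: String overrides equals()
        System.out.println(hasCustomEquals(new Plain())); // false: inherited from Object
    }
}
```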
>> > >> > > > > > >>> > > > > > > >>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>> The next question is: what if the
>> user
>> > >> doesn't
>> > >> > > > > > >>> provide equals()
>> > >> > > > > > >>> > > > > > > >> method ?
>> > >> > > > > > >>> > > > > > > >>>>>> Would we automatically fall back to
>> > >> > > > > emit-on-update ?
>> > >> > > > > > >>> > > > > > > >>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>> Cheers
>> > >> > > > > > >>> > > > > > > >>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>> On Tue, Feb 4, 2020 at 1:37 PM
>> Matthias
>> > >> J. Sax <
>> > >> > > > > > >>> mj...@apache.org>
>> > >> > > > > > >>> > > > > > > >>>>> wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>
>> > >> > > > > > >>> > > > > > > > First a high level comment:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Overall, I would like to make one step
>> back,
>> > >> and make
>> > >> > > > > sure
>> > >> > > > > > >>> we are
>> > >> > > > > > >>> > > > > > > > discussing on the same level.
>> Originally, I
>> > >> understood
>> > >> > > > > > >>> this KIP
>> > >> > > > > > >>> > > > > > > >>> as a
>> > >> > > > > > >>> > > > > > > > proposed change of _semantics_, however,
>> > >> given the
>> > >> > > > > latest
>> > >> > > > > > >>> > > > > > > >>> discussion
>> > >> > > > > > >>> > > > > > > > it seems it's actually not -- it's more
>> an
>> > >> > > > > _optimization_
>> > >> > > > > > >>> > > > > > > >>> proposal.
>> > >> > > > > > >>> > > > > > > > Hence, we only need to make sure that
>> this
>> > >> optimization
>> > >> > > > > > >>> does not
>> > >> > > > > > >>> > > > > > > >>> break
>> > >> > > > > > >>> > > > > > > > existing semantics. Is this the right
>> way to
>> > >> think
>> > >> > > > > about
>> > >> > > > > > >>> it?
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > If yes, then it might actually be ok to
>> have
>> > >> different
>> > >> > > > > > >>> behavior
>> > >> > > > > > >>> > > > > > > > depending if there is a materialized
>> KTable
>> > >> or not. So
>> > >> > > > > > >>> far, we
>> > >> > > > > > >>> > > > > > > >>> never
>> > >> > > > > > >>> > > > > > > > defined a public contract about our emit
>> > >> strategy and
>> > >> > > > > it
>> > >> > > > > > >>> seems
>> > >> > > > > > >>> > > > > > > >>> this
>> > >> > > > > > >>> > > > > > > > KIP does not define one either.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Hence, I don't have as strong of an
>> opinion
>> > >> about
>> > >> > > > > sending
>> > >> > > > > > >>> > > > > > > >>> oldValues
>> > >> > > > > > >>> > > > > > > > for example any longer. I guess the
>> question
>> > >> is really,
>> > >> > > > > > >>> what can
>> > >> > > > > > >>> > > > > > > >>> we
>> > >> > > > > > >>> > > > > > > > implement in a reasonable way.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Other comments:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > @Richard:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Can you please add the KIP to the KIP
>> > >> overview table:
>> > >> > > > > It's
>> > >> > > > > > >>> missing
>> > >> > > > > > >>> > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > @Bruno:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > You mentioned caching. I think it's
>> irrelevant
>> > >> > > > > > >>> (orthogonal) and
>> > >> > > > > > >>> > > > > > > >>> we can
>> > >> > > > > > >>> > > > > > > > discuss this KIP without considering it.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > @John:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Even in the source table, we
>> forward
>> > >> the
>> > >> > > > > updated
>> > >> > > > > > >>> record with
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>>>>>>> higher of the two timestamps. So
>> the
>> > >> example is
>> > >> > > > > > >>> more like:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > That is not correct. Currently, we
>> forward
>> > >> with the
>> > >> > > > > smaller
>> > >> > > > > > >>> > > > > > > > out-of-order timestamp (changing the
>> > >> timestamp would
>> > >> > > > > > >>> corrupt the
>> > >> > > > > > >>> > > > > > > >>> data
>> > >> > > > > > >>> > > > > > > > -- we don't know, because we don't
>> check, if
>> > >> the value
>> > >> > > > > is
>> > >> > > > > > >>> the
>> > >> > > > > > >>> > > > > > > >>> same
>> > >> > > > > > >>> > > > > > > >>>>>> or
>> > >> > > > > > >>> > > > > > > > a different one, hence, we must emit the
>> > >> out-of-order
>> > >> > > > > > >>> record
>> > >> > > > > > >>> > > > > > > >>> as-is).
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > If we start to do emit-on-change, we also
>> > >> need to emit
>> > >> > > > > a
>> > >> > > > > > >>> new
>> > >> > > > > > >>> > > > > > > >>> record if
>> > >> > > > > > >>> > > > > > > > the timestamp changes due to out-of-order
>> > >> data, hence,
>> > >> > > > > we
>> > >> > > > > > >>> would
>> > >> > > > > > >>> > > > > > > >>> still
>> > >> > > > > > >>> > > > > > > > need to emit <K,V,T1> because that gives
>> us
>> > >> correct
>> > >> > > > > > >>> semantics:
>> > >> > > > > > >>> > > > > > > >>> assume
>> > >> > > > > > >>> > > > > > > > you have a filter() and afterward use the
>> > >> filtered
>> > >> > > > > KTable in
>> > >> > > > > > >>> a
>> > >> > > > > > >>> > > > > > > > stream-table join -- the lower T1
>> timestamp
>> > >> must be
>> > >> > > > > > >>> propagated to
>> > >> > > > > > >>> > > > > > > >>> the
>> > >> > > > > > >>> > > > > > > > filtered KTable to ensure that the
>> > >> stream-table
>> > >> > > > > join
>> > >> > > > > > >>> computes
>> > >> > > > > > >>> > > > > > > >>> the
>> > >> > > > > > >>> > > > > > > > correct result.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Your point about requiring an `equals()`
>> > >> > > > > implementation is
>> > >> > > > > > >>> > > > > > > >>> actually a
>> > >> > > > > > >>> > > > > > > > quite interesting one and boils down to
>> my
>> > >> statement
>> > >> > > > > from
>> > >> > > > > > >>> above
>> > >> > > > > > >>> > > > > > > >>> about
>> > >> > > > > > >>> > > > > > > > "what can we actually implement". What I
>> don't
>> > >> > > > > understand
>> > >> > > > > > >>> is:
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >>>>>>>>> This way, we still don't have to
>> rely
>> > >> on the
>> > >> > > > > > >>> existence of an
>> > >> > > > > > >>> > > > > > > >>>>>>>>> equals() method, but if it is
>> there,
>> > >> we can
>> > >> > > > > > >>> benefit from it.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > Your bullet point (2) says it uses
>> `equals()`
>> > >> --
>> > >> > > > > hence, it
>> > >> > > > > > >>> seems
>> > >> > > > > > >>> > > > > > > >>> we
>> > >> > > > > > >>> > > > > > > > actually do rely on it? Also, how can we
>> > >> detect if
>> > >> > > > > there
>> > >> > > > > > >>> is an
>> > >> > > > > > >>> > > > > > > > `equals()` method to do the comparison?
>> Would
>> > >> we fail
>> > >> > > > > if
>> > >> > > > > > >>> we don't
>> > >> > > > > > >>> > > > > > > >>> have
>> > >> > > > > > >>> > > > > > > > `equals()` nor corresponding serializers
>> to do
>> > >> the
>> > >> > > > > > >>> comparison?
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Wow, really good catch! Yes, we
>> > >> absolutely need
>> > >> > > > > > >>> metrics and
>> > >> > > > > > >>> > > > > > > >>> logs if
>> > >> > > > > > >>> > > > > > > >>>>>>>>> we're going to drop any records.
>> And,
>> > >> yes, we
>> > >> > > > > > >>> should propose
>> > >> > > > > > >>> > > > > > > >>>>>>>>> metrics and logs that are
>> similar to
>> > >> the
>> > >> > > > > existing
>> > >> > > > > > >>> ones when we
>> > >> > > > > > >>> > > > > > > >>> drop
>> > >> > > > > > >>> > > > > > > >>>>>>>>> records for other reasons.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > I am not sure about this point. In fact,
>> we
>> > >> have
>> > >> > > > > already
>> > >> > > > > > >>> some
>> > >> > > > > > >>> > > > > > > >>> no-ops
>> > >> > > > > > >>> > > > > > > > in Kafka Streams in our join-operators
>> and
>> > >> don't report
>> > >> > > > > > >>> any of
>> > >> > > > > > >>> > > > > > > >>> those
>> > >> > > > > > >>> > > > > > > > either. Emit-on-change is operator
>> semantics
>> > >> and I
>> > >> > > > > don't
>> > >> > > > > > >>> see why
>> > >> > > > > > >>> > > > > > > >>> we
>> > >> > > > > > >>> > > > > > > > would need to have a metric for it? It
>> seems
>> > >> to be
>> > >> > > > > quite
>> > >> > > > > > >>> different
>> > >> > > > > > >>> > > > > > > > compared to dropping late or malformed
>> > >> records.
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > -Matthias
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > >
>> > >> > > > > > >>> > > > > > > > On 2/4/20 7:13 AM, Thomas Becker wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Thanks John for your thoughtful
>> > >> reply. Some
>> > >> > > > > > >>> comments inline.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> On Mon, 2020-02-03 at 11:51
>> -0600,
>> > >> John Roesler
>> > >> > > > > > >>> wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi Tommy,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for the context. I can
>> see the
>> > >> > > > > attraction
>> > >> > > > > > >>> of
>> > >> > > > > > >>> > > > > > considering
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> these use cases together.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> To answer your question, if a
>> part
>> > >> of the
>> > >> > > > > record
>> > >> > > > > > >>> is not
>> > >> > > > > > >>> > > > > > > >>> relevant
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to downstream consumers, I was
>> > >> thinking you
>> > >> > > > > could
>> > >> > > > > > >>> just use a
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> mapValues to remove it.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> E.g., suppose you wanted to do a
>> > >> join between
>> > >> > > > > two
>> > >> > > > > > >>> tables.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> employeeInfo.join(employeePayroll,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>     (info, payroll) -> new Result(info.name(), payroll.salary()))
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> We only care about one attribute
>> > >> from the Info
>> > >> > > > > > >>> table (name),
>> > >> > > > > > >>> > > > > > > >>> and
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> one from the Payroll table
>> (salary),
>> > >> and these
>> > >> > > > > > >>> attributes
>> > >> > > > > > >>> > > > > > > >>> change
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> rarely. On the other hand, there
>> > >> might be many
>> > >> > > > > > >>> other
>> > >> > > > > > >>> > > > > > attributes
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> that change frequently in these
>> > >> tables. We can
>> > >> > > > > > >>> avoid
>> > >> > > > > > >>> > > > > > triggering
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the join unnecessarily by
>> mapping
>> > >> the input
>> > >> > > > > > >>> tables to drop the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> unnecessary information before
>> the
>> > >> join:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> names = employeeInfo.mapValues(info -> info.name())
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> salaries = employeePayroll.mapValues(payroll -> payroll.salary())
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> names.join(salaries, (name, salary) -> new Result(name, salary))
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Ahh yes I see. This works, but
>> in the
>> > >> case
>> > >> > > > > where
>> > >> > > > > > >>> you're using
>> > >> > > > > > >>> > > > > > > >>>>>>>>> schemas as we are (e.g. Avro), it
>> > >> seems like
>> > >> > > > > this
>> > >> > > > > > >>> approach
>> > >> > > > > > >>> > > > > > could
>> > >> > > > > > >>> > > > > > > >>>>>>>>> lead to a proliferation of
>> "skinny"
>> > >> record
>> > >> > > > > types
>> > >> > > > > > >>> that just drop
>> > >> > > > > > >>> > > > > > > >>>>>>>>> various fields.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Especially if we take Matthias's
>> > >> idea to drop
>> > >> > > > > > >>> non-changes even
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> for stateless operations, this
>> would
>> > >> be quite
>> > >> > > > > > >>> efficient and is
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> also a very straightforward
>> > >> optimization to
>> > >> > > > > > >>> understand once
>> > >> > > > > > >>> > > > > > you
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> know that Streams provides
>> > >> emit-on-change.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> From the context that you
>> provided,
>> > >> it seems
>> > >> > > > > like
>> > >> > > > > > >>> a slightly
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> different situation, though.
>> Reading
>> > >> between
>> > >> > > > > the
>> > >> > > > > > >>> lines a
>> > >> > > > > > >>> > > > > > > >>> little,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> it sounds like: in contrast to
>> the
>> > >> example
>> > >> > > > > above,
>> > >> > > > > > >>> in which we
>> > >> > > > > > >>> > > > > > > >>> are
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> filtering out extra _data_, you
>> have
>> > >> some
>> > >> > > > > extra
>> > >> > > > > > >>> _metadata_
>> > >> > > > > > >>> > > > > > that
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> you still wish to pass down
>> with the
>> > >> data when
>> > >> > > > > > >>> there is a
>> > >> > > > > > >>> > > > > > > >>> "real"
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> update, but you don't want the
>> > >> metadata
>> > >> > > > > itself to
>> > >> > > > > > >>> cause an
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> update.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Despite my lack of clarity, yes
>> > >> you've got it
>> > >> > > > > > >>> right ;) This
>> > >> > > > > > >>> > > > > > > >>>>>>>>> particular processor is the first
>> > >> stop for this
>> > >> > > > > > >>> data after
>> > >> > > > > > >>> > > > > > > >>> coming
>> > >> > > > > > >>> > > > > > > >>>>>>>>> in from external users, who often
>> > >> simply post
>> > >> > > > > the
>> > >> > > > > > >>> same content
>> > >> > > > > > >>> > > > > > > >>> each
>> > >> > > > > > >>> > > > > > > >>>>>>>>> time and we're trying to shield
>> > >> downstream
>> > >> > > > > > >>> consumers from
>> > >> > > > > > >>> > > > > > > >>>>>>>>> unnecessary churn.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> It does seem handy to be able to
>> > >> plug in a
>> > >> > > > > custom
>> > >> > > > > > >>> > > > > > > >>> ChangeDetector
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> for this purpose, but I worry
>> about
>> > >> the API
>> > >> > > > > > >>> complexity. Maybe
>> > >> > > > > > >>> > > > > > > >>> you
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> can help think through how to
>> provide
>> > >> the same
>> > >> > > > > > >>> benefit while
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> limiting user-facing complexity.
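For concreteness, a pluggable detector of the kind being discussed might look roughly like the sketch below. Everything in it is an assumption made for illustration: the thread names a "ChangeDetector" but never defines its shape, so the interface, the `hasChanged` method, the `Post` value type and its fields are all hypothetical and not part of Kafka Streams.

```java
import java.util.Objects;

// Hypothetical interface: the thread only mentions the name "ChangeDetector".
interface ChangeDetector<V> {
    // Returns true if the update should be forwarded downstream.
    boolean hasChanged(V oldValue, V newValue);
}

// Hypothetical value type: "content" is data, "postedAtMs" is metadata.
final class Post {
    final String content;
    final long postedAtMs;

    Post(String content, long postedAtMs) {
        this.content = content;
        this.postedAtMs = postedAtMs;
    }
}

// Treats an update as a change only if the content differs,
// so updates that touch only the metadata are suppressed.
final class ContentOnlyDetector implements ChangeDetector<Post> {
    @Override
    public boolean hasChanged(Post oldValue, Post newValue) {
        if (oldValue == null || newValue == null) {
            // A first write or a delete is a change; null -> null is not.
            return oldValue != newValue;
        }
        return !Objects.equals(oldValue.content, newValue.content);
    }
}
```

With this detector, two posts with the same content but different `postedAtMs` compare as unchanged, which is the "metadata should not cause an update" behavior described above.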
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Here's some extra context to
>> > >> consider:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> We currently don't make any
>> extra
>> > >> requirements
>> > >> > > > > > >>> about the
>> > >> > > > > > >>> > > > > > nature
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> of data that you can use in
>> Streams.
>> > >> For
>> > >> > > > > example,
>> > >> > > > > > >>> you don't
>> > >> > > > > > >>> > > > > > > >>> have
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to implement hashCode and
>> equals, or
>> > >> > > > > compareTo,
>> > >> > > > > > >>> etc. With the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> current proposal, we can do an
>> > >> airtight
>> > >> > > > > > >>> comparison based only
>> > >> > > > > > >>> > > > > > > >>> on
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the serialized form of the
>> values,
>> > >> and we
>> > >> > > > > > >>> actually don't have
>> > >> > > > > > >>> > > > > > > >>> to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> deserialize the "prior" value
>> at all
>> > >> for a
>> > >> > > > > large
>> > >> > > > > > >>> number of
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> operations. Admittedly, if we
>> extend
>> > >> the
>> > >> > > > > proposal
>> > >> > > > > > >>> to include
>> > >> > > > > > >>> > > > > > > >>> no-op
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> detection for stateless
>> operations,
>> > >> we'd
>> > >> > > > > probably
>> > >> > > > > > >>> need to rely
>> > >> > > > > > >>> > > > > > > >>> on
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> equals() for no-op checking,
>> > >> otherwise we'd
>> > >> > > > > wind
>> > >> > > > > > >>> up requiring
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> serdes for stateless operations
>> as
>> > >> well.
>> > >> > > > > > >>> Actually, I'd
>> > >> > > > > > >>> > > > > > probably
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> argue for doing exactly that:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> 1. In stateful operations, drop
>> if
>> > >> the
>> > >> > > > > serialized
>> > >> > > > > > >>> byte[]s are
>> > >> > > > > > >>> > > > > > > >>> the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> same. After deserializing, also
>> drop
>> > >> if the
>> > >> > > > > > >>> objects are equal
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> according to Object#equals().
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> 2. In stateless operations,
>> compare
>> > >> the "new"
>> > >> > > > > and
>> > >> > > > > > >>> "old" values
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> (if "old" is available) based on
>> > >> > > > > Object#equals().
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> 3. As a final optimization,
>> after
>> > >> serializing
>> > >> > > > > and
>> > >> > > > > > >>> before
>> > >> > > > > > >>> > > > > > > >>> sending
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> repartition records, compare the
>> > >> serialized
>> > >> > > > > data
>> > >> > > > > > >>> and drop
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> no-ops.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> This way, we still don't have to
>> > >> rely on the
>> > >> > > > > > >>> existence of an
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> equals() method, but if it is
>> there,
>> > >> we can
>> > >> > > > > > >>> benefit from it.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Also, we don't require a serde
>> in
>> > >> any new
>> > >> > > > > > >>> situations, but we
>> > >> > > > > > >>> > > > > > > >>> can
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> still leverage it when it is
>> > >> available.
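The three rules above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the KIP's implementation: `NoOpCheck` and its method names are hypothetical, and the caller is assumed to already hold the serialized bytes and, where available, the deserialized values.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical helper, not part of the Kafka Streams API.
final class NoOpCheck {
    private NoOpCheck() {}

    // Rules 1 and 3: compare the serialized forms byte for byte.
    // Arrays.equals treats two nulls as equal and null vs. non-null as different.
    static boolean sameBytes(byte[] oldBytes, byte[] newBytes) {
        return Arrays.equals(oldBytes, newBytes);
    }

    // Rule 2: compare deserialized values with equals().
    // If the class does not override equals(), this is reference equality,
    // so distinct instances are never reported as equal and the update is emitted.
    static boolean sameValue(Object oldValue, Object newValue) {
        return Objects.equals(oldValue, newValue);
    }

    // Rule 1 for stateful operations: drop if the bytes match,
    // or if the deserialized objects are equal.
    static boolean isNoOp(byte[] oldBytes, byte[] newBytes,
                          Object oldValue, Object newValue) {
        return sameBytes(oldBytes, newBytes) || sameValue(oldValue, newValue);
    }
}
```

Note that a value class without its own equals() simply never matches in `sameValue`, so the check degrades to emitting every update rather than failing.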
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> For clarity, in my example
>> above,
>> > >> even if the
>> > >> > > > > > >>> employeeInfo and
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> employeePayroll and Result
>> records
>> > >> all have
>> > >> > > > > > >>> serdes, we need
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> "name" field (presumably
>> String) and
>> > >> the
>> > >> > > > > "salary"
>> > >> > > > > > >>> field
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> (presumably a Double) to have
>> serdes
>> > >> as well
>> > >> > > > > in
>> > >> > > > > > >>> the naive
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> implementation. But if we can
>> > >> leverage
>> > >> > > > > equals(),
>> > >> > > > > > >>> then the
>> > >> > > > > > >>> > > > > > > >>> "right
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> thing" happens automatically.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> I still don't totally follow why
>> the
>> > >> individual
>> > >> > > > > > >>> components
>> > >> > > > > > >>> > > > > > > >>> (name,
>> > >> > > > > > >>> > > > > > > >>>>>>>>> salary) would have to have serdes
>> > >> here. If
>> > >> > > > > Result
>> > >> > > > > > >>> has one, we
>> > >> > > > > > >>> > > > > > > >>>>>>>>> compare bytes, and if Result
>> > >> additionally has
>> > >> > > > > an
>> > >> > > > > > >>> equals()
>> > >> > > > > > >>> > > > > > method
>> > >> > > > > > >>> > > > > > > >>>>>>>>> (which presumably includes equals
>> > >> comparisons
>> > >> > > > > on
>> > >> > > > > > >>> the
>> > >> > > > > > >>> > > > > > constituent
>> > >> > > > > > >>> > > > > > > >>>>>>>>> fields), have we not covered our
>> > >> bases?
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> This dovetails in with my
>> primary UX
>> > >> concern;
>> > >> > > > > > >>> where would the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> ChangeDetector actually be
>> > >> registered? None of
>> > >> > > > > > >>> the operators
>> > >> > > > > > >>> > > > > > in
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> my example have names or topics
>> or
>> > >> any other
>> > >> > > > > > >>> identifiable
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> characteristic that could be
>> passed
>> > >> to a
>> > >> > > > > > >>> ChangeDetector class
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> registered via config. You
>> could say
>> > >> that we
>> > >> > > > > make
>> > >> > > > > > >>> > > > > > > >>> ChangeDetector
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> an optional parameter to every
>> > >> operation in
>> > >> > > > > > >>> Streams, but this
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> seems to carry quite a bit of
>> mental
>> > >> burden
>> > >> > > > > with
>> > >> > > > > > >>> it. People
>> > >> > > > > > >>> > > > > > > >>> will
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> wonder what it's for and
>> whether or
>> > >> not they
>> > >> > > > > > >>> should be using
>> > >> > > > > > >>> > > > > > > >>> it.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> There would almost certainly be
>> a
>> > >> > > > > misconception
>> > >> > > > > > >>> that it's
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> preferable to implement it
>> always,
>> > >> which
>> > >> > > > > would be
>> > >> > > > > > >>> unfortunate.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Plus, to actually implement metadata flowing through the topology
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> as in your use case, you'd have to do two things:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> 1. Make sure that all operations actually preserve the metadata
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> alongside the data (e.g., don't accidentally add a mapValues like I
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> did, or you drop the metadata).
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> 2. Implement a ChangeDetector for every single operation in the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> topology, or you don't get the benefit of dropping non-changes
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> internally.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> 2b. Alternatively, you could just add the ChangeDetector to one
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> operation toward the end of the topology. This would not drop
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> redundant computation internally, but only drop redundant _outputs_.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> But this is just about the same as your current solution.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> I definitely see your point
>> regarding
>> > >> > > > > > >>> configuration. I was
>> > >> > > > > > >>> > > > > > > >>>>>>>>> originally thinking about this
>> when
>> > >> the
>> > >> > > > > > >>> deduplication was going
>> > >> > > > > > >>> > > > > > > >>> to
>> > >> > > > > > >>> > > > > > > >>>>>>>>> be opt-in, and it seemed very
>> natural
>> > >> to say
>> > >> > > > > > >>> something like:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
>> > >> > > > > > >>> > > > > > > >>>>>>>>>     .suppress(duplicatesAccordingTo(someChangeDetector))
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Alternatively you can imagine a
>> > >> similar method
>> > >> > > > > > >>> being on
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Materialized, though obviously
>> this
>> > >> makes less
>> > >> > > > > > >>> sense if we
>> > >> > > > > > >>> > > > > > don't
>> > >> > > > > > >>> > > > > > > >>>>>>>>> want to require materialization.
>> If
>> > >> we're now
>> > >> > > > > > >>> talking about
>> > >> > > > > > >>> > > > > > > >>>>>>>>> changing the default behavior
>> and not
>> > >> having
>> > >> > > > > any
>> > >> > > > > > >>> configuration
>> > >> > > > > > >>> > > > > > > >>>>>>>>> options, it's harder to find a
>> place
>> > >> for this.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> A final thought; if it really
>> is a
>> > >> metadata
>> > >> > > > > > >>> question, can we
>> > >> > > > > > >>> > > > > > > >>> just
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> plan to finish up the support
>> for
>> > >> headers in
>> > >> > > > > > >>> Streams? I.e.,
>> > >> > > > > > >>> > > > > > > >>> give
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> you a way to control the way
>> that
>> > >> headers flow
>> > >> > > > > > >>> through the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> topology? Then, we could treat
>> > >> headers the
>> > >> > > > > same
>> > >> > > > > > >>> way we treat
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> timestamps in the no-op
>> checking...
>> > >> We
>> > >> > > > > completely
>> > >> > > > > > >>> ignore them
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> for the sake of comparison.
>> Thus,
>> > >> neither the
>> > >> > > > > > >>> timestamp nor
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> headers would get updated in
>> > >> internal state
>> > >> > > > > or in
>> > >> > > > > > >>> downstream
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> views as long as the value
>> itself
>> > >> doesn't
>> > >> > > > > change.
>> > >> > > > > > >>> This seems
>> > >> > > > > > >>> > > > > > to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> give us a way to support your
>> use
>> > >> case without
>> > >> > > > > > >>> adding to the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> mental overhead of using
>> Streams for
>> > >> simple
>> > >> > > > > > >>> things.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>> Agree headers could be a decent
>> fit
>> > >> for this
>> > >> > > > > > >>> particular case
>> > >> > > > > > >>> > > > > > > >>>>>>>>> because it's mostly metadata,
>> though
>> > >> to be
>> > >> > > > > honest
>> > >> > > > > > >>> we haven't
>> > >> > > > > > >>> > > > > > > >>> looked
>> > >> > > > > > >>> > > > > > > >>>>>>>>> at headers much (mostly because,
>> and
>> > >> to your
>> > >> > > > > > >>> point, support
>> > >> > > > > > >>> > > > > > > >>> seems
>> > >> > > > > > >>> > > > > > > >>>>>>>>> to be lacking). I feel like there
>> > >> would be
>> > >> > > > > other
>> > >> > > > > > >>> cases where
>> > >> > > > > > >>> > > > > > > >>> this
>> > >> > > > > > >>> > > > > > > >>>>>>>>> feature could be valuable, but I
>> > >> admit I can't
>> > >> > > > > > >>> come up with
>> > >> > > > > > >>> > > > > > > >>>>>>>>> anything right this second.
>> Perhaps
>> > >> yuzhihong
>> > >> > > > > had
>> > >> > > > > > >>> an example in
>> > >> > > > > > >>> > > > > > > >>>>>>>>> mind?
>> > >> > > > > > >>> > > > > > > >>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> I.e., simple things should be
>> easy,
>> > >> and
>> > >> > > > > complex
>> > >> > > > > > >>> things should
>> > >> > > > > > >>> > > > > > > >>> be
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> possible.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> What are your thoughts? Thanks,
>> -John
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Feb 3, 2020, at 07:19,
>> > >> Thomas Becker
>> > >> > > > > > >>> wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi John, Can you describe how
>> you'd
>> > >> use
>> > >> > > > > > >>> filtering/mapping to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate records? To give
>> some
>> > >> background
>> > >> > > > > on
>> > >> > > > > > >>> my suggestion
>> > >> > > > > > >>> > > > > > > >>> we
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> currently have a small stream
>> > >> processor that
>> > >> > > > > > >>> exists solely to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> deduplicate, which we do using a
>> > >> process that
>> > >> > > > > I
>> > >> > > > > > >>> assume would
>> > >> > > > > > >>> > > > > > be
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> similar to what would be done
>> here
>> > >> (with a
>> > >> > > > > store
>> > >> > > > > > >>> of keys and
>> > >> > > > > > >>> > > > > > > >>> hash
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> values). But the records we are
>> > >> deduplicating
>> > >> > > > > > >>> have some
>> > >> > > > > > >>> > > > > > > >>> metadata
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> fields (such as timestamps of
>> when
>> > >> the record
>> > >> > > > > was
>> > >> > > > > > >>> posted) that
>> > >> > > > > > >>> > > > > > > >>> we
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> don't consider semantically
>> > >> meaningful for
>> > >> > > > > > >>> downstream
>> > >> > > > > > >>> > > > > > > >>> consumers,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> and therefore we also suppress
>> > >> updates that
>> > >> > > > > only
>> > >> > > > > > >>> touch those
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> fields.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> -Tommy
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, 2020-01-31 at 19:30
>> -0600,
>> > >> John
>> > >> > > > > Roesler
>> > >> > > > > > >>> wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi Thomas and yuzhihong, That’s
>> an
>> > >> interesting
>> > >> > > > > > >>> idea. Can you
>> > >> > > > > > >>> > > > > > > >>> help
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> think of a use case that isn’t
>> also
>> > >> served by
>> > >> > > > > > >>> filtering or
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> mapping beforehand? Thanks for
>> > >> helping to
>> > >> > > > > design
>> > >> > > > > > >>> this feature!
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> -John On Fri, Jan 31, 2020, at
>> 18:56,
>> > >> > > > > > >>> yuzhih...@gmail.com
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:yuzhih...@gmail.com>
>> wrote:
>> > >> I think
>> > >> > > > > this
>> > >> > > > > > >>> is a good
>> > >> > > > > > >>> > > > > > > >>> idea. On
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Jan 31, 2020, at 4:49 PM, Thomas
>> > >> Becker <
>> > >> > > > > > >>> > > > > > > >>> thomas.bec...@tivo.com
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:thomas.bec...@tivo.com
>> >>
>> > >> wrote: How
>> > >> > > > > do
>> > >> > > > > > >>> folks feel
>> > >> > > > > > >>> > > > > > > >>> about
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> allowing the mechanism by which
>> > >> no-ops are
>> > >> > > > > > >>> detected to be
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> pluggable? Meaning use something
>> > >> like a hash
>> > >> > > > > by
>> > >> > > > > > >>> default, but
>> > >> > > > > > >>> > > > > > > >>> you
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> could optionally provide an
>> > >> implementation of
>> > >> > > > > > >>> something to use
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> instead, like a ChangeDetector.
>> This
>> > >> could be
>> > >> > > > > > >>> useful for
>> > >> > > > > > >>> > > > > > > >>> example
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to ignore changes to certain
>> fields,
>> > >> which may
>> > >> > > > > > >>> not be relevant
>> > >> > > > > > >>> > > > > > > >>> to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the operation being performed.
>> > >> > > > > > >>> > > > > > ________________________________
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> From: John Roesler <
>> > >> vvcep...@apache.org
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> <mailto:vvcep...@apache.org>>
>> Sent:
>> > >> Friday,
>> > >> > > > > > >>> January 31, 2020
>> > >> > > > > > >>> > > > > > > >>> 4:51
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> PM To: dev@kafka.apache.org
>> <mailto:
>> > >> > > > > > >>> dev@kafka.apache.org>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> <dev@kafka.apache.org <mailto:
>> > >> > > > > > >>> dev@kafka.apache.org>> Subject:
>> > >> > > > > > >>> > > > > > > >>> Re:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> [KAFKA-557] Add emit on change
>> > >> support for
>> > >> > > > > Kafka
>> > >> > > > > > >>> Streams
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hello all, Sorry for my
>> silence. It
>> > >> seems
>> > >> > > > > like we
>> > >> > > > > > >>> are getting
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> close to consensus. Hopefully,
>> we
>> > >> could move
>> > >> > > > > to a
>> > >> > > > > > >>> vote soon!
>> > >> > > > > > >>> > > > > > > >>> All
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> of the reasoning from Matthias
>> and
>> > >> Bruno
>> > >> > > > > around
>> > >> > > > > > >>> timestamp is
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> compelling. I would be strongly
>> in
>> > >> favor of
>> > >> > > > > > >>> stating a few
>> > >> > > > > > >>> > > > > > > >>> things
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> very clearly in the KIP: 1.
>> Streams
>> > >> will drop
>> > >> > > > > > >>> no-op updates
>> > >> > > > > > >>> > > > > > > >>> only
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> for KTable operations. That is,
>> we
>> > >> won't make
>> > >> > > > > any
>> > >> > > > > > >>> changes to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> KStream aggregations at the
>> moment.
>> > >> It does
>> > >> > > > > seem
>> > >> > > > > > >>> like we can
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> potentially revisit the time
>> > >> semantics of that
>> > >> > > > > > >>> operation in
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> future, but we don't need to do
>> it
>> > >> now. On the
>> > >> > > > > > >>> other hand, the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> proposed semantics for KTable
>> > >> timestamps
>> > >> > > > > (marking
>> > >> > > > > > >>> the
>> > >> > > > > > >>> > > > > > beginning
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> of the validity of that record)
>> > >> makes sense to
>> > >> > > > > > >>> me. 2. Streams
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> will only drop no-op updates for
>> > >> _stateful_
>> > >> > > > > > >>> KTable operations.
>> > >> > > > > > >>> > > > > > > >>> We
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> don't want to add a hard
>> guarantee
>> > >> that
>> > >> > > > > Streams
>> > >> > > > > > >>> will _never_
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit a no-op table update
>> because it
>> > >> would
>> > >> > > > > > >>> require adding
>> > >> > > > > > >>> > > > > > state
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to otherwise stateless
>> operations.
>> > >> If someone
>> > >> > > > > is
>> > >> > > > > > >>> really
>> > >> > > > > > >>> > > > > > > >>> concerned
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> about a particular stateless
>> > >> operation
>> > >> > > > > producing
>> > >> > > > > > >>> a lot of
>> > >> > > > > > >>> > > > > > no-op
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> results, all they have to do is
>> > >> materialize
>> > >> > > > > it,
>> > >> > > > > > >>> and Streams
>> > >> > > > > > >>> > > > > > > >>> would
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> automatically drop the no-ops.
>> > >> Additionally,
>> > >> > > > > I'm
>> > >> > > > > > >>> +1 on not
>> > >> > > > > > >>> > > > > > > >>> adding
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> an opt-out at this time.
>> Regarding
>> > >> the KIP
>> > >> > > > > > >>> itself, I would
>> > >> > > > > > >>> > > > > > > >>> clean
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> it up a bit before calling for a
>> > >> vote. There
>> > >> > > > > is a
>> > >> > > > > > >>> lot of
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> "discussion"-type language
>> there,
>> > >> which is
>> > >> > > > > very
>> > >> > > > > > >>> natural to
>> > >> > > > > > >>> > > > > > > >>> read,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> but makes it a bit hard to see
>> what
>> > >> _exactly_
>> > >> > > > > the
>> > >> > > > > > >>> kip is
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> proposing. Richard, would you
>> mind
>> > >> just making
>> > >> > > > > > >>> the "proposed
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> behavior change" a simple and
>> > >> succinct list of
>> > >> > > > > > >>> bullet points?
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> I.e., please drop glue phrases
>> like
>> > >> "there has
>> > >> > > > > > >>> been some
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> discussion" or "possibly we
>> could do
>> > >> X". For
>> > >> > > > > the
>> > >> > > > > > >>> final version
>> > >> > > > > > >>> > > > > > > >>> of
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the KIP, it should just say,
>> > >> "Streams will do
>> > >> > > > > X,
>> > >> > > > > > >>> Streams will
>> > >> > > > > > >>> > > > > > > >>> do
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Y". Feel free to add an
>> elaboration
>> > >> section to
>> > >> > > > > > >>> explain more
>> > >> > > > > > >>> > > > > > > >>> about
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> what X and Y mean, but we don't
>> need
>> > >> to talk
>> > >> > > > > about
>> > >> > > > > > >>> > > > > > > >>> possibilities
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> or alternatives except in the
>> > >> "rejected
>> > >> > > > > > >>> alternatives" section.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Accordingly, can you also move
>> the
>> > >> options you
>> > >> > > > > > >>> presented in
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> intro to the "rejected
>> alternatives"
>> > >> section
>> > >> > > > > and
>> > >> > > > > > >>> only mention
>> > >> > > > > > >>> > > > > > > >>> the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> final proposal itself? This just
>> > >> really helps
>> > >> > > > > > >>> reviewers to
>> > >> > > > > > >>> > > > > > know
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> what they are voting for, and it
>> > >> helps
>> > >> > > > > everyone
>> > >> > > > > > >>> after the fact
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> when they are trying to get
>> clarity
>> > >> on what
>> > >> > > > > > >>> exactly the
>> > >> > > > > > >>> > > > > > > >>> proposal
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> is, versus all the things it
>> could
>> > >> have been.
>> > >> > > > > > >>> Thanks, -John
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Mon, Jan 27, 2020, at 18:14,
>> > >> Richard Yu
>> > >> > > > > wrote:
>> > >> > > > > > >>> Hello to
>> > >> > > > > > >>> > > > > > all,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> I've finished making some
>> initial
>> > >> > > > > modifications
>> > >> > > > > > >>> to the KIP. I
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> have decided to keep the
>> > >> implementation
>> > >> > > > > section
>> > >> > > > > > >>> in the KIP for
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> record-keeping purposes. For
>> now, we
>> > >> should
>> > >> > > > > focus
>> > >> > > > > > >>> on only the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> proposed behavior changes
>> instead.
>> > >> See if you
>> > >> > > > > > >>> have any
>> > >> > > > > > >>> > > > > > > >>> comments!
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Cheers, Richard On Sat, Jan 25,
>> 2020
>> > >> at 11:12
>> > >> > > > > AM
>> > >> > > > > > >>> Richard Yu
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> <yohan.richard...@gmail.com
>> <mailto:
>> > >> > > > > > >>> > > > > > yohan.richard...@gmail.com
>> > >> > > > > > >>> > > > > > > >>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> wrote: Hi all, Thanks for all
>> the
>> > >> discussion!
>> > >> > > > > > >>> @John and @Bruno
>> > >> > > > > > >>> > > > > > > >>> I
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> will survey other possible
>> systems
>> > >> and see
>> > >> > > > > what I
>> > >> > > > > > >>> can do. Just
>> > >> > > > > > >>> > > > > > > >>> a
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> question, by systems, I suppose
>> you
>> > >> would mean
>> > >> > > > > > >>> the pros and
>> > >> > > > > > >>> > > > > > > >>> cons
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> of different reporting
>> strategies?
>> > >> I'm not
>> > >> > > > > > >>> completely certain
>> > >> > > > > > >>> > > > > > > >>> on
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> this point, so it would be
>> great if
>> > >> you can
>> > >> > > > > > >>> clarify on that.
>> > >> > > > > > >>> > > > > > So
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> here's what I got from all the
>> > >> discussion so
>> > >> > > > > far:
>> > >> > > > > > >>> - Since both
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Matthias and John seem to have
>> come
>> > >> to a
>> > >> > > > > > >>> consensus on this,
>> > >> > > > > > >>> > > > > > > >>> then
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> we will go for an all-round
>> > >> behavioral change
>> > >> > > > > for
>> > >> > > > > > >>> KTables.
>> > >> > > > > > >>> > > > > > > >>> After
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> some thought, I decided that for
>> > >> now, an
>> > >> > > > > opt-out
>> > >> > > > > > >>> config will
>> > >> > > > > > >>> > > > > > > >>> not
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> be added. As John has pointed
>> out,
>> > >> no-op
>> > >> > > > > changes
>> > >> > > > > > >>> tend to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> explode further down the
>> topology as
>> > >> they are
>> > >> > > > > > >>> forwarded to
>> > >> > > > > > >>> > > > > > more
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> and more processor nodes
>> downstream.
>> > >> - About
>> > >> > > > > > >>> using hash codes,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> after some explanation from
>> John, it
>> > >> looks
>> > >> > > > > like
>> > >> > > > > > >>> hash codes
>> > >> > > > > > >>> > > > > > > >>> might
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> not be as ideal (for
>> > >> implementation). For
>> > >> > > > > now, we
>> > >> > > > > > >>> will omit
>> > >> > > > > > >>> > > > > > > >>> that
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> detail, and save it for the PR.
>> -
>> > >> @Bruno You
>> > >> > > > > do
>> > >> > > > > > >>> have valid
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> concerns. Though, I am not
>> > >> completely certain
>> > >> > > > > if
>> > >> > > > > > >>> we want to do
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change only for
>> materialized
>> > >> KTables.
>> > >> > > > > I
>> > >> > > > > > >>> will put it
>> > >> > > > > > >>> > > > > > > >>> down
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> in the KIP regardless. I will
>> do my
>> > >> best to
>> > >> > > > > > >>> address all points
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> raised so far on the discussion.
>> > >> Hope we could
>> > >> > > > > > >>> keep this
>> > >> > > > > > >>> > > > > > going!
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Best, Richard On Fri, Jan 24,
>> 2020
>> > >> at 6:07 PM
>> > >> > > > > > >>> Bruno Cadonna
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> <br...@confluent.io <mailto:
>> > >> > > > > br...@confluent.io>>
>> > >> > > > > > >>> wrote: Thank
>> > >> > > > > > >>> > > > > > > >>> you
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Matthias for the use cases!
>> Looking
>> > >> at both
>> > >> > > > > use
>> > >> > > > > > >>> cases, I think
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> you need to elaborate on them
>> in the
>> > >> KIP,
>> > >> > > > > > >>> Richard. Emit from
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> plain KTable: I agree with
>> Matthias
>> > >> that the
>> > >> > > > > > >>> lower timestamp
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> makes sense because it marks the
>> > >> start of the
>> > >> > > > > > >>> validity of the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> record. Idempotent records with
>> a
>> > >> higher
>> > >> > > > > > >>> timestamp can be
>> > >> > > > > > >>> > > > > > > >>> safely
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> ignored. A corner case that I
>> > >> discussed with
>> > >> > > > > > >>> Matthias offline
>> > >> > > > > > >>> > > > > > > >>> is
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> when we do not materialize a
>> KTable
>> > >> due to
>> > >> > > > > > >>> optimization. Then
>> > >> > > > > > >>> > > > > > > >>> we
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> cannot avoid the idempotent
>> records
>> > >> because
>> > >> > > > > we do
>> > >> > > > > > >>> not keep the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> first record with the lower
>> > >> timestamp to
>> > >> > > > > compare
>> > >> > > > > > >>> to. Emit from
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> KTable with aggregations: If we
>> > >> specify that
>> > >> > > > > an
>> > >> > > > > > >>> aggregation
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> result should have the highest
>> > >> timestamp of
>> > >> > > > > the
>> > >> > > > > > >>> records that
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> participated in the
>> aggregation, we
>> > >> cannot
>> > >> > > > > ignore
>> > >> > > > > > >>> any
>> > >> > > > > > >>> > > > > > > >>> idempotent
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> records. Admittedly, the result
>> of an
>> > >> > > > > aggregation
>> > >> > > > > > >>> usually
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> changes, but there are
>> aggregations
>> > >> where the
>> > >> > > > > > >>> result may not
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> change like min and max, or sum
>> when
>> > >> the
>> > >> > > > > incoming
>> > >> > > > > > >>> records have
>> > >> > > > > > >>> > > > > > > >>> a
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> value of zero. In those cases,
>> we
>> > >> could
>> > >> > > > > benefit
>> > >> > > > > > >>> from the emit on
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> change, but only if we define
>> the
>> > >> semantics
>> > >> > > > > of the
>> > >> > > > > > >>> > > > > > aggregations
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to not use the highest
>> timestamp of
>> > >> the
>> > >> > > > > > >>> participating records
>> > >> > > > > > >>> > > > > > > >>> for
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the result. In Kafka Streams,
>> we do
>> > >> not have
>> > >> > > > > min,
>> > >> > > > > > >>> max, and sum
>> > >> > > > > > >>> > > > > > > >>> as
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> explicit aggregations, but we
>> need
>> > >> to provide
>> > >> > > > > an
>> > >> > > > > > >>> API to define
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> what timestamp should be used
>> for
>> > >> the result
>> > >> > > > > of
>> > >> > > > > > >>> an aggregation
>> > >> > > > > > >>> > > > > > > >>> if
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> we want to go down this path.
>> All of
>> > >> this does
>> > >> > > > > > >>> not block this
>> > >> > > > > > >>> > > > > > > >>> KIP
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> and I just wanted to put these
>> > >> aspects up for
>> > >> > > > > > >>> discussion. The
>> > >> > > > > > >>> > > > > > > >>> KIP
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> can limit itself to emit from
>> > >> materialized
>> > >> > > > > > >>> KTables. However,
>> > >> > > > > > >>> > > > > > > >>> the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> limits should be explicitly
>> stated
>> > >> in the KIP.
>> > >> > > > > > >>> Best, Bruno
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
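The plain-KTable case above can be sketched as follows, assuming an in-memory map in place of a materialized store. `EmitOnChangeTable`, `Stamped`, and `update` are hypothetical names. Forwarding an out-of-order record that carries the same value with a lower timestamp is an assumption of this sketch, not something settled in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

class EmitOnChangeTable {
    // A stored value together with the timestamp at which it became valid.
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the materialized store; without it there is no prior
    // record to compare against, which is the corner case Bruno mentions.
    private final Map<String, Stamped> store = new HashMap<>();

    // Returns the record to forward downstream, or empty for an idempotent update.
    Optional<Stamped> update(String key, String value, long timestamp) {
        Stamped prior = store.get(key);
        boolean sameValue = prior != null && Objects.equals(prior.value, value);
        if (sameValue && timestamp >= prior.timestamp) {
            // No-op: keep the lower timestamp, which marks the start of validity.
            return Optional.empty();
        }
        Stamped next = new Stamped(value, timestamp);
        store.put(key, next);
        return Optional.of(next);
    }

    Stamped get(String key) {
        return store.get(key);
    }
}
```

An aggregation that always stamps its result with the highest input timestamp would never hit the no-op branch on in-order input, which is why Bruno notes that idempotent aggregation results could not be dropped under that definition.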
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 10:58 AM
>> > >> Matthias J.
>> > >> > > > > Sax
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> <matth...@confluent.io <mailto:
>> > >> > > > > > >>> matth...@confluent.io>> wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> IMHO, the question about
>> semantics
>> > >> depends on
>> > >> > > > > the
>> > >> > > > > > >>> use case, in
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> particular on the origin of a
>> > >> KTable. If
>> > >> > > > > there is
>> > >> > > > > > >>> a changelog
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> topic that one reads directly
>> into a
>> > >> KTable,
>> > >> > > > > > >>> emit-on-change
>> > >> > > > > > >>> > > > > > > >>> does
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually make sense, because the
>> > >> timestamp
>> > >> > > > > > >>> indicates _when_
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> update was _effective_. For this
>> > >> case, it is
>> > >> > > > > > >>> semantically
>> > >> > > > > > >>> > > > > > sound
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to _not_ update the timestamp
>> in the
>> > >> store,
>> > >> > > > > > >>> because the second
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> update is actually idempotent
>> and
>> > >> advancing
>> > >> > > > > the
>> > >> > > > > > >>> timestamp is
>> > >> > > > > > >>> > > > > > > >>> not
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> ideal (one could even consider
>> it to
>> > >> be wrong
>> > >> > > > > to
>> > >> > > > > > >>> advance the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> timestamp) because the "valid
>> time"
>> > >> of the
>> > >> > > > > record
>> > >> > > > > > >>> pair did not
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> change. This reasoning also
>> applies
>> > >> to
>> > >> > > > > > >>> KTable-KTable joins.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> However, if the KTable is the
>> result
>> > >> of an
>> > >> > > > > > >>> aggregation, I
>> > >> > > > > > >>> > > > > > think
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-update is more natural,
>> > >> because the
>> > >> > > > > > >>> timestamp reflects
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the _last_ time (ie, highest
>> > >> timestamp) of all
>> > >> > > > > > >>> input records
>> > >> > > > > > >>> > > > > > > >>> that
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> contributed to the result.
>> Hence,
>> > >> updating the
>> > >> > > > > > >>> timestamp and
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> emitting a new record actually
>> > >> sounds correct
>> > >> > > > > to
>> > >> > > > > > >>> me. This
>> > >> > > > > > >>> > > > > > > >>> applies
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to windowed and non-windowed
>> > >> aggregations
>> > >> > > > > IMHO.
>> > >> > > > > > >>> However,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> considering the argument that
>> the
>> > >> timestamp
>> > >> > > > > > >>> should not be
>> > >> > > > > > >>> > > > > > > >>> updated
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> in the first case in the store
>> to
>> > >> begin with,
>> > >> > > > > > >>> both cases are
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually the same, and both can
>> be
>> > >> modeled as
>> > >> > > > > > >>> emit-on-change:
>> > >> > > > > > >>> > > > > > > >>> if
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> a `table()` operator does not
>> update
>> > >> the
>> > >> > > > > > >>> timestamp if the
>> > >> > > > > > >>> > > > > > value
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> does not change, there is _no_
>> > >> change and thus
>> > >> > > > > > >>> nothing is
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> emitted. At the same time, if an
>> > >> aggregation
>> > >> > > > > > >>> operator does
>> > >> > > > > > >>> > > > > > > >>> update
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the timestamp (even if the value
>> > >> does not
>> > >> > > > > change)
>> > >> > > > > > >>> there _is_ a
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> change and we emit. Note that
>> > >> handling
>> > >> > > > > > >>> out-of-order data for
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> aggregations would also work
>> > >> seamlessly with
>> > >> > > > > this
>> > >> > > > > > >>> approach --
>> > >> > > > > > >>> > > > > > > >>> for
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> out-of-order records, the
>> timestamp
>> > >> never
>> > >> > > > > > >>> changes, and
>> > >> > > > > > >>> > > > > > > >>> thus,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> we only emit if the result
>> itself
>> > >> changes.
>> > >> > > > > > >>> Therefore, I would
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> argue that we might not even
>> need
>> > >> any config,
>> > >> > > > > > >>> because the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> emit-on-change behavior is just
>> > >> correct and
>> > >> > > > > > >>> reduces the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> downstream load, while our
>> current
>> > >> behavior is
>> > >> > > > > > >>> not ideal (even
>> > >> > > > > > >>> > > > > > > >>> if
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> it's also correct). Thoughts?
>> > >> -Matthias On
>> > >> > > > > > >>> 1/24/20 9:37 AM,
>> > >> > > > > > >>> > > > > > > >>> John
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Roesler wrote: Hi Bruno, Thanks
>> for
>> > >> that
>> > >> > > > > idea. I
>> > >> > > > > > >>> hadn't
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> considered that option before,
>> and
>> > >> it does
>> > >> > > > > seem
>> > >> > > > > > >>> like that
>> > >> > > > > > >>> > > > > > would
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> be the right place to put it if
>> we
>> > >> think it
>> > >> > > > > might
>> > >> > > > > > >>> be
>> > >> > > > > > >>> > > > > > > >>> semantically
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> important to control on a
>> > >> table-by-table
>> > >> > > > > basis. I
>> > >> > > > > > >>> had been
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> thinking of it less
>> semantically and
>> > >> more
>> > >> > > > > > >>> practically. In the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> context of a large topology, or
>> more
>> > >> > > > > generally, a
>> > >> > > > > > >>> large
>> > >> > > > > > >>> > > > > > > >>> software
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> system that contains many
>> topologies
>> > >> and other
>> > >> > > > > > >>> event-driven
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> systems, each no-op result
>> becomes
>> > >> an input
>> > >> > > > > that
>> > >> > > > > > >>> is destined
>> > >> > > > > > >>> > > > > > to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> itself become a no-op result,
>> and so
>> > >> on, all
>> > >> > > > > the
>> > >> > > > > > >>> way through
>> > >> > > > > > >>> > > > > > > >>> the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> system. Thus, a single pointless
>> > >> processing
>> > >> > > > > > >>> result becomes
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> amplified into a large number of
>> > >> pointless
>> > >> > > > > > >>> computations, cache
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> perturbations, and network and
>> disk
>> > >> I/O
>> > >> > > > > > >>> operations. If you
>> > >> > > > > > >>> > > > > > also
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> consider operations with fan-out
>> > >> implications,
>> > >> > > > > > >>> like branching
>> > >> > > > > > >>> > > > > > > >>> or
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> foreign-key joins, the wasted
>> > >> resources are
>> > >> > > > > > >>> amplified not just
>> > >> > > > > > >>> > > > > > > >>> in
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> proportion to the size of the
>> > >> system, but the
>> > >> > > > > > >>> size of the
>> > >> > > > > > >>> > > > > > > >>> system
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> times the average fan-out (to
>> the
>> > >> power of the
>> > >> > > > > > >>> number of
>> > >> > > > > > >>> > > > > > > >>> fan-out
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> operations on the path(s)
>> through the
>> > >> > > > > system). In
>> > >> > > > > > >>> my time
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> operating such systems, I've
>> > >> observed these
>> > >> > > > > > >>> effects to be very
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> real, and actually, the system
>> and
>> > >> use case
>> > >> > > > > > >>> doesn't have to be
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> very large before the
>> amplification
>> > >> poses an
>> > >> > > > > > >>> existential
>> > >> > > > > > >>> > > > > > threat
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to the system as a whole. This
>> is
>> > >> the basis
>> > >> > > > > of my
>> > >> > > > > > >>> advocating
>> > >> > > > > > >>> > > > > > > >>> for
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> a simple behavior change, rather
>> > >> than an
>> > >> > > > > opt-in
>> > >> > > > > > >>> config of any
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> kind. It seems like Streams
>> should
>> > >> "do the
>> > >> > > > > right
>> > >> > > > > > >>> thing" for
>> > >> > > > > > >>> > > > > > the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> majority use case. My theory
>> (which
>> > >> may be
>> > >> > > > > wrong)
>> > >> > > > > > >>> is that the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> majority use case is more like
>> > >> "relational
>> > >> > > > > > >>> queries" than "CEP
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> queries". Even if you were
>> doing some
>> > >> > > > > > >>> event-sensitive
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> computation, wouldn't you do
>> them as
>> > >> Stream
>> > >> > > > > > >>> operations (where
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> this feature is inapplicable
>> > >> anyway)? In
>> > >> > > > > keeping
>> > >> > > > > > >>> with the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> "practical" perspective, I
>> suggested
>> > >> the
>> > >> > > > > opt-out
>> > >> > > > > > >>> config only
>> > >> > > > > > >>> > > > > > in
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the (I think unlikely) event
>> that
>> > >> filtering
>> > >> > > > > out
>> > >> > > > > > >>> pointless
>> > >> > > > > > >>> > > > > > > >>> updates
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually harms performance. I'd
>> also
>> > >> be
>> > >> > > > > perfectly
>> > >> > > > > > >>> fine without
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the opt-out config. I really
>> think
>> > >> that
>> > >> > > > > (because
>> > >> > > > > > >>> of the
>> > >> > > > > > >>> > > > > > > >>> timestamp
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> semantics work already
>> underway),
>> > >> we're
>> > >> > > > > already
>> > >> > > > > > >>> pre-fetching
>> > >> > > > > > >>> > > > > > > >>> the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> prior result most of the time,
>> so
>> > >> there would
>> > >> > > > > > >>> actually be very
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> little extra I/O involved in
>> > >> implementing
>> > >> > > > > > >>> emit-on-change.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> However, we should consider
>> whether
>> > >> my
>> > >> > > > > experience
>> > >> > > > > > >>> is likely to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> be general. Do you have some use
>> > >> case in mind
>> > >> > > > > for
>> > >> > > > > > >>> which you'd
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> actually want some KTable
>> results to
>> > >> be
>> > >> > > > > > >>> emit-on-update for
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> semantic reasons? Thanks, -John
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020, at 11:02,
>> > >> Bruno Cadonna
>> > >> > > > > > >>> wrote: Hi
>> > >> > > > > > >>> > > > > > > >>> Richard,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thank you for the KIP. I agree
>> with
>> > >> John that
>> > >> > > > > we
>> > >> > > > > > >>> should focus
>> > >> > > > > > >>> > > > > > > >>> on
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the interface and behavior
>> change in
>> > >> a KIP. We
>> > >> > > > > > >>> can discuss the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> implementation later. I am also
>> +1
>> > >> for the
>> > >> > > > > > >>> survey. I had a
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> thought about this. Couldn't we
>> > >> consider
>> > >> > > > > > >>> emit-on-change to be
>> > >> > > > > > >>> > > > > > > >>> one
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> config of suppress (like
>> > >> `untilWindowCloses`)?
>> > >> > > > > > >>> What you
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> basically propose is to suppress
>> > >> updates if
>> > >> > > > > they
>> > >> > > > > > >>> do not change
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the result. Considering emit on
>> > >> change as a
>> > >> > > > > > >>> flavour of
>> > >> > > > > > >>> > > > > > suppress
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> would be more flexible because
>> it
>> > >> would
>> > >> > > > > specify
>> > >> > > > > > >>> the behavior
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> locally for a KTable instead of
>> > >> globally for
>> > >> > > > > all
>> > >> > > > > > >>> KTables.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Additionally, specifying the
>> > >> behavior in one
>> > >> > > > > > >>> place instead of
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> multiple places feels more
>> intuitive
>> > >> and
>> > >> > > > > > >>> consistent to me.
>> > >> > > > > > >>> > > > > > > >>> Best,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Bruno
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi Richard,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for picking this up! I know of at least one large community
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> member for which this feature is absolutely essential.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> If I understand your two options, it seems like the proposal is to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> implement it as a behavior change regardless, and the question is
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> whether to provide an opt-out config or not.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Given that any implementation of this feature would have some
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> performance impact under some workloads, and also that we don't know
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> if anyone really depends on emit-on-update time semantics, it seems
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> like we should propose to add an opt-out config. Can you update the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> KIP to mention the exact config key and value(s) you'd propose?
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Just to move the discussion forward, maybe something like:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>   emit.on := change|update
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> with the new default being "change".
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks for pointing out the timestamp issue in particular. I agree
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> that if we discard the latter update as a no-op, then we also have to
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> discard its timestamp (obviously, we don't forward the timestamp
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> update, as that's the whole point, but we also can't update the
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> timestamp in the store, as the store must remain consistent with what
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> has been emitted).
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> I have to confess that I disagree with your implementation proposal,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> but it's also not necessary to discuss implementation in the KIP.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Maybe it would be less controversial if you just drop that section
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> for now, so that the KIP discussion can focus on the behavior change
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> and config.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Just for reference, there is some research into this domain. For
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> example, see the "Report" section (3.2.3) of the SECRET paper:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeople.csail.mit.edu%2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&data=02%7C01%7CThomas.Becker%40tivo.com%7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160859282&sdata=4dSGIS8jNPAPP7B48r9e%2BUgFh3WdmzVyXhyT63eP8dI%3D&reserved=0
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> It might help to round out the proposal if you take a brief survey of
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> the behaviors of other systems, along with pros and cons if any are
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> reported.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Thanks,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> -John
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Hi everybody!
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> I'd like to propose a change that we probably should've added a long
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> time ago. The key benefit of this KIP would be reduced traffic in
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Kafka Streams, since a lot of no-op results would no longer be sent
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> downstream. Here is the KIP for reference.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit%2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&data=02%7C01%7CThomas.Becker%40tivo.com%7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160869277&sdata=zYpCSFOsyN4%2B4rKRZBQ%2FZvcGQ4EINR9Qm6PLsB7EKrc%3D&reserved=0
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Currently, I seek to formalize our approach for this KIP first before
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> we determine concrete API additions / configurations. Some configs
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> might warrant adding, while others are not necessary since adding
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> them would only increase the complexity of Kafka Streams.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Cheers,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> Richard
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> ________________________________
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> This email and any attachments may contain confidential and
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> privileged material for the sole use of the intended recipient. Any
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> review, copying, or distribution of this email (or any attachments)
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> by others is prohibited. If you are not the intended recipient,
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> please contact the sender immediately and permanently delete this
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> email and any attachments. No employee or agent of TiVo is authorized
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> to conclude any binding agreement on behalf of TiVo by email. Binding
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> agreements with TiVo may only be made by a signed written agreement.
>> > >> > > > > > >>> > > > > > > >>>>>>>>>>
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> --
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> *Tommy Becker*
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> *Principal Engineer*
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> *Personalized Content Discovery*
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> *O* +1 919.460.4747
>> > >> > > > > > >>> > > > > > > >>>>>>>>>> *tivo.com* <http://www.tivo.com/>
