Hi all,

Bumping this. If you feel that this KIP is not too urgent, then let me
know. :)

Cheers,
Richard

On Thu, Feb 6, 2020 at 4:55 PM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi all,
>
> I've had just a few thoughts regarding the forwarding of <key,
> change<old_value, new_value>>. As Matthias already mentioned, there are two
> separate priorities by which we can judge this KIP:
>
> 1. An optimization perspective: In this case, the user would prefer the
> impact of this KIP to be as minimal as possible. By such logic, if
> stateless operations are performed twice, that could prove unacceptable for
> them (since operations can prove expensive).
>
> 2. Semantics correctness perspective: Unlike the optimization approach, we
> are more concerned with all KTable operations obeying the same emission
> policy, i.e., emit-on-change. In this case, a discrepancy would not be
> tolerated, even though an extra performance cost will be incurred.
> Therefore, we will follow Matthias's approach, and then perform the
> operation once on the old value, and once on the new.
>
> The issue here I think is more black and white than in between. The second
> option in particular would be favorable for users with inexpensive
> stateless operations, while for the former option, we are probably dealing
> with more expensive ones. So the simplest solution is probably to allow the
> user to choose one of the behaviors, and have a config which can switch in
> between them.
>
> It's the simplest compromise I can come up with at the moment, but if you
> think you have a better plan which could better balance tradeoffs, then
> please let us know. :)
>
> Best,
> Richard
>
> On Wed, Feb 5, 2020 at 5:12 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi all,
>>
>> Thanks for the thoughtful comments!
>>
>> I need more time to reflect on your thoughts, but just wanted to offer
>> a quick clarification about equals().
>>
>> I only meant that we can't be sure if a class's equals() implementation
>> returns true for two semantically identical instances. I.e., if a class
>> doesn't override the default equals() implementation, then we would see
>> behavior like:
>>
>> new MyPair("A", 1).equals(new MyPair("A", 1)) returns false
>>
>> In that case, I would still like to catch no-op updates by comparing the
>> serialized form of the records when we happen to have it serialized anyway
>> (such as when the operation is stateful, or when we're sending to a
>> repartition topic and we have both the "new" and "old" value from
>> upstream).
>>
>> I didn't mean to suggest we'd try to use reflection to detect whether
>> equals is implemented, although that is a neat trick. I was thinking more
>> of a belt-and-suspenders algorithm where we do the check for no-ops based
>> on equals() and then _also_ check the serialized bytes for equality.
>>
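A minimal sketch of the belt-and-suspenders check John describes, in plain Java with no Kafka dependency. The names NoOpCheck, isNoOp, and serialize are made up for illustration, and the string-based serializer is only a stand-in for a real serde. The sketch serializes on demand, whereas the idea in the thread is to compare bytes only where they are already available.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical value type that does NOT override equals(), as in the example above.
final class MyPair {
    final String name;
    final int count;

    MyPair(String name, int count) {
        this.name = name;
        this.count = count;
    }
}

final class NoOpCheck {
    // Returns true if the update can be treated as a no-op:
    // first by equals(), then by comparing the serialized bytes.
    static <V> boolean isNoOp(V oldValue, V newValue, Function<V, byte[]> serializer) {
        if (Objects.equals(oldValue, newValue)) {
            return true;
        }
        if (oldValue == null || newValue == null) {
            return false;
        }
        return Arrays.equals(serializer.apply(oldValue), serializer.apply(newValue));
    }

    // Stand-in serializer (an assumption for this sketch, not a real serde).
    static byte[] serialize(MyPair pair) {
        return (pair.name + ":" + pair.count).getBytes(StandardCharsets.UTF_8);
    }
}
```

With this, two MyPair("A", 1) instances are not equal according to the default equals(), but the byte comparison still catches the no-op.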
>> Thanks,
>> -John
>>
>> On Wed, Feb 5, 2020, at 15:31, Ted Yu wrote:
>> > Thanks for the comments, Matthias.
>> >
>> > w.r.t. requirement of an `equals()` implementation, each template type
>> > would have an equals() method. We can use the following code to know
>> > whether it is provided by JVM or provided by user.
>> >
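>> > boolean customEquals = false;
>> > try {
>> >     // Find which class declares the equals(Object) that `value` inherits.
>> >     Class<?> cls = value.getClass()
>> >         .getMethod("equals", Object.class)
>> >         .getDeclaringClass();
>> >     // Anything other than Object means equals() was overridden somewhere.
>> >     if (!Object.class.equals(cls)) {
>> >         customEquals = true;
>> >     }
>> > } catch (NoSuchMethodException nsme) {
>> >     // equals is always defined, this wouldn't hit
>> > }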
>> >
>> > The next question is: what if the user doesn't provide an equals()
>> > method? Would we automatically fall back to emit-on-update?
>> >
>> > Cheers
>> >
>> > On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >
>> > >
>> > > First a high level comment:
>> > >
>> > > Overall, I would like to take one step back, and make sure we are
>> > > discussing on the same level. Originally, I understood this KIP as a
>> > > proposed change of _semantics_; however, given the latest discussion
>> > > it seems it's actually not -- it's more an _optimization_ proposal.
>> > > Hence, we only need to make sure that this optimization does not break
>> > > existing semantics. Is this the right way to think about it?
>> > >
>> > > If yes, then it might actually be ok to have different behavior
>> > > depending on whether there is a materialized KTable or not. So far, we
>> > > never defined a public contract about our emit strategy and it seems
>> > > this KIP does not define one either.
>> > >
>> > > Hence, I don't have as strong of an opinion about sending oldValues
>> > > for example any longer. I guess the question is really, what can we
>> > > implement in a reasonable way.
>> > >
>> > >
>> > >
>> > > Other comments:
>> > >
>> > >
>> > > @Richard:
>> > >
>> > > Can you please add the KIP to the KIP overview table: It's missing
>> > > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
>> > >
>> > >
>> > > @Bruno:
>> > >
>> > > You mentioned caching. I think it's irrelevant (orthogonal) and we can
>> > > discuss this KIP without considering it.
>> > >
>> > >
>> > > @John:
>> > >
>> > > > Even in the source table, we forward the updated record with the
>> > > > higher of the two timestamps. So the example is more like:
>> > >
>> > > That is not correct. Currently, we forward with the smaller
>> > > out-of-order timestamp (changing the timestamp would corrupt the data
>> > > -- we don't know, because we don't check, if the value is the same or
>> > > a different one, hence, we must emit the out-of-order record as-is).
>> > >
>> > > If we start to do emit-on-change, we also need to emit a new record if
>> > > the timestamp changes due to out-of-order data, hence, we would still
>> > > need to emit <K,V,T1> because that gives us correct semantics: assume
>> > > you have a filter() and afterward use the filtered KTable in a
>> > > stream-table join -- the lower T1 timestamp must be propagated to the
>> > > filtered KTable to ensure that the stream-table join computes the
>> > > correct result.
>> > >
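One way to picture the timestamp rule Matthias describes, as a plain-Java sketch rather than actual Streams code. EmitOnChangeTable and Versioned are hypothetical names, and the in-memory map stands in for a state store. The assumption here is that an update with the same value and an equal or higher timestamp is dropped without touching the store, while a same-value update with a lower (out-of-order) timestamp is still forwarded.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class EmitOnChangeTable {
    // A value together with the timestamp from which it is valid.
    static final class Versioned {
        final String value;
        final long timestamp;

        Versioned(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Versioned> store = new HashMap<>();

    // Returns the record to forward downstream, or null if the update is dropped.
    Versioned update(String key, String value, long timestamp) {
        Versioned prior = store.get(key);
        boolean sameValue = prior != null && Objects.equals(prior.value, value);
        if (sameValue && timestamp >= prior.timestamp) {
            // Idempotent update: drop it and keep the stored (lower) timestamp.
            return null;
        }
        // Either the value changed, or the same value arrived with a lower
        // out-of-order timestamp; both count as a change and are forwarded.
        Versioned next = new Versioned(value, timestamp);
        store.put(key, next);
        return next;
    }
}
```

For example, after ("k", "v", 10), the update ("k", "v", 20) is dropped, but ("k", "v", 5) is forwarded with timestamp 5 so that a downstream stream-table join sees the earlier validity.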
>> > >
>> > >
>> > > Your point about requiring an `equals()` implementation is actually a
>> > > quite interesting one and boils down to my statement from above about
>> > > "what can we actually implement". What I don't understand is:
>> > >
>> > > > This way, we still don't have to rely on the existence of an
>> > > > equals() method, but if it is there, we can benefit from it.
>> > >
>> > > Your bullet point (2) says it uses `equals()` -- hence, it seems we
>> > > actually do rely on it? Also, how can we detect if there is an
>> > > `equals()` method to do the comparison? Would we fail if we have
>> > > neither `equals()` nor corresponding serializers to do the comparison?
>> > >
>> > >
>> > >
>> > > > Wow, really good catch! Yes, we absolutely need metrics and logs if
>> > > > we're going to drop any records. And, yes, we should propose
>> > > > metrics and logs that are similar to the existing ones when we drop
>> > > > records for other reasons.
>> > >
>> > > I am not sure about this point. In fact, we have already some no-ops
>> > > in Kafka Streams in our join-operators and don't report any of those
>> > > either. Emit-on-change is operator semantics and I don't see why we
>> > > would need to have a metric for it? It seems to be quite different
>> > > compared to dropping late or malformed records.
>> > >
>> > >
>> > > -Matthias
>> > >
>> > >
>> > >
>> > > On 2/4/20 7:13 AM, Thomas Becker wrote:
>> > > > Thanks John for your thoughtful reply. Some comments inline.
>> > > >
>> > > >
>> > > > On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
>> > > >> [EXTERNAL EMAIL] Attention: This email was sent from outside
>> > > >> TiVo. DO NOT CLICK any links or attachments unless you expected
>> > > >> them. ________________________________
>> > > >>
>> > > >>
>> > > >> Hi Tommy,
>> > > >>
>> > > >> Thanks for the context. I can see the attraction of considering
>> > > >> these use cases together.
>> > > >>
>> > > >> To answer your question, if a part of the record is not relevant
>> > > >> to downstream consumers, I was thinking you could just use a
>> > > >> mapValues to remove it.
>> > > >>
>> > > >> E.g., suppose you wanted to do a join between two tables.
>> > > >>
>> > > >> employeeInfo.join(
>> > > >>     employeePayroll,
>> > > >>     (info, payroll) -> new Result(info.name(), payroll.salary())
>> > > >> )
>> > > >>
>> > > >> We only care about one attribute from the Info table (name), and
>> > > >> one from the Payroll table (salary), and these attributes change
>> > > >> rarely. On the other hand, there might be many other attributes
>> > > >> of these tables that change frequently. We can avoid triggering
>> > > >> the join unnecessarily by mapping the input tables to drop the
>> > > >> unnecessary information before the join:
>> > > >>
>> > > >> names = employeeInfo.mapValues(info -> info.name())
>> > > >> salaries = employeePayroll.mapValues(payroll -> payroll.salary())
>> > > >>
>> > > >> names.join(
>> > > >>     salaries,
>> > > >>     (name, salary) -> new Result(name, salary)
>> > > >> )
>> > > >
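To illustrate why projecting before the join helps, here is a small plain-Java stand-in, not the Kafka Streams API. ProjectedTable and the lastLogin field are invented for the example. It assumes emit-on-change is applied to the projected value, so updates that only touch the dropped fields never reach the join.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical record: `name` changes rarely, `lastLogin` changes often.
final class EmployeeInfo {
    final String name;
    final String lastLogin;

    EmployeeInfo(String name, String lastLogin) {
        this.name = name;
        this.lastLogin = lastLogin;
    }
}

// Stand-in for a mapValues() step followed by emit-on-change.
final class ProjectedTable<K, V, P> {
    private final Function<V, P> projection;
    private final Map<K, P> lastForwarded = new HashMap<>();

    ProjectedTable(Function<V, P> projection) {
        this.projection = projection;
    }

    // Returns true if the projected value changed and would be forwarded.
    boolean update(K key, V value) {
        P projected = projection.apply(value);
        if (lastForwarded.containsKey(key)
                && Objects.equals(lastForwarded.get(key), projected)) {
            return false; // only irrelevant fields changed: nothing to forward
        }
        lastForwarded.put(key, projected);
        return true;
    }
}
```

Here the projection yields a String, whose equals() is well defined, so a change to lastLogin alone does not produce a downstream update.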
>> > > > Ahh yes I see. This works, but in the case where you're using
>> > > > schemas as we are (e.g. Avro), it seems like this approach could
>> > > > lead to a proliferation of "skinny" record types that just drop
>> > > > various fields.
>> > > >
>> > > >>
>> > > >> Especially if we take Matthias's idea to drop non-changes even
>> > > >> for stateless operations, this would be quite efficient and is
>> > > >> also a very straightforward optimization to understand once you
>> > > >> know that Streams provides emit-on-change.
>> > > >>
>> > > >> From the context that you provided, it seems like a slightly
>> > > >> different situation, though. Reading between the lines a little,
>> > > >> it sounds like: in contrast to the example above, in which we are
>> > > >> filtering out extra _data_, you have some extra _metadata_ that
>> > > >> you still wish to pass down with the data when there is a "real"
>> > > >> update, but you don't want the metadata itself to cause an
>> > > >> update.
>> > > >
>> > > > Despite my lack of clarity, yes you've got it right ;) This
>> > > > particular processor is the first stop for this data after coming
>> > > > in from external users, who often simply post the same content each
>> > > > time and we're trying to shield downstream consumers from
>> > > > unnecessary churn.
>> > > >
>> > > >>
>> > > >> It does seem handy to be able to plug in a custom ChangeDetector
>> > > >> for this purpose, but I worry about the API complexity. Maybe you
>> > > >> can help think through how to provide the same benefit while
>> > > >> limiting user-facing complexity.
>> > > >>
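For concreteness, a pluggable detector along the lines Tommy suggested might look like the sketch below. ChangeDetector is purely hypothetical here (no such interface exists in Kafka Streams), as are Posting and ContentOnlyDetector. The example treats a posted-at timestamp as metadata that should not count as a change.

```java
import java.util.Objects;

// Hypothetical plug-in point; not part of Kafka Streams.
interface ChangeDetector<V> {
    // Returns true if newValue should be treated as a real update of oldValue.
    boolean isChange(V oldValue, V newValue);
}

// Hypothetical record with one metadata field (postedAtMs).
final class Posting {
    final String content;
    final long postedAtMs;

    Posting(String content, long postedAtMs) {
        this.content = content;
        this.postedAtMs = postedAtMs;
    }
}

// Compares only the content and ignores the metadata field.
final class ContentOnlyDetector implements ChangeDetector<Posting> {
    @Override
    public boolean isChange(Posting oldValue, Posting newValue) {
        if (oldValue == null || newValue == null) {
            return oldValue != newValue;
        }
        return !Objects.equals(oldValue.content, newValue.content);
    }
}
```

The open question from the thread remains where such an object would be registered, since most operators have no name or topic to attach it to.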
>> > > >> Here's some extra context to consider:
>> > > >>
>> > > >> We currently don't make any extra requirements about the nature
>> > > >> of data that you can use in Streams. For example, you don't have
>> > > >> to implement hashCode and equals, or compareTo, etc. With the
>> > > >> current proposal, we can do an airtight comparison based only on
>> > > >> the serialized form of the values, and we actually don't have to
>> > > >> deserialize the "prior" value at all for a large number of
>> > > >> operations. Admittedly, if we extend the proposal to include no-op
>> > > >> detection for stateless operations, we'd probably need to rely on
>> > > >> equals() for no-op checking, otherwise we'd wind up requiring
>> > > >> serdes for stateless operations as well. Actually, I'd probably
>> > > >> argue for doing exactly that:
>> > > >>
>> > > >> 1. In stateful operations, drop if the serialized byte[]s are the
>> > > >> same. After deserializing, also drop if the objects are equal
>> > > >> according to Object#equals().
>> > > >>
>> > > >> 2. In stateless operations, compare the "new" and "old" values
>> > > >> (if "old" is available) based on Object#equals().
>> > > >>
>> > > >> 3. As a final optimization, after serializing and before sending
>> > > >> repartition records, compare the serialized data and drop
>> > > >> no-ops.
>> > > >>
>> > > >> This way, we still don't have to rely on the existence of an
>> > > >> equals() method, but if it is there, we can benefit from it.
>> > > >> Also, we don't require a serde in any new situations, but we can
>> > > >> still leverage it when it is available.
>> > > >>
>> > > >> For clarity, in my example above, even if the employeeInfo and
>> > > >> employeePayroll and Result records all have serdes, we need the
>> > > >> "name" field (presumably String) and the "salary" field
>> > > >> (presumably a Double) to have serdes as well in the naive
>> > > >> implementation. But if we can leverage equals(), then the "right
>> > > >> thing" happens automatically.
>> > > >
>> > > > I still don't totally follow why the individual components (name,
>> > > > salary) would have to have serdes here. If Result has one, we
>> > > > compare bytes, and if Result additionally has an equals() method
>> > > > (which presumably includes equals comparisons on the constituent
>> > > > fields), have we not covered our bases?
>> > > >
>> > > >>
>> > > >> This dovetails in with my primary UX concern; where would the
>> > > >> ChangeDetector actually be registered? None of the operators in
>> > > >> my example have names or topics or any other identifiable
>> > > >> characteristic that could be passed to a ChangeDetector class
>> > > >> registered via config. You could say that we make ChangeDetector
>> > > >> an optional parameter to every operation in Streams, but this
>> > > >> seems to carry quite a bit of mental burden with it. People will
>> > > >> wonder what it's for and whether or not they should be using it.
>> > > >> There would almost certainly be a misconception that it's
>> > > >> preferable to implement it always, which would be unfortunate.
>> > > >> Plus, to actually implement metadata flowing through the topology
>> > > >> as in your use case, you'd have to do two things:
>> > > >>
>> > > >> 1. make sure that all operations actually preserve the metadata
>> > > >> alongside the data (e.g., don't accidentally add a mapValues like
>> > > >> I did, or you drop the metadata).
>> > > >>
>> > > >> 2. implement a ChangeDetector for every single operation in the
>> > > >> topology, or you don't get the benefit of dropping non-changes
>> > > >> internally
>> > > >>
>> > > >> 2b. Alternatively, you could just add the ChangeDetector to one
>> > > >> operation toward the end of the topology. This would not drop
>> > > >> redundant computation internally, but only drop redundant
>> > > >> _outputs_. But this is just about the same as your current
>> > > >> solution.
>> > > >
>> > > > I definitely see your point regarding configuration. I was
>> > > > originally thinking about this when the deduplication was going to
>> > > > be opt-in, and it seemed very natural to say something like:
>> > > >
>> > > > employeeInfo.join(employeePayroll, (info, payroll) -> new
>> > > > Result(info.name(), payroll.salary()))
>> > > > .suppress(duplicatesAccordingTo(someChangeDetector))
>> > > >
>> > > > Alternatively you can imagine a similar method being on
>> > > > Materialized, though obviously this makes less sense if we don't
>> > > > want to require materialization. If we're now talking about
>> > > > changing the default behavior and not having any configuration
>> > > > options, it's harder to find a place for this.
>> > > >
>> > > >
>> > > >
>> > > >> A final thought; if it really is a metadata question, can we just
>> > > >> plan to finish up the support for headers in Streams? I.e., give
>> > > >> you a way to control the way that headers flow through the
>> > > >> topology? Then, we could treat headers the same way we treat
>> > > >> timestamps in the no-op checking... We completely ignore them
>> > > >> for the sake of comparison. Thus, neither the timestamp nor the
>> > > >> headers would get updated in internal state or in downstream
>> > > >> views as long as the value itself doesn't change. This seems to
>> > > >> give us a way to support your use case without adding to the
>> > > >> mental overhead of using Streams for simple things.
>> > > >
>> > > > Agree headers could be a decent fit for this particular case
>> > > > because it's mostly metadata, though to be honest we haven't looked
>> > > > at headers much (mostly because, and to your point, support seems
>> > > > to be lacking). I feel like there would be other cases where this
>> > > > feature could be valuable, but I admit I can't come up with
>> > > > anything right this second. Perhaps yuzhihong had an example in
>> > > > mind?
>> > > >
>> > > >>
>> > > >> I.e., simple things should be easy, and complex things should be
>> > > >> possible.
>> > > >>
>> > > >> What are your thoughts? Thanks, -John
>> > > >>
>> > > >>
>> > > >> On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:
>> > > >>
>> > > >> Hi John, Can you describe how you'd use filtering/mapping to
>> > > >> deduplicate records? To give some background on my suggestion we
>> > > >> currently have a small stream processor that exists solely to
>> > > >> deduplicate, which we do using a process that I assume would be
>> > > >> similar to what would be done here (with a store of keys and hash
>> > > >> values). But the records we are deduplicating have some metadata
>> > > >> fields (such as timestamps of when the record was posted) that we
>> > > >> don't consider semantically meaningful for downstream consumers,
>> > > >> and therefore we also suppress updates that only touch those
>> > > >> fields.
>> > > >>
>> > > >> -Tommy
>> > > >>
>> > > >>
>> > > >> On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
>> > > >>
>> > > >> Hi Thomas and yuzhihong,
>> > > >>
>> > > >> That’s an interesting idea. Can you help think of a use case that
>> > > >> isn’t also served by filtering or mapping beforehand?
>> > > >>
>> > > >> Thanks for helping to design this feature!
>> > > >> -John
>> > > >>
>> > > >> On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
>> > > >>
>> > > >> I think this is a good idea.
>> > > >>
>> > > >> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com>
>> > > >> wrote:
>> > > >>
>> > > >> How do folks feel about allowing the mechanism by which no-ops are
>> > > >> detected to be pluggable? Meaning use something like a hash by
>> > > >> default, but you could optionally provide an implementation of
>> > > >> something to use instead, like a ChangeDetector. This could be
>> > > >> useful for example to ignore changes to certain fields, which may
>> > > >> not be relevant to the operation being performed.
>> > > >>
>> > > >> ________________________________
>> > > >> From: John Roesler <vvcep...@apache.org>
>> > > >> Sent: Friday, January 31, 2020 4:51 PM
>> > > >> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> > > >> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
>> > > >>
>> > > >> Hello all,
>> > > >>
>> > > >> Sorry for my silence. It seems like we are getting close to
>> > > >> consensus. Hopefully, we could move to a vote soon!
>> > > >>
>> > > >> All of the reasoning from Matthias and Bruno around timestamp is
>> > > >> compelling. I would be strongly in favor of stating a few things
>> > > >> very clearly in the KIP:
>> > > >>
>> > > >> 1. Streams will drop no-op updates only for KTable operations.
>> > > >> That is, we won't make any changes to KStream aggregations at the
>> > > >> moment. It does seem like we can potentially revisit the time
>> > > >> semantics of that operation in the future, but we don't need to do
>> > > >> it now. On the other hand, the proposed semantics for KTable
>> > > >> timestamps (marking the beginning of the validity of that record)
>> > > >> make sense to me.
>> > > >>
>> > > >> 2. Streams will only drop no-op updates for _stateful_ KTable
>> > > >> operations. We don't want to add a hard guarantee that Streams
>> > > >> will _never_ emit a no-op table update because it would require
>> > > >> adding state to otherwise stateless operations. If someone is
>> > > >> really concerned about a particular stateless operation producing
>> > > >> a lot of no-op results, all they have to do is materialize it, and
>> > > >> Streams would automatically drop the no-ops.
>> > > >>
>> > > >> Additionally, I'm +1 on not adding an opt-out at this time.
>> > > >>
>> > > >> Regarding the KIP itself, I would clean it up a bit before calling
>> > > >> for a vote. There is a lot of "discussion"-type language there,
>> > > >> which is very natural to read, but makes it a bit hard to see what
>> > > >> _exactly_ the KIP is proposing.
>> > > >>
>> > > >> Richard, would you mind just making the "proposed behavior change"
>> > > >> a simple and succinct list of bullet points? I.e., please drop
>> > > >> glue phrases like "there has been some discussion" or "possibly we
>> > > >> could do X". For the final version of the KIP, it should just say,
>> > > >> "Streams will do X, Streams will do Y". Feel free to add an
>> > > >> elaboration section to explain more about what X and Y mean, but
>> > > >> we don't need to talk about possibilities or alternatives except
>> > > >> in the "rejected alternatives" section.
>> > > >>
>> > > >> Accordingly, can you also move the options you presented in the
>> > > >> intro to the "rejected alternatives" section and only mention the
>> > > >> final proposal itself?
>> > > >>
>> > > >> This just really helps reviewers to know what they are voting for,
>> > > >> and it helps everyone after the fact when they are trying to get
>> > > >> clarity on what exactly the proposal is, versus all the things it
>> > > >> could have been.
>> > > >>
>> > > >> Thanks,
>> > > >> -John
>> > > >>
>> > > >> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
>> > > >>
>> > > >> Hello to all,
>> > > >>
>> > > >> I've finished making some initial modifications to the KIP. I have
>> > > >> decided to keep the implementation section in the KIP for
>> > > >> record-keeping purposes.
>> > > >>
>> > > >> For now, we should focus on only the proposed behavior changes
>> > > >> instead.
>> > > >>
>> > > >> See if you have any comments!
>> > > >>
>> > > >> Cheers,
>> > > >> Richard
>> > > >>
>> > > >> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu
>> > > >> <yohan.richard...@gmail.com> wrote:
>> > > >>
>> > > >> Hi all,
>> > > >>
>> > > >> Thanks for all the discussion!
>> > > >>
>> > > >> @John and @Bruno
>> > > >>
>> > > >> I will survey other possible systems and see what I can do. Just a
>> > > >> question, by systems, I suppose you would mean the pros and cons
>> > > >> of different reporting strategies? I'm not completely certain on
>> > > >> this point, so it would be great if you can clarify on that.
>> > > >>
>> > > >> So here's what I got from all the discussion so far:
>> > > >>
>> > > >> - Since both Matthias and John seem to have come to a consensus on
>> > > >> this, we will go for an all-round behavioral change for KTables.
>> > > >> After some thought, I decided that for now, an opt-out config will
>> > > >> not be added. As John has pointed out, no-op changes tend to
>> > > >> explode further down the topology as they are forwarded to more
>> > > >> and more processor nodes downstream.
>> > > >>
>> > > >> - About using hash codes, after some explanation from John, it
>> > > >> looks like hash codes might not be as ideal (for implementation).
>> > > >> For now, we will omit that detail, and save it for the PR.
>> > > >>
>> > > >> - @Bruno You do have valid concerns. Though, I am not completely
>> > > >> certain if we want to do emit-on-change only for materialized
>> > > >> KTables. I will put it down in the KIP regardless.
>> > > >>
>> > > >> I will do my best to address all points raised so far on the
>> > > >> discussion. Hope we could keep this going!
>> > > >>
>> > > >> Best,
>> > > >> Richard
>> > > >>
>> > > >> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io>
>> > > >> wrote:
>> > > >>
>> > > >> Thank you Matthias for the use cases!
>> > > >>
>> > > >> Looking at both use cases, I think you need to elaborate on them
>> > > >> in the KIP, Richard.
>> > > >>
>> > > >> Emit from plain KTable: I agree with Matthias that the lower
>> > > >> timestamp makes sense because it marks the start of the validity
>> > > >> of the record. Idempotent records with a higher timestamp can be
>> > > >> safely ignored. A corner case that I discussed with Matthias
>> > > >> offline is when we do not materialize a KTable due to
>> > > >> optimization. Then we cannot avoid the idempotent records because
>> > > >> we do not keep the first record with the lower timestamp to
>> > > >> compare to.
>> > > >>
>> > > >> Emit from KTable with aggregations: If we specify that an
>> > > >> aggregation result should have the highest timestamp of the
>> > > >> records that participated in the aggregation, we cannot ignore any
>> > > >> idempotent records. Admittedly, the result of an aggregation
>> > > >> usually changes, but there are aggregations where the result may
>> > > >> not change like min and max, or sum when the incoming records have
>> > > >> a value of zero. In those cases, we could benefit from emit on
>> > > >> change, but only if we define the semantics of the aggregations to
>> > > >> not use the highest timestamp of the participating records for the
>> > > >> result. In Kafka Streams, we do not have min, max, and sum as
>> > > >> explicit aggregations, but we need to provide an API to define
>> > > >> what timestamp should be used for the result of an aggregation if
>> > > >> we want to go down this path.
>> > > >>
>> > > >> All of this does not block this KIP and I just wanted to put these
>> > > >> aspects up for discussion. The KIP can limit itself to emit from
>> > > >> materialized KTables. However, the limits should be explicitly
>> > > >> stated in the KIP.
>> > > >>
>> > > >> Best,
>> > > >> Bruno
>> > > >>
>> > > >>
>> > > >> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax
>> > > >> <matth...@confluent.io> wrote:
>> > > >>
>> > > >> IMHO, the question about semantics depends on the use case, in
>> > > >> particular on the origin of a KTable.
>> > > >>
>> > > >> If there is a changelog topic that one reads directly into a
>> > > >> KTable, emit-on-change does actually make sense, because the
>> > > >> timestamp indicates _when_ the update was _effective_. For this
>> > > >> case, it is semantically sound to _not_ update the timestamp in
>> > > >> the store, because the second update is actually idempotent and
>> > > >> advancing the timestamp is not ideal (one could even consider it
>> > > >> to be wrong to advance the timestamp) because the "valid time" of
>> > > >> the record pair did not change.
>> > > >>
>> > > >> This reasoning also applies to KTable-KTable joins.
>> > > >>
>> > > >> However, if the KTable is the result of an aggregation, I think
>> > > >> emit-on-update is more natural, because the timestamp reflects the
>> > > >> _last_ time (ie, highest timestamp) of all input records that
>> > > >> contributed to the result. Hence, updating the timestamp and
>> > > >> emitting a new record actually sounds correct to me. This applies
>> > > >> to windowed and non-windowed aggregations IMHO.
>> > > >>
>> > > >> However, considering the argument that the timestamp should not be
>> > > >> updated in the first case in the store to begin with, both cases
>> > > >> are actually the same, and both can be modeled as emit-on-change:
>> > > >> if a `table()` operator does not update the timestamp if the value
>> > > >> does not change, there is _no_ change and thus nothing is emitted.
>> > > >> At the same time, if an aggregation operator does update the
>> > > >> timestamp (even if the value does not change) there _is_ a change
>> > > >> and we emit.
>> > > >>
>> > > >> Note that handling out-of-order data for aggregations would also
>> > > >> work seamlessly with this approach -- for out-of-order records,
>> > > >> the timestamp does never change, and thus, we only emit if the
>> > > >> result itself changes.
>> > > >>
>> > > >> Therefore, I would argue that we might not even need any config,
>> > > >> because the emit-on-change behavior is just correct and reduces
>> > > >> the downstream load, while our current behavior is not ideal (even
>> > > >> if it's also correct).
>> > > >>
>> > > >> Thoughts?
>> > > >>
>> > > >> -Matthias
>> > > >>
>> > > >> On 1/24/20 9:37 AM, John Roesler wrote:
>> > > >>
>> > > >> Hi Bruno,
>> > > >>
>> > > >> Thanks for that idea. I hadn't considered that option before, and
>> > > >> it does seem like that would be the right place to put it if we
>> > > >> think it might be semantically important to control on a
>> > > >> table-by-table basis.
>> > > >>
>> > > >> I had been thinking of it less semantically and more practically.
>> > > >> In the context of a large topology, or more generally, a large
>> > > >> software system that contains many topologies and other
>> > > >> event-driven systems, each no-op result becomes an input that is
>> > > >> destined to itself become a no-op result, and so on, all the way
>> > > >> through the system. Thus, a single pointless processing result
>> > > >> becomes amplified into a large number of pointless computations,
>> > > >> cache perturbations, and network and disk I/O operations. If you
>> > > >> also consider operations with fan-out implications, like branching
>> > > >> or foreign-key joins, the wasted resources are amplified not just
>> > > >> in proportion to the size of the system, but the size of the
>> > > >> system times the average fan-out (to the power of the number of
>> > > >> fan-out operations on the path(s) through the system).
>> > > >>
>> > > >> In my time operating such systems, I've observed these effects to
>> > > >> be very real, and actually, the system and use case don't have to
>> > > >> be very large before the amplification poses an existential threat
>> > > >> to the system as a whole.
>> > > >>
>> > > >> This is the basis of my advocating for a simple behavior change,
>> > > >> rather than an opt-in config of any kind. It seems like Streams
>> > > >> should "do the right thing" for the majority use case. My theory
>> > > >> (which may be wrong) is that the majority use case is more like
>> > > >> "relational queries" than "CEP queries". Even if you were doing
>> > > >> some event-sensitive computation, wouldn't you do them as Stream
>> > > >> operations (where this feature is inapplicable anyway)?
>> > > >>
>> > > >> In keeping with the "practical" perspective, I suggested the
>> > > >> opt-out config only in the (I think unlikely) event that filtering
>> > > >> out pointless updates actually harms performance. I'd also be
>> > > >> perfectly fine without the opt-out config. I really think that
>> > > >> (because of the timestamp semantics work already underway), we're
>> > > >> already pre-fetching the prior result most of the time, so there
>> > > >> would actually be very little extra I/O involved in implementing
>> > > >> emit-on-change.
>> > > >>
>> > > >> However, we should consider whether my experience is likely to be
>> > > >> general. Do you have some use case in mind for which you'd
>> > > >> actually want some KTable results to be emit-on-update for
>> > > >> semantic reasons?
>> > > >>
>> > > >> Thanks,
>> > > >> -John
>> > > >>
>> > > >> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote: Hi Richard,
>> > > >> Thank you for the KIP. I agree with John that we should focus on
>> > > >> the interface and behavior change in a KIP. We can discuss the
>> > > >> implementation later. I am also +1 for the survey. I had a
>> > > >> thought about this. Couldn't we consider emit-on-change to be one
>> > > >> config of suppress (like `untilWindowCloses`)? What you
>> > > >> basically propose is to suppress updates if they do not change
>> > > >> the result. Considering emit on change as a flavour of suppress
>> > > >> would be more flexible because it would specify the behavior
>> > > >> locally for a KTable instead of globally for all KTables.
>> > > >> Additionally, specifying the behavior in one place instead of
>> > > >> multiple places feels more intuitive and consistent to me.
>> > > >>
>> > > >> Best,
>> > > >> Bruno
>> > > >>
>> > > >> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
>> > > >>
>> > > >> Hi Richard,
>> > > >>
>> > > >> Thanks for picking this up! I know of at least one large
>> > > >> community member for which this feature is absolutely essential.
>> > > >> If I understand your two options, it seems like the proposal is
>> > > >> to implement it as a behavior change regardless, and the question
>> > > >> is whether to provide an opt-out config or not. Given that any
>> > > >> implementation of this feature would have some performance impact
>> > > >> under some workloads, and also that we don't know if anyone
>> > > >> really depends on emit-on-update time semantics, it seems like we
>> > > >> should propose to add an opt-out config. Can you update the KIP
>> > > >> to mention the exact config key and value(s) you'd propose? Just
>> > > >> to move the discussion forward, maybe something like:
>> > > >>
>> > > >>     emit.on := change|update
>> > > >>
>> > > >> with the new default being "change".
>> > > >>
>> > > >> Thanks for pointing out the timestamp issue in particular. I agree
>> > > >> that if
>> > > >> we discard the latter update as a no-op, then we also have to
>> > > >> discard its timestamp (obviously, we don't forward the timestamp
>> > > >> update, as that's the whole point, but we also can't update the
>> > > >> timestamp in the store, as the store must remain consistent with
>> > > >> what has been emitted). I have to confess that I disagree with
>> > > >> your implementation proposal, but it's also not necessary to
>> > > >> discuss implementation in the KIP. Maybe it would be less
>> > > >> controversial if you just drop that section for now, so that the
>> > > >> KIP discussion can focus on the behavior change and config. Just
>> > > >> for reference, there is some research into this domain. For
>> > > >> example, see the "Report" section (3.2.3) of the SECRET paper:
>> > > >>
>> > > >> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
>> > > >>
>> > > >>
>> > > >> It might help to round out the proposal if you take a brief survey of
>> > > >> the behaviors of other systems, along with pros and cons if any
>> > > >> are reported. Thanks, -John
>> > > >>
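The timestamp point above can be illustrated with a small sketch, again in plain Java and not the Kafka Streams API. The names `EmitPolicySketch`, `EmitOn`, and `Stamped` are hypothetical; the enum only mirrors the `emit.on := change|update` config floated in the message as a starting point for discussion, not an existing Streams config. Under the "change" policy a no-op update is neither forwarded nor written to the store, so the stored timestamp stays equal to the last emitted one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; this is not Kafka Streams code.
final class EmitPolicySketch {
    // Mirrors the suggested "emit.on := change|update" values (an assumption).
    enum EmitOn { CHANGE, UPDATE }

    // A value together with the timestamp of the record that produced it.
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final EmitOn policy;
    private final Map<String, Stamped> store = new HashMap<>();
    private final List<Stamped> emitted = new ArrayList<>();

    EmitPolicySketch(EmitOn policy) {
        this.policy = policy;
    }

    /** Returns true if the update was emitted downstream. */
    boolean process(String key, String value, long timestamp) {
        Stamped prior = store.get(key);
        boolean noOp = prior != null && Objects.equals(prior.value, value);
        if (policy == EmitOn.CHANGE && noOp) {
            // Drop the update and its timestamp: the store is left untouched
            // so that it stays consistent with what has been emitted.
            return false;
        }
        Stamped next = new Stamped(value, timestamp);
        store.put(key, next);
        emitted.add(next);
        return true;
    }

    long storedTimestamp(String key) {
        return store.get(key).timestamp;
    }

    int emittedCount() {
        return emitted.size();
    }

    public static void main(String[] args) {
        EmitPolicySketch onChange = new EmitPolicySketch(EmitOn.CHANGE);
        onChange.process("k", "v", 10L);
        onChange.process("k", "v", 20L); // no-op: dropped, timestamp 20 discarded
        System.out.println(onChange.storedTimestamp("k") + " / " + onChange.emittedCount());

        EmitPolicySketch onUpdate = new EmitPolicySketch(EmitOn.UPDATE);
        onUpdate.process("k", "v", 10L);
        onUpdate.process("k", "v", 20L); // emitted again, timestamp advances
        System.out.println(onUpdate.storedTimestamp("k") + " / " + onUpdate.emittedCount());
    }
}
```

With the opt-out ("update") policy the same input produces two emissions and the stored timestamp advances, which is the current emit-on-update behavior the config would preserve.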
>> > > >> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>> > > >>
>> > > >> Hi everybody! I'd like to propose a change that we probably
>> > > >> should have added a long time ago. The key benefit of this KIP
>> > > >> would be reduced traffic in Kafka Streams, since a lot of no-op
>> > > >> results would no longer be sent downstream. Here is the KIP for
>> > > >> reference.
>> > > >>
>> > > >>
>> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > > >>
>> > > >>
>> > > >> Currently, I seek to formalize our approach for this KIP first,
>> > > >> before we determine concrete API additions / configurations. Some
>> > > >> configs might warrant adding, while others are not necessary,
>> > > >> since adding them would only increase the complexity of Kafka
>> > > >> Streams. Cheers, Richard
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >> ________________________________ This email and any attachments
>> > > >> may contain confidential and privileged material for the sole use
>> > > >> of the intended recipient. Any review, copying, or distribution
>> > > >> of this email (or any attachments) by others is prohibited. If
>> > > >> you are not the intended recipient, please contact the sender
>> > > >> immediately and permanently delete this email and any
>> > > >> attachments. No employee or agent of TiVo is authorized to
>> > > >> conclude any binding agreement on behalf of TiVo by email.
>> > > >> Binding agreements with TiVo may only be made by a signed written
>> > > >> agreement. -- *Tommy Becker* *Principal Engineer *
>> > > >>
>> > > >> *Personalized Content Discovery*
>> > > >>
>> > > >> *O* +1 919.460.4747 *tivo.com* <http://www.tivo.com/>
>> > > >>
>> > > >>
>> > > >>
>> > > >
>> > > > --
>> > > >
>> > > > *Tommy Becker* /Principal Engineer /
>> > > >
>> > > > /Personalized Content Discovery/
>> > > >
>> > > > *O* +1 919.460.4747 *tivo.com* <http://www.tivo.com/>
>> > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
