Thanks John for your thoughtful reply. Some comments inline.

On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:

[EXTERNAL EMAIL] Attention: This email was sent from outside TiVo. DO NOT CLICK 
any links or attachments unless you expected them.

________________________________



Hi Tommy,


Thanks for the context. I can see the attraction of considering these use cases

together.


To answer your question, if a part of the record is not relevant to downstream

consumers, I was thinking you could just use a mapValue to remove it.


E.g., suppose you wanted to do a join between two tables.


employeeInfo.join(

  employeePayroll,

  (info, payroll) -> new Result(info.name(), payroll.salary())

)


We only care about one attribute from the Info table (name), and one from the

Payroll table (salary), and these attributes change rarely. On the other hand,

there might be many other attributes that change frequently of these tables. We

can avoid triggering the join unnecessarily by mapping the input tables to drop

the unnecessary information before the join:


names = employeeInfo.mapValues(info -> info.name())

salaries = employeePayroll.mapValues(payroll -> payroll.salary())


names.join(

  salaries,

  (name, salary) -> new Result(name, salary)

)

Ahh yes I see. This works, but in the case where you're using schemas as we are 
(e.g. Avro), it seems like this approach could lead to a proliferation of 
"skinny" record types that just drop various fields.



Especially if we take Matthias's idea to drop non-changes even for stateless

operations, this would be quite efficient and is also a very straightforward

optimization to understand once you know that Streams provides emit-on-change.


From the context that you provided, it seems like a slightly different

situation, though. Reading between the lines a little, it sounds like: in

contrast to the example above, in which we are filtering out extra _data_,

you have some extra _metadata_ that you still wish to pass down with the data

when there is a "real" update, but you don't want the metadata itself to cause

an update.

Despite my lack of clarity, yes you've got it right ;) This particular 
processor is the first stop for this data after coming in from external users, 
who often simply post the same content each time and we're trying to shield 
downstream consumers from unnecessary churn.



It does seem handy to be able to plug in a custom ChangeDetector for this

purpose, but I worry about the API complexity. Maybe you can help think though

how to provide the same benefit while limiting user-facing complexity.


Here's some extra context to consider:


We currently don't make any extra requirements about the nature of data that you

can use in Streams. For example, you don't have to implement hashCode and

equals, or compareTo, etc. With the current proposal, we can do an airtight

comparison based only on the serialized form of the values, and we actually

don't have to deserialize the "prior" value at all for a large number of

operations. Admitedly, if we extend the proposal to include no-op detection for

stateless operations, we'd probably need to rely on equals() for no-op checking,

otherwise we'd wind up requiring serdes for stateless operations as well.

Actually, I'd probably argue for doing exactly that:


 1. In stateful operations, drop if the serialized byte[]s are the same. After

 deserializing, also drop if the objects are equal according to Object#equals().


 2. In stateless operations, compare the "new" and "old" values (if "old" is

 available) based on Object#equals().


 3. As a final optimization, after serializing and before sending repartition

 records, compare the serialized data and drop no-ops.


This way, we still don't have to rely on the existence of an equals() method,

but if it is there, we can benefit from it. Also, we don't require a serde in

any new situations, but we can still leverage it when it is available.


For clarity, in my example above, even if the employeeInfo and employeePayroll

and Result records all have serdes, we need the "name" field (presumably String)

and the "salary" field (presumable a Double) to have serdes as well in the naive

implementation. But if we can leverage equals(), then the "right thing" happens

automatically.

I still don't totally follow why the individual components (name, salary) would 
have to have serdes here. If Result has one, we compare bytes, and if Result 
additionally has an equals() method (which presumably includes equals 
comparisons on the constituent fields), have we not covered our bases?



This dovetails in with my primary UX concern; where would the ChangeDetector

actually be registered? None of the operators in my example have names or

topics or any other identifiable characteristic that could be passed to a

ChangeDetector class registered via config. You could say that we make

ChangeDetector an optional parameter to every operation in Streams, but this

seems to carry quite a bit of mental burden with it. People will wonder what

it's for and whether or not they should be using it. There would almost

certainly be a misconception that it's preferable to implement it always, which

would be unfortunate. Plus, to actually implment metadata flowing through the

topology as in your use case, you'd have to do two things:

1. make sure that all operations actually preserve the metadata alongside the

data (e.g., don't accidentally add a mapValues like I did, or you drop the

metadata).

2. implement a ChangeDetector for every single operation in the topology, or you

don't get the benefit of dropping non-changes internally

2b. Alternatively, you could just add the ChangeDetector to one operation toward

the end of the topology. This would not drop redundant computation internally,

but only drop redundant _outputs_. But this is just about the same as your

current solution.


I definitely see your point regarding configuration. I was originally thinking 
about this when the deduplication was going to be opt-in, and it seemed very 
natural to say something like:

employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), 
payroll.salary()))
.suppress(duplicatesAccordingTo(someChangeDetector))

Alternatively you can imagine a similar method being on Materialized, though 
obviously this makes less sense if we don't want to require materialization. If 
we're now talking about changing the default behavior and not having any 
configuration options, it's harder to find a place for this.



A final thought; if it really is a metadata question, can we just plan to finish

up the support for headers in Streams? I.e., give you a way to control the way

that headers flow through the topology? Then, we could treat headers the same

way we treat timestamps in the no-op checking... We completely ignore them for

the sake of comparison. Thus, neither the timestamp nor the headers would get

updated in internal state or in downstream views as long as the value itself

doesn't change. This seems to give us a way to support your use case without

adding to the mental overhead of using Streams for simple things.

Agree headers could be a decent fit for this particular case because it's 
mostly metadata, though to be honest we haven't looked at headers much (mostly 
because, and to your point, support seems to be lacking). I feel like there 
would be other cases where this feature could be valuable, but I admit I can't 
come up with anything right this second. Perhaps yuzhihong had an example in 
mind?


I.e., simple things should be easy, and complex things should be possible.

What are your thoughts?
Thanks,
-John


On Mon, Feb 3, 2020, at 07:19, Thomas Becker wrote:

Hi John,
Can you describe how you'd use filtering/mapping to deduplicate
records? To give some background on my suggestion we currently have a
small stream processor that exists solely to deduplicate, which we do
using a process that I assume would be similar to what would be done
here (with a store of keys and hash values). But the records we are
deduplicating have some metadata fields (such as timestamps of when the
record was posted) that we don't consider semantically meaningful for
downstream consumers, and therefore we also suppress updates that only
touch those fields.

-Tommy


On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:
[EXTERNAL EMAIL] Attention: This email was sent from outside TiVo. DO NOT CLICK 
any links or attachments unless you expected them. 
________________________________

Hi Thomas and yuzhihong,
That’s an interesting idea. Can you help think of a use case that isn’t also 
served by filtering or mapping beforehand?
Thanks for helping to design this feature! -John
On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com<mailto:yuzhih...@gmail.com> 
wrote: I think this is good idea.
On Jan 31, 2020, at 4:49 PM, Thomas Becker 
<thomas.bec...@tivo.com<mailto:thomas.bec...@tivo.com>> wrote:
How do folks feel about allowing the mechanism by which no-ops are detected to 
be pluggable? Meaning use something like a hash by default, but you could 
optionally provide an implementation of something to use instead, like a 
ChangeDetector. This could be useful for example to ignore changes to certain 
fields, which may not be relevant to the operation being performed. 
________________________________ From: John Roesler 
<vvcep...@apache.org<mailto:vvcep...@apache.org>> Sent: Friday, January 31, 
2020 4:51 PM To: dev@kafka.apache.org<mailto:dev@kafka.apache.org> 
<dev@kafka.apache.org<mailto:dev@kafka.apache.org>> Subject: Re: [KAFKA-557] 
Add emit on change support for Kafka Streams
[EXTERNAL EMAIL] Attention: This email was sent from outside TiVo. DO NOT CLICK 
any links or attachments unless you expected them. 
________________________________

Hello all,
Sorry for my silence. It seems like we are getting close to consensus. 
Hopefully, we could move to a vote soon!
All of the reasoning from Matthias and Bruno around timestamp is compelling. I 
would be strongly in favor of stating a few things very clearly in the KIP: 1. 
Streams will drop no-op updates only for KTable operations.
  That is, we won't make any changes to KStream aggregations at the moment. It 
does seem like we can potentially revisit the time semantics of that operation 
in the future, but we don't need to do it now.
  On the other hand, the proposed semantics for KTable timestamps (marking the 
beginning of the validity of that record) makes sense to me.
2. Streams will only drop no-op updates for _stateful_ KTable operations.
  We don't want to add a hard guarantee that Streams will _never_ emit a no-op 
table update because it would require adding state to otherwise stateless 
operations. If someone is really concerned about a particular stateless 
operation producing a lot of no-op results, all they have to do is materialize 
it, and Streams would automatically drop the no-ops.
Additionally, I'm +1 on not adding an opt-out at this time.
Regarding the KIP itself, I would clean it up a bit before calling for a vote. 
There is a lot of "discussion"-type language there, which is very natural to 
read, but makes it a bit hard to see what _exactly_ the kip is proposing.
Richard, would you mind just making the "proposed behavior change" a simple and 
succinct list of bullet points? I.e., please drop glue phrases like "there has 
been some discussion" or "possibly we could do X". For the final version of the 
KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to 
add an elaboration section to explain more about what X and Y mean, but we 
don't need to talk about possibilities or alternatives except in the "rejected 
alternatives" section.
Accordingly, can you also move the options you presented in the intro to the 
"rejected alternatives" section and only mention the final proposal itself?
This just really helps reviewers to know what they are voting for, and it helps 
everyone after the fact when they are trying to get clarity on what exactly the 
proposal is, versus all the things it could have been.
Thanks, -John

On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote: Hello to all,
I've finished making some initial modifications to the KIP. I have decided to 
keep the implementation section in the KIP for record-keeping purposes.
For now, we should focus on only the proposed behavior changes instead.
See if you have any comments!
Cheers, Richard
On Sat, Jan 25, 2020 at 11:12 AM Richard Yu 
<yohan.richard...@gmail.com<mailto:yohan.richard...@gmail.com>> wrote:
Hi all,
Thanks for all the discussion!
@John and @Bruno I will survey other possible systems and see what I can do. 
Just a question, by systems, I suppose you would mean the pros and cons of 
different reporting strategies?
I'm not completely certain on this point, so it would be great if you can 
clarify on that.
So here's what I got from all the discussion so far:
  - Since both Matthias and John seems to have come to a consensus on this, 
then we will go for an all-round behavorial change for KTables. After some 
thought, I decided that for now, an opt-out config will not be added. As John 
have pointed out, no-op changes tend to explode further down the topology as 
they are forwarded to more and more processor nodes downstream. - About using 
hash codes, after some explanation from John, it looks like hash codes might 
not be as ideal (for implementation). For now, we will omit that detail, and 
save it for the PR. - @Bruno You do have valid concerns. Though, I am not 
completely certain if we want to do emit-on-change only for materialized 
KTables. I will put it down in the KIP regardless.
I will do my best to address all points raised so far on the discussion. Hope 
we could keep this going!
Best, Richard
On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna 
<br...@confluent.io<mailto:br...@confluent.io>> wrote:
Thank you Matthias for the use cases!
Looking at both use cases, I think you need to elaborate on them in the KIP, 
Richard.
Emit from plain KTable: I agree with Matthias that the lower timestamp makes 
sense because it marks the start of the validity of the record. Idempotent 
records with a higher timestamp can be safely ignored. A corner case that I 
discussed with Matthias offline is when we do not materialize a KTable due to 
optimization. Then we cannot avoid the idempotent records because we do not 
keep the first record with the lower timestamp to compare to.
Emit from KTable with aggregations: If we specify that an aggregation result 
should have the highest timestamp of the records that participated in the 
aggregation, we cannot ignore any idempotent records. Admittedly, the result of 
an aggregation usually changes, but there are aggregations where the result may 
not change like min and max, or sum when the incoming records have a value of 
zero. In those cases, we could benefit of the emit on change, but only if we 
define the semantics of the aggregations to not use the highest timestamp of 
the participating records for the result. In Kafka Streams, we do not have min, 
max, and sum as explicit aggregations, but we need to provide an API to define 
what timestamp should be used for the result of an aggregation if we want to go 
down this path.
All of this does not block this KIP and I just wanted to put this aspects up 
for discussion. The KIP can limit itself to emit from materialized KTables. 
However, the limits should be explicitly stated in the KIP.
Best, Bruno


On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax 
<matth...@confluent.io<mailto:matth...@confluent.io>> wrote:
IMHO, the question about semantics depends on the use case, in particular on 
the origin of a KTable.
If there is a changlog topic that one reads directly into a KTable, 
emit-on-change does actually make sense, because the timestamp indicates _when_ 
the update was _effective_. For this case, it is semantically sound to _not_ 
update the timestamp in the store, because the second update is actually 
idempotent and advancing the timestamp is not ideal (one could even consider it 
to be wrong to advance the timestamp) because the "valid time" of the record 
pair did not change.
This reasoning also applies to KTable-KTable joins.
However, if the KTable is the result of an aggregation, I think emit-on-update 
is more natural, because the timestamp reflects the _last_ time (ie, highest 
timestamp) of all input records the contributed to the result. Hence, updating 
the timestamp and emitting a new record actually sounds correct to me. This 
applies to windowed and non-windowed aggregations IMHO.
However, considering the argument that the timestamp should not be update in 
the first case in the store to begin with, both cases are actually the same, 
and both can be modeled as emit-on-change: if a `table()` operator does not 
update the timestamp if the value does not change, there is _no_ change and 
thus nothing is emitted. At the same time, if an aggregation operator does 
update the timestamp (even if the value does not change) there _is_ a change 
and we emit.
Note that handling out-of-order data for aggregations would also work 
seamlessly with this approach -- for out-of-order records, the timestamp does 
never change, and thus, we only emit if the result itself changes.
Therefore, I would argue that we might not even need any config, because the 
emit-on-change behavior is just correct and reduced the downstream load, while 
our current behavior is not ideal (even if it's also correct).
Thoughts?
-Matthias
On 1/24/20 9:37 AM, John Roesler wrote: Hi Bruno,
Thanks for that idea. I hadn't considered that option before, and it does seem 
like that would be the right place to put it if we think it might be 
semantically important to control on a table-by-table basis.
I had been thinking of it less semantically and more practically. In the 
context of a large topology, or more generally, a large software system that 
contains many topologies and other event-driven systems, each no-op result 
becomes an input that is destined to itself become a no-op result, and so on, 
all the way through the system. Thus, a single pointless processing result 
becomes amplified into a large number of pointless computations, cache 
perturbations, and network and disk I/O operations. If you also consider 
operations with fan-out implications, like branching or foreign-key joins, the 
wasted resources are amplified not just in proportion to the size of the 
system, but the size of the system times the average fan-out (to the power of 
the number of fan-out operations on the path(s) through the system).
In my time operating such systems, I've observed these effects to be very real, 
and actually, the system and use case doesn't have to be very large before the 
amplification poses an existential threat to the system as a whole.
This is the basis of my advocating for a simple behavior change, rather than an 
opt-in config of any kind. It seems like Streams should "do the right thing" 
for the majority use case. My theory (which may be wrong) is that the majority 
use case is more like "relational queries" than "CEP queries". Even if you were 
doing some event-sensitive computation, wouldn't you do them as Stream 
operations (where this feature is inapplicable anyway)?
In keeping with the "practical" perspective, I suggested the opt-out config 
only in the (I think unlikely) event that filtering out pointless updates 
actually harms performance. I'd also be perfectly fine without the opt-out 
config. I really think that (because of the timestamp semantics work already 
underway), we're already pre-fetching the prior result most of the time, so 
there would actually be very little extra I/O involved in implementing 
emit-on-change.
However, we should consider whether my experience is likely to be general. Do 
you have some use case in mind for which you'd actually want some KTable 
results to be emit-on-update for semantic reasons?
Thanks, -John

On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote: Hi Richard,
Thank you for the KIP.
I agree with John that we should focus on the interface and behavior change in 
a KIP. We can discuss the implementation later.
I am also +1 for the survey.
I had a thought about this. Couldn't we consider emit-on-change to be one 
config of suppress (like `untilWindowCloses`)? What you basically propose is to 
suppress updates if they do not change the result. Considering emit on change 
as a flavour of suppress would be more flexible because it would specify the 
behavior locally for a KTable instead of globally for all KTables. 
Additionally, specifying the behavior in one place instead of multiple places 
feels more intuitive and consistent to me.
Best, Bruno
On Fri, Jan 24, 2020 at 7:49 AM John Roesler 
<vvcep...@apache.org<mailto:vvcep...@apache.org>> wrote:
Hi Richard,
Thanks for picking this up! I know of at least one large community member for 
which this feature is absolutely essential.
If I understand your two options, it seems like the proposal is to implement it 
as a behavior change regardless, and the question is whether to provide an 
opt-out config or not.
Given that any implementation of this feature would have some performance 
impact under some workloads, and also that we don't know if anyone really 
depends on emit-on-update time semantics, it seems like we should propose to 
add an opt-out config. Can you update the KIP to mention the exact config key 
and value(s) you'd propose?
Just to move the discussion forward, maybe something like: emit.on := 
change|update with the new default being "change"
Thanks for pointing out the timestamp issue in particular. I agree that if we 
discard the latter update as a no-op, then we also have to discard its 
timestamp (obviously, we don't forward the timestamp update, as that's the 
whole point, but we also can't update the timestamp in the store, as the store 
must remain consistent with what has been emitted).
I have to confess that I disagree with your implementation proposal, but it's 
also not necessary to discuss implementation in the KIP. Maybe it would be less 
controversial if you just drop that section for now, so that the KIP discussion 
can focus on the behavior change and config.
Just for reference, there is some research into this domain. For example, see 
the "Report" section (3.2.3) of the SECRET paper:
https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fpeople.csail.mit.edu%2Ftatbul%2Fpublications%2Fmaxstream_vldb10.pdf&amp;data=02%7C01%7CThomas.Becker%40tivo.com%7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160859282&amp;sdata=4dSGIS8jNPAPP7B48r9e%2BUgFh3WdmzVyXhyT63eP8dI%3D&amp;reserved=0
It might help to round out the proposal if you take a brief survey of the 
behaviors of other systems, along with pros and cons if any are reported.
Thanks, -John

On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote: Hi everybody!
I'd like to propose a change that we probably should've added for a long time 
now.
The key benefit of this KIP would be reduced traffic in Kafka Streams since a 
lot of no-op results would no longer be sent downstream. Here is the KIP for 
reference.

https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-557%253A%2BAdd%2Bemit%2Bon%2Bchange%2Bsupport%2Bfor%2BKafka%2BStreams&amp;data=02%7C01%7CThomas.Becker%40tivo.com%7Ce0235483b1eb4f259c5c08d7a8d1c16b%7Cd05b7c6912014c0db45d7f1dcc227e4d%7C1%7C1%7C637163491160869277&amp;sdata=zYpCSFOsyN4%2B4rKRZBQ%2FZvcGQ4EINR9Qm6PLsB7EKrc%3D&amp;reserved=0
Currently, I seek to formalize our approach for this KIP first before we 
determine concrete API additions / configurations. Some configs might warrant 
adding, whiles others are not necessary since adding them would only increase 
complexity of Kafka Streams.
Cheers, Richard






________________________________
This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
is authorized to conclude any binding agreement on behalf of TiVo by email. 
Binding agreements with TiVo may only be made by a signed written agreement.
--
*Tommy Becker*
*Principal Engineer *

*Personalized Content Discovery*

*O* +1 919.460.4747
*tivo.com* <http://www.tivo.com/>



 This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review,
copying, or distribution of this email (or any attachments) by others
is prohibited. If you are not the intended recipient, please contact
the sender immediately and permanently delete this email and any
attachments. No employee or agent of TiVo is authorized to conclude any
binding agreement on behalf of TiVo by email. Binding agreements with
TiVo may only be made by a signed written agreement.

--
[cid:aa0c30045ff771eb93e09a48a960221335bc5477.camel@tivo.com] Tommy Becker
Principal Engineer
Personalized Content Discovery
O +1 919.460.4747
tivo.com<http://www.tivo.com/>

________________________________

This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
is authorized to conclude any binding agreement on behalf of TiVo by email. 
Binding agreements with TiVo may only be made by a signed written agreement.

Reply via email to