W.r.t. the new metric, there is already a droppedRecordsSensor, which is recorded together with this log message:

"Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",

It seems we should introduce another metric which records the skipped
(duplicate) values.

This way, it is easier to observe the effect when this feature is in production.
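
To make the idea concrete, here is a rough sketch of the bookkeeping such a metric would need. It is plain Java and does not use any Kafka Streams API; the class name EmitOnChangeCounter and its methods are made up for illustration, and a real implementation would presumably go through a Sensor like droppedRecordsSensor instead of raw counters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch (not Kafka Streams code): a table-like store that
 * drops no-op updates and counts how many were dropped out of the total.
 */
final class EmitOnChangeCounter<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private long totalUpdates = 0;
    private long droppedNoOps = 0;

    /** Applies an update; returns true if it should be forwarded downstream. */
    boolean update(K key, V newValue) {
        totalUpdates++;
        if (store.containsKey(key) && Objects.equals(store.get(key), newValue)) {
            // Same value as the stored one: a no-op (duplicate), so skip it.
            droppedNoOps++;
            return false;
        }
        store.put(key, newValue);
        return true;
    }

    long totalUpdates() {
        return totalUpdates;
    }

    long droppedNoOps() {
        return droppedNoOps;
    }

    /** Fraction of updates that were dropped as no-ops (0.0 if none seen). */
    double dropRatio() {
        return totalUpdates == 0 ? 0.0 : (double) droppedNoOps / totalUpdates;
    }
}
```

Calling update("k", 1) twice in a row would forward the first update and count the second one as a dropped duplicate, which is the number I would like to see exposed.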

Cheers


>
> ---------- Forwarded message ---------
> From: Richard Yu <yohan.richard...@gmail.com>
> Date: Sun, Feb 2, 2020 at 10:21 AM
> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> To: <dev@kafka.apache.org>
>
>
> Hi Bruno,
>
> Thanks for the reply!
>
> I've included some basic description on the reporting strategies in the
> KIP (I might include more information on that later). I've also worked to
> add some more details on behavior changes as well as rejected alternatives.
> Hope it will help facilitate the process. :)
>
> I just want to add something on a relevant topic: we need metrics. I think
> this should also be included with this change for a number of reasons. For
> some users, they already know that their Streams application is
> experiencing a lot of no-op traffic. But that doesn't mean other users are
> aware of the same problem. Also, if we are dropping no-ops, then we might
> as well record exactly how many we have dropped out of how many total
> operations we've done. Therefore, I argue that we also include some metric
> which records this data and reports it to the user.
>
> Beyond that, let me know if we might need to address anything else. :)
>
> Cheers,
> Richard
>
>
>
> On Sun, Feb 2, 2020 at 3:57 AM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi,
>>
>> Richard, thank you for the updated KIP.
>>
>> Regarding your question about the survey, IMO the survey should
>> contain a brief description of the emit (report) strategy of each
>> system and a list of pros and cons. I personally would be interested
>> what emit strategy Flink uses.
>>
>> I have a few comments about the KIP and its documentation:
>>
>> KIP-specific:
>>
>> 1. I agree with Matthias that we should also include aggregations
>> where neither the value nor the timestamp change.
>>
>> 2. Regarding Matthias' concerns about the dependency of the result of
>> a stateless operation on the materialization, I have two
>> questions/observations:
>> a) Is the result not already dependent on the materialization since in
>> case of materialized results the cache would not emit all records for
>> the same key downstream?
>> b) Emitting more records from a non-materialized operation should not
>> affect the semantics because we are emitting changelog records. The
>> next time these changelog records are materialized the result should
>> be correct. Right? However, I see the issue when a KTable is
>> transformed to a KStream with `toStream()`. The stream would then
>> differ depending on the materialization. But that seems to me an issue
>> that is not specific to the emit strategy and that we already have
>> when we use a cache, don't we? Is it even an issue?
>>
>> 3. With out-of-order records we would emit more records. Let's assume
>> the following records
>> K, V, T3
>> K, V, T1
>> K, V, T2
>> with T1 < T2 < T3
>>
>> A KTable that reads these records in this order would emit (assuming no
>> cache)
>> K, V, T3
>> K, V, T1
>>
>> The record at T3 is emitted because it is the first.
>> The record at T1 is emitted because T1 < T3.
>> The record at T2 is not emitted because T2 >= T1
>> Correct?
>>
>> Richard, it would be good to add a section about out-of-order records
>> to the KIP.
>>
>>
>> Documentation-specific:
>>
>> 1. I agree with John on his feedback on the KIP document. It is really
>> important to clearly state what this KIP will improve and what not,
>> otherwise it becomes hard to vote on the KIP and to decide whether the
>> KIP is fully implemented or not.
>>
>> 2. Could you please already state in the "Motivation" section of the
>> KIP where you list the current emit strategies that the emit strategy
>> only applies to operations that involve a KTable? Probably for most it
>> will be clear what you mean, but IMO KIPs should be easily
>> approachable and it doesn't cost much to add this information.
>>
>> 3. Could you please list the rejected suppress extension in the
>> "Rejected Alternatives" section?
>>
>> 4. In the discussion about materializing results of stateless
>> operations, could you please add that those stateless operations are
>> on KTables? IMO adding this information makes the KIP more
>> approachable for people that are not that familiar with the matter.
>>
>> Best,
>> Bruno
>>
>> On Sat, Feb 1, 2020 at 11:33 PM Richard Yu <yohan.richard...@gmail.com>
>> wrote:
>> >
>> > Hi all,
>> >
>> > You all have good points!
>> >
>> > I took a look, and I thought it over. After some thinking, it appears the
>> > main point of contention is whether or not we can support emit on change
>> > for stateless operations. I agree with John in that we probably should
>> > restrict ourselves to materialized KTables:
>> >
>> >    1. Loading any prior results would incur performance hits, no matter
>> >    how one looks at it.
>> >    2. Matthias's approach does have its merits, but my primary concern is
>> >    that we are effectively performing the same operation twice. And some
>> >    stateless operations can be quite expensive, so if performed twice, can
>> >    incur considerable performance hits. The idea isn't bad, but it will need
>> >    some work.
>> >    3. About stateless operations: Kafka Streams by design was never *intended*
>> >    to have stateless operations load prior results. After all, it appears
>> >    to me that only stateful operations should have that capability. If we also
>> >    load prior results for stateless operations, then we will have considerable
>> >    redundancy. (If we load something similar to hash codes, then that is a
>> >    different matter, but we already covered that hash codes are unreliable for
>> >    comparisons).
>> >
>> > Indeed, there is a discrepancy if stateless operations don't drop no-ops
>> > while stateful ones do. But I think that we shouldn't restrict ourselves to
>> > such a perspective. After all, performance should be a priority over
>> > resolving such a discrepancy.
>> >
>> > This is just my point of view, but some of my points might need work. Let
>> > me know what you think. :)
>> >
>> > Cheers,
>> > Richard
>> >
>> > On Fri, Jan 31, 2020 at 5:47 PM Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> > > I did not read the updated KIP itself yet. However, I do have concerns
>> > > about the idea to have different behavior for different operators.
>> > >
>> > >
>> > > (1) If there is a KStream aggregation, for which neither the
>> > > aggregation-value nor the result timestamp changes, there is no reason
>> > > to emit if we do emit-on-change semantics. Hence, why would we need to
>> > > stay on an emit-on-update model?
>> > >
>> > >
>> > > (2) If a KTable is materialized into a local state store or not, is
>> > > semantically irrelevant and an implementation detail IMHO. Hence, I
>> > > think we need to ensure that we have the same behavior for both cases:
>> > >
>> > > Example:
>> > >
>> > > stream.groupByKey()
>> > >       .count()
>> > >       .filter(...)
>> > >       .toStream().to(...);
>> > >
>> > > stream.groupByKey()
>> > >       .count()
>> > >       .filter(..., Materialized.as("filtered-table"))
>> > >       .toStream().to(...);
>> > >
>> > > It would be rather confusing for users if both would have a different
>> > > result.
>> > >
>> > > However, I actually believe we can achieve emit-on-change semantics for
>> > > both cases. Note that internally, the output of `count()` is a
>> > > `<key,change<newValue,oldValue>>` changelog. Atm, we don't enable "emit
>> > > old value" for all cases, but I think if we always enable it if there is
>> > > no downstream state store, the downstream operator can actually
>> > > recompute its "current result" (that would otherwise be in the store)
>> > > based on the old value, the new result based on the new value, compare
>> > > old and new result and make the correct decision to emit or not.
>> > >
>> > > However, we should verify that this really works as expected before we
>> > > decide on this KIP.
>> > >
>> > >
>> > > (3) I think we also need to think a little bit about the handling of
>> > > out-of-order data. Atm, I don't see any issue in particular, but it
>> > > would be great if everybody could think about out-of-order handling and
>> > > if/how it affects emit-on-change behavior. Also note that KIP-280 is
>> > > allowing a timestamp-based compaction that might allow us to fix a
>> > > potential issue (in case there is one).
>> > >
>> > >
>> > > -Matthias
>> > >
>> > >
>> > > On 1/31/20 5:30 PM, John Roesler wrote:
>> > > > Hi Thomas and yuzhihong,
>> > > >
>> > > > That’s an interesting idea. Can you help think of a use case that isn’t
>> > > > also served by filtering or mapping beforehand?
>> > > >
>> > > > Thanks for helping to design this feature!
>> > > > -John
>> > > >
>> > > > On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:
>> > > >> I think this is a good idea.
>> > > >>
>> > > >>> On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:
>> > > >>>
>> > > >>> How do folks feel about allowing the mechanism by which no-ops are
>> > > >>> detected to be pluggable? Meaning use something like a hash by default, but
>> > > >>> you could optionally provide an implementation of something to use instead,
>> > > >>> like a ChangeDetector. This could be useful for example to ignore changes
>> > > >>> to certain fields, which may not be relevant to the operation being
>> > > >>> performed.
>> > > >>> ________________________________
>> > > >>> From: John Roesler <vvcep...@apache.org>
>> > > >>> Sent: Friday, January 31, 2020 4:51 PM
>> > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> > > >>> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
>> > > >>>
>> > > >>>
>> > > >>>
>> > > >>> Hello all,
>> > > >>>
>> > > >>> Sorry for my silence. It seems like we are getting close to consensus.
>> > > >>> Hopefully, we could move to a vote soon!
>> > > >>>
>> > > >>> All of the reasoning from Matthias and Bruno around timestamp is compelling. I
>> > > >>> would be strongly in favor of stating a few things very clearly in the KIP:
>> > > >>> 1. Streams will drop no-op updates only for KTable operations.
>> > > >>>
>> > > >>>   That is, we won't make any changes to KStream aggregations at the moment. It
>> > > >>>   does seem like we can potentially revisit the time semantics of that operation
>> > > >>>   in the future, but we don't need to do it now.
>> > > >>>
>> > > >>>   On the other hand, the proposed semantics for KTable timestamps (marking the
>> > > >>>   beginning of the validity of that record) makes sense to me.
>> > > >>>
>> > > >>> 2. Streams will only drop no-op updates for _stateful_ KTable operations.
>> > > >>>
>> > > >>>   We don't want to add a hard guarantee that Streams will _never_ emit a no-op
>> > > >>>   table update because it would require adding state to otherwise stateless
>> > > >>>   operations. If someone is really concerned about a particular stateless
>> > > >>>   operation producing a lot of no-op results, all they have to do is
>> > > >>>   materialize it, and Streams would automatically drop the no-ops.
>> > > >>>
>> > > >>> Additionally, I'm +1 on not adding an opt-out at this time.
>> > > >>>
>> > > >>> Regarding the KIP itself, I would clean it up a bit before calling for a vote.
>> > > >>> There is a lot of "discussion"-type language there, which is very natural to
>> > > >>> read, but makes it a bit hard to see what _exactly_ the KIP is proposing.
>> > > >>>
>> > > >>> Richard, would you mind just making the "proposed behavior change" a simple and
>> > > >>> succinct list of bullet points? I.e., please drop glue phrases like "there has
>> > > >>> been some discussion" or "possibly we could do X". For the final version of the
>> > > >>> KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to
>> > > >>> add an elaboration section to explain more about what X and Y mean, but we don't
>> > > >>> need to talk about possibilities or alternatives except in the "rejected
>> > > >>> alternatives" section.
>> > > >>>
>> > > >>> Accordingly, can you also move the options you presented in the intro to the
>> > > >>> "rejected alternatives" section and only mention the final proposal itself?
>> > > >>>
>> > > >>> This just really helps reviewers to know what they are voting for, and it helps
>> > > >>> everyone after the fact when they are trying to get clarity on what exactly the
>> > > >>> proposal is, versus all the things it could have been.
>> > > >>>
>> > > >>> Thanks,
>> > > >>> -John
>> > > >>>
>> > > >>>
>> > > >>>> On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
>> > > >>>> Hello to all,
>> > > >>>>
>> > > >>>> I've finished making some initial modifications to the KIP.
>> > > >>>> I have decided to keep the implementation section in the KIP for
>> > > >>>> record-keeping purposes.
>> > > >>>>
>> > > >>>> For now, we should focus on only the proposed behavior changes instead.
>> > > >>>>
>> > > >>>> See if you have any comments!
>> > > >>>>
>> > > >>>> Cheers,
>> > > >>>> Richard
>> > > >>>>
>> > > >>>> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com>
>> > > >>>> wrote:
>> > > >>>>
>> > > >>>>> Hi all,
>> > > >>>>>
>> > > >>>>> Thanks for all the discussion!
>> > > >>>>>
>> > > >>>>> @John and @Bruno I will survey other possible systems and see what I can
>> > > >>>>> do.
>> > > >>>>> Just a question, by systems, I suppose you would mean the pros and cons of
>> > > >>>>> different reporting strategies?
>> > > >>>>>
>> > > >>>>> I'm not completely certain on this point, so it would be great if you can
>> > > >>>>> clarify on that.
>> > > >>>>>
>> > > >>>>> So here's what I got from all the discussion so far:
>> > > >>>>>
>> > > >>>>>   - Since both Matthias and John seem to have come to a consensus on
>> > > >>>>>   this, we will go for an all-round behavioral change for KTables. After
>> > > >>>>>   some thought, I decided that for now, an opt-out config will not be added.
>> > > >>>>>   As John has pointed out, no-op changes tend to explode further down the
>> > > >>>>>   topology as they are forwarded to more and more processor nodes downstream.
>> > > >>>>>   - About using hash codes, after some explanation from John, it looks
>> > > >>>>>   like hash codes might not be as ideal (for implementation). For now, we
>> > > >>>>>   will omit that detail, and save it for the PR.
>> > > >>>>>   - @Bruno You do have valid concerns. Though, I am not completely
>> > > >>>>>   certain if we want to do emit-on-change only for materialized KTables. I
>> > > >>>>>   will put it down in the KIP regardless.
>> > > >>>>>
>> > > >>>>> I will do my best to address all points raised so far on the discussion.
>> > > >>>>> Hope we could keep this going!
>> > > >>>>>
>> > > >>>>> Best,
>> > > >>>>> Richard
>> > > >>>>>
>> > > >>>>>> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
>> > > >>>>>
>> > > >>>>>> Thank you Matthias for the use cases!
>> > > >>>>>>
>> > > >>>>>> Looking at both use cases, I think you need to elaborate on them in
>> > > >>>>>> the KIP, Richard.
>> > > >>>>>>
>> > > >>>>>> Emit from plain KTable:
>> > > >>>>>> I agree with Matthias that the lower timestamp makes sense because it
>> > > >>>>>> marks the start of the validity of the record. Idempotent records with
>> > > >>>>>> a higher timestamp can be safely ignored. A corner case that I
>> > > >>>>>> discussed with Matthias offline is when we do not materialize a KTable
>> > > >>>>>> due to optimization. Then we cannot avoid the idempotent records
>> > > >>>>>> because we do not keep the first record with the lower timestamp to
>> > > >>>>>> compare to.
>> > > >>>>>>
>> > > >>>>>> Emit from KTable with aggregations:
>> > > >>>>>> If we specify that an aggregation result should have the highest
>> > > >>>>>> timestamp of the records that participated in the aggregation, we
>> > > >>>>>> cannot ignore any idempotent records. Admittedly, the result of an
>> > > >>>>>> aggregation usually changes, but there are aggregations where the
>> > > >>>>>> result may not change like min and max, or sum when the incoming
>> > > >>>>>> records have a value of zero. In those cases, we could benefit from
>> > > >>>>>> emit on change, but only if we define the semantics of the
>> > > >>>>>> aggregations to not use the highest timestamp of the participating
>> > > >>>>>> records for the result. In Kafka Streams, we do not have min, max, and
>> > > >>>>>> sum as explicit aggregations, but we need to provide an API to define
>> > > >>>>>> what timestamp should be used for the result of an aggregation if we
>> > > >>>>>> want to go down this path.
>> > > >>>>>>
>> > > >>>>>> All of this does not block this KIP and I just wanted to put these
>> > > >>>>>> aspects up for discussion. The KIP can limit itself to emit from
>> > > >>>>>> materialized KTables. However, the limits should be explicitly stated
>> > > >>>>>> in the KIP.
>> > > >>>>>>
>> > > >>>>>> Best,
>> > > >>>>>> Bruno
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io>
>> > > >>>>>> wrote:
>> > > >>>>>>>
>> > > >>>>>>> IMHO, the question about semantics depends on the use case, in
>> > > >>>>>>> particular on the origin of a KTable.
>> > > >>>>>>>
>> > > >>>>>>> If there is a changelog topic that one reads directly into a KTable,
>> > > >>>>>>> emit-on-change does actually make sense, because the timestamp indicates
>> > > >>>>>>> _when_ the update was _effective_. For this case, it is semantically
>> > > >>>>>>> sound to _not_ update the timestamp in the store, because the second
>> > > >>>>>>> update is actually idempotent and advancing the timestamp is not ideal
>> > > >>>>>>> (one could even consider it to be wrong to advance the timestamp)
>> > > >>>>>>> because the "valid time" of the record pair did not change.
>> > > >>>>>>>
>> > > >>>>>>> This reasoning also applies to KTable-KTable joins.
>> > > >>>>>>>
>> > > >>>>>>> However, if the KTable is the result of an aggregation, I think
>> > > >>>>>>> emit-on-update is more natural, because the timestamp reflects the
>> > > >>>>>>> _last_ time (ie, highest timestamp) of all input records that contributed
>> > > >>>>>>> to the result. Hence, updating the timestamp and emitting a new record
>> > > >>>>>>> actually sounds correct to me. This applies to windowed and non-windowed
>> > > >>>>>>> aggregations IMHO.
>> > > >>>>>>>
>> > > >>>>>>> However, considering the argument that the timestamp should not be
>> > > >>>>>>> updated in the first case in the store to begin with, both cases are
>> > > >>>>>>> actually the same, and both can be modeled as emit-on-change: if a
>> > > >>>>>>> `table()` operator does not update the timestamp if the value does not
>> > > >>>>>>> change, there is _no_ change and thus nothing is emitted. At the same
>> > > >>>>>>> time, if an aggregation operator does update the timestamp (even if the
>> > > >>>>>>> value does not change) there _is_ a change and we emit.
>> > > >>>>>>>
>> > > >>>>>>> Note that handling out-of-order data for aggregations would also work
>> > > >>>>>>> seamlessly with this approach -- for out-of-order records, the timestamp
>> > > >>>>>>> never changes, and thus, we only emit if the result itself changes.
>> > > >>>>>>>
>> > > >>>>>>> Therefore, I would argue that we might not even need any config, because
>> > > >>>>>>> the emit-on-change behavior is just correct and reduces the downstream
>> > > >>>>>>> load, while our current behavior is not ideal (even if it's also correct).
>> > > >>>>>>>
>> > > >>>>>>> Thoughts?
>> > > >>>>>>>
>> > > >>>>>>> -Matthias
>> > > >>>>>>>
>> > > >>>>>>> On 1/24/20 9:37 AM, John Roesler wrote:
>> > > >>>>>>>> Hi Bruno,
>> > > >>>>>>>>
>> > > >>>>>>>> Thanks for that idea. I hadn't considered that
>> > > >>>>>>>> option before, and it does seem like that would be
>> > > >>>>>>>> the right place to put it if we think it might be
>> > > >>>>>>>> semantically important to control on a
>> > > >>>>>>>> table-by-table basis.
>> > > >>>>>>>>
>> > > >>>>>>>> I had been thinking of it less semantically and
>> > > >>>>>>>> more practically. In the context of a large
>> > > >>>>>>>> topology, or more generally, a large software
>> > > >>>>>>>> system that contains many topologies and other
>> > > >>>>>>>> event-driven systems, each no-op result becomes an
>> > > >>>>>>>> input that is destined to itself become a no-op
>> > > >>>>>>>> result, and so on, all the way through the system.
>> > > >>>>>>>> Thus, a single pointless processing result becomes
>> > > >>>>>>>> amplified into a large number of pointless
>> > > >>>>>>>> computations, cache perturbations, and network
>> > > >>>>>>>> and disk I/O operations. If you also consider
>> > > >>>>>>>> operations with fan-out implications, like
>> > > >>>>>>>> branching or foreign-key joins, the wasted
>> > > >>>>>>>> resources are amplified not just in proportion to
>> > > >>>>>>>> the size of the system, but the size of the system
>> > > >>>>>>>> times the average fan-out (to the power of the
>> > > >>>>>>>> number of fan-out operations on the path(s)
>> > > >>>>>>>> through the system).
>> > > >>>>>>>>
>> > > >>>>>>>> In my time operating such systems, I've observed
>> > > >>>>>>>> these effects to be very real, and actually, the
>> > > >>>>>>>> system and use case don't have to be very large
>> > > >>>>>>>> before the amplification poses an existential
>> > > >>>>>>>> threat to the system as a whole.
>> > > >>>>>>>>
>> > > >>>>>>>> This is the basis of my advocating for a simple
>> > > >>>>>>>> behavior change, rather than an opt-in config of
>> > > >>>>>>>> any kind. It seems like Streams should "do the
>> > > >>>>>>>> right thing" for the majority use case. My theory
>> > > >>>>>>>> (which may be wrong) is that the majority use case
>> > > >>>>>>>> is more like "relational queries" than "CEP
>> > > >>>>>>>> queries". Even if you were doing some
>> > > >>>>>>>> event-sensitive computation, wouldn't you do them
>> > > >>>>>>>> as Stream operations (where this feature is
>> > > >>>>>>>> inapplicable anyway)?
>> > > >>>>>>>>
>> > > >>>>>>>> In keeping with the "practical" perspective, I
>> > > >>>>>>>> suggested the opt-out config only in the (I think
>> > > >>>>>>>> unlikely) event that filtering out pointless
>> > > >>>>>>>> updates actually harms performance. I'd also be
>> > > >>>>>>>> perfectly fine without the opt-out config. I
>> > > >>>>>>>> really think that (because of the timestamp
>> > > >>>>>>>> semantics work already underway), we're already
>> > > >>>>>>>> pre-fetching the prior result most of the time, so
>> > > >>>>>>>> there would actually be very little extra I/O
>> > > >>>>>>>> involved in implementing emit-on-change.
>> > > >>>>>>>>
>> > > >>>>>>>> However, we should consider whether my experience
>> > > >>>>>>>> is likely to be general. Do you have some use
>> > > >>>>>>>> case in mind for which you'd actually want some
>> > > >>>>>>>> KTable results to be emit-on-update for semantic
>> > > >>>>>>>> reasons?
>> > > >>>>>>>>
>> > > >>>>>>>> Thanks,
>> > > >>>>>>>> -John
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
>> > > >>>>>>>>> Hi Richard,
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thank you for the KIP.
>> > > >>>>>>>>>
>> > > >>>>>>>>> I agree with John that we should focus on the interface and behavior
>> > > >>>>>>>>> change in a KIP. We can discuss the implementation later.
>> > > >>>>>>>>>
>> > > >>>>>>>>> I am also +1 for the survey.
>> > > >>>>>>>>>
>> > > >>>>>>>>> I had a thought about this. Couldn't we consider emit-on-change to be
>> > > >>>>>>>>> one config of suppress (like `untilWindowCloses`)? What you basically
>> > > >>>>>>>>> propose is to suppress updates if they do not change the result.
>> > > >>>>>>>>> Considering emit on change as a flavour of suppress would be more
>> > > >>>>>>>>> flexible because it would specify the behavior locally for a KTable
>> > > >>>>>>>>> instead of globally for all KTables. Additionally, specifying the
>> > > >>>>>>>>> behavior in one place instead of multiple places feels more intuitive
>> > > >>>>>>>>> and consistent to me.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Best,
>> > > >>>>>>>>> Bruno
>> > > >>>>>>>>>
>> > > >>>>>>>>> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Hi Richard,
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Thanks for picking this up! I know of at least one large community member
>> > > >>>>>>>>>> for which this feature is absolutely essential.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> If I understand your two options, it seems like the proposal is to implement
>> > > >>>>>>>>>> it as a behavior change regardless, and the question is whether to provide
>> > > >>>>>>>>>> an opt-out config or not.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Given that any implementation of this feature would have some performance
>> > > >>>>>>>>>> impact under some workloads, and also that we don't know if anyone really
>> > > >>>>>>>>>> depends on emit-on-update time semantics, it seems like we should propose
>> > > >>>>>>>>>> to add an opt-out config. Can you update the KIP to mention the exact
>> > > >>>>>>>>>> config key and value(s) you'd propose?
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Just to move the discussion forward, maybe something like:
>> > > >>>>>>>>>>    emit.on := change|update
>> > > >>>>>>>>>> with the new default being "change"
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Thanks for pointing out the timestamp issue in particular. I agree that if
>> > > >>>>>>>>>> we discard the latter update as a no-op, then we also have to discard its
>> > > >>>>>>>>>> timestamp (obviously, we don't forward the timestamp update, as that's
>> > > >>>>>>>>>> the whole point, but we also can't update the timestamp in the store, as
>> > > >>>>>>>>>> the store must remain consistent with what has been emitted).
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> I have to confess that I disagree with your implementation proposal, but
>> > > >>>>>>>>>> it's also not necessary to discuss implementation in the KIP. Maybe it would
>> > > >>>>>>>>>> be less controversial if you just drop that section for now, so that the KIP
>> > > >>>>>>>>>> discussion can focus on the behavior change and config.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Just for reference, there is some research into this domain. For example,
>> > > >>>>>>>>>> see the "Report" section (3.2.3) of the SECRET paper:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> It might help to round out the proposal if you take a brief survey of the
>> > > >>>>>>>>>> behaviors of other systems, along with pros and cons if any are reported.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Thanks,
>> > > >>>>>>>>>> -John
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>> > > >>>>>>>>>>> Hi everybody!
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> I'd like to propose a change that we probably should've added for a long
>> > > >>>>>>>>>>> time now.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> The key benefit of this KIP would be reduced traffic in Kafka Streams since
>> > > >>>>>>>>>>> a lot of no-op results would no longer be sent downstream.
>> > > >>>>>>>>>>> Here is the KIP for reference.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Currently, I seek to formalize our approach for this KIP first before we
>> > > >>>>>>>>>>> determine concrete API additions / configurations.
>> > > >>>>>>>>>>> Some configs might warrant adding, while others are not necessary since
>> > > >>>>>>>>>>> adding them would only increase complexity of Kafka Streams.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Cheers,
>> > > >>>>>>>>>>> Richard
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> > >
>>
>
