Hello to all,

I've finished making some initial modifications to the KIP.
I have decided to keep the implementation section in the KIP for
record-keeping purposes.

For now, we should focus only on the proposed behavior changes.

See if you have any comments!

Cheers,
Richard

On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi all,
>
> Thanks for all the discussion!
>
> @John and @Bruno I will survey other possible systems and see what I can
> do.
> Just a question: by "systems", I suppose you mean the pros and cons of
> different reporting strategies?
>
> I'm not completely certain on this point, so it would be great if you could
> clarify.
>
> So here's what I got from all the discussion so far:
>
>    - Since both Matthias and John seem to have come to a consensus on
>    this, we will go for an all-round behavioral change for KTables. After
>    some thought, I decided that for now, an opt-out config will not be added.
>    As John has pointed out, no-op changes tend to explode further down the
>    topology as they are forwarded to more and more processor nodes downstream.
>    - About using hash codes, after some explanation from John, it looks
>    like hash codes might not be ideal (for implementation). For now, we
>    will omit that detail, and save it for the PR.
>    - @Bruno You do have valid concerns. Though, I am not completely
>    certain if we want to do emit-on-change only for materialized KTables. I
>    will put it down in the KIP regardless.
>
> I will do my best to address all points raised so far in the discussion.
> Hope we can keep this going!
>
> Best,
> Richard
>
> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Thank you Matthias for the use cases!
>>
>> Looking at both use cases, I think you need to elaborate on them in
>> the KIP, Richard.
>>
>> Emit from plain KTable:
>> I agree with Matthias that the lower timestamp makes sense because it
>> marks the start of the validity of the record. Idempotent records with
>> a higher timestamp can be safely ignored. A corner case that I
>> discussed with Matthias offline is when we do not materialize a KTable
>> due to optimization. Then we cannot avoid the idempotent records
>> because we do not keep the first record with the lower timestamp to
>> compare to.
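
A minimal sketch of the plain-KTable case described above, assuming a materialized store. `EmitOnChangeTable` and `Versioned` are hypothetical names made up for illustration; they are not part of Kafka Streams, and out-of-order handling is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for a materialized KTable; NOT the Kafka Streams API.
final class EmitOnChangeTable<K, V> {

    // A stored value plus the timestamp at which it became valid.
    static final class Versioned<V> {
        final V value;
        final long timestamp;

        Versioned(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Versioned<V>> store = new HashMap<>();

    // Returns the record to forward downstream, or empty for an idempotent update.
    Optional<Versioned<V>> update(K key, V value, long timestamp) {
        Versioned<V> previous = store.get(key);
        if (previous != null && Objects.equals(previous.value, value)) {
            // Same value as before: keep the stored record and its earlier
            // timestamp, and forward nothing.
            return Optional.empty();
        }
        Versioned<V> next = new Versioned<>(value, timestamp);
        store.put(key, next);
        return Optional.of(next);
    }

    // Returns the stored record for the key, or null if there is none.
    Versioned<V> get(K key) {
        return store.get(key);
    }
}
```

Without the store there is no previous record to compare against, which is the non-materialized corner case mentioned above.
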
>>
>> Emit from KTable with aggregations:
>> If we specify that an aggregation result should have the highest
>> timestamp of the records that participated in the aggregation, we
>> cannot ignore any idempotent records. Admittedly, the result of an
>> aggregation usually changes, but there are aggregations where the
>> result may not change, like min and max, or sum when the incoming
>> records have a value of zero. In those cases, we could benefit from
>> emit-on-change, but only if we define the semantics of the
>> aggregations to not use the highest timestamp of the participating
>> records for the result. In Kafka Streams, we do not have min, max, and
>> sum as explicit aggregations, but we need to provide an API to define
>> what timestamp should be used for the result of an aggregation if we
>> want to go down this path.
>>
>> All of this does not block this KIP and I just wanted to put these
>> aspects up for discussion. The KIP can limit itself to emit from
>> materialized KTables. However, the limits should be explicitly stated
>> in the KIP.
>>
>> Best,
>> Bruno
>>
>>
>>
>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>> >
>> > IMHO, the question about semantics depends on the use case, in
>> > particular on the origin of a KTable.
>> >
>> > If there is a changelog topic that one reads directly into a KTable,
>> > emit-on-change does actually make sense, because the timestamp indicates
>> > _when_ the update was _effective_. For this case, it is semantically
>> > sound to _not_ update the timestamp in the store, because the second
>> > update is actually idempotent and advancing the timestamp is not ideal
>> > (one could even consider it to be wrong to advance the timestamp)
>> > because the "valid time" of the record pair did not change.
>> >
>> > This reasoning also applies to KTable-KTable joins.
>> >
>> > However, if the KTable is the result of an aggregation, I think
>> > emit-on-update is more natural, because the timestamp reflects the
>> > _last_ time (i.e., highest timestamp) of all input records that contributed
>> > to the result. Hence, updating the timestamp and emitting a new record
>> > actually sounds correct to me. This applies to windowed and non-windowed
>> > aggregations IMHO.
>> >
>> > However, considering the argument that the timestamp should not be
>> > updated in the store in the first case to begin with, both cases are
>> > actually the same, and both can be modeled as emit-on-change: if a
>> > `table()` operator does not update the timestamp if the value does not
>> > change, there is _no_ change and thus nothing is emitted. At the same
>> > time, if an aggregation operator does update the timestamp (even if the
>> > value does not change) there _is_ a change and we emit.
>> >
>> > Note that handling out-of-order data for aggregations would also work
>> > seamlessly with this approach -- for out-of-order records, the timestamp
>> > never changes, and thus, we only emit if the result itself changes.
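
A rough sketch of the aggregation side of this argument, assuming a max aggregation whose result carries the highest timestamp seen so far. `MaxAggregationSketch` is a hypothetical class for illustration only, not Kafka Streams code. A record is emitted exactly when the stored (value, timestamp) pair changes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical max aggregation; NOT the Kafka Streams API.
final class MaxAggregationSketch {

    // Per key: { current max value, highest timestamp seen so far }.
    private final Map<String, long[]> store = new HashMap<>();

    // Returns true when the (value, timestamp) pair changed, i.e. a record would be emitted.
    boolean process(String key, long value, long timestamp) {
        long[] previous = store.get(key);
        if (previous == null) {
            store.put(key, new long[] {value, timestamp});
            return true;
        }
        long newValue = Math.max(previous[0], value);
        long newTimestamp = Math.max(previous[1], timestamp);
        boolean changed = newValue != previous[0] || newTimestamp != previous[1];
        previous[0] = newValue;
        previous[1] = newTimestamp;
        return changed;
    }
}
```

With this model, an in-order record that leaves the max untouched still advances the timestamp and is emitted, while an out-of-order record is emitted only if it changes the max itself.
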
>> >
>> > Therefore, I would argue that we might not even need any config, because
>> > the emit-on-change behavior is just correct and reduces the downstream
>> > load, while our current behavior is not ideal (even if it's also
>> > correct).
>> >
>> > Thoughts?
>> >
>> > -Matthias
>> >
>> > On 1/24/20 9:37 AM, John Roesler wrote:
>> > > Hi Bruno,
>> > >
>> > > Thanks for that idea. I hadn't considered that
>> > > option before, and it does seem like that would be
>> > > the right place to put it if we think it might be
>> > > semantically important to control on a
>> > > table-by-table basis.
>> > >
>> > > I had been thinking of it less semantically and
>> > > more practically. In the context of a large
>> > > topology, or more generally, a large software
>> > > system that contains many topologies and other
>> > > event-driven systems, each no-op result becomes an
>> > > input that is destined to itself become a no-op
>> > > result, and so on, all the way through the system.
>> > > Thus, a single pointless processing result becomes
>> > > amplified into a large number of pointless
>> > > computations, cache perturbations, and network
>> > > and disk I/O operations. If you also consider
>> > > operations with fan-out implications, like
>> > > branching or foreign-key joins, the wasted
>> > > resources are amplified not just in proportion to
>> > > the size of the system, but the size of the system
>> > > times the average fan-out (to the power of the
>> > > number of fan-out operations on the path(s)
>> > > through the system).
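
To put rough numbers on the amplification described above, here is a toy model, assuming a single path through the topology where each node forwards a fixed number of records per input. `NoOpAmplification` and its `fanOuts` parameter are hypothetical and only meant to illustrate the arithmetic.

```java
// Toy model of no-op amplification along one path; NOT a Kafka Streams API.
final class NoOpAmplification {

    // fanOuts[i] is the number of records node i forwards per input record
    // (1 means no fan-out). Returns how many times a node processes a record
    // that originates from one single no-op update.
    static long wastedOperations(int[] fanOuts) {
        long recordsAtNode = 1; // the single no-op record entering the first node
        long total = 0;
        for (int fanOut : fanOuts) {
            total += recordsAtNode;   // every arriving record is processed once
            recordsAtNode *= fanOut;  // and then forwarded fanOut times
        }
        return total;
    }
}
```

Under these assumptions, a six-node path with no fan-out wastes 6 operations per no-op update, while the same path with three-way fan-outs at the second and fourth nodes (`{1, 3, 1, 3, 1, 1}`) wastes 26.
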
>> > >
>> > > In my time operating such systems, I've observed
>> > > these effects to be very real, and actually, the
>> > > system and use case don't have to be very large
>> > > before the amplification poses an existential
>> > > threat to the system as a whole.
>> > >
>> > > This is the basis of my advocating for a simple
>> > > behavior change, rather than an opt-in config of
>> > > any kind. It seems like Streams should "do the
>> > > right thing" for the majority use case. My theory
>> > > (which may be wrong) is that the majority use case
>> > > is more like "relational queries" than "CEP
>> > > queries". Even if you were doing some
>> > > event-sensitive computation, wouldn't you do them
>> > > as Stream operations (where this feature is
>> > > inapplicable anyway)?
>> > >
>> > > In keeping with the "practical" perspective, I
>> > > suggested the opt-out config only in the (I think
>> > > unlikely) event that filtering out pointless
>> > > updates actually harms performance. I'd also be
>> > > perfectly fine without the opt-out config. I
>> > > really think that (because of the timestamp
>> > > semantics work already underway), we're already
>> > > pre-fetching the prior result most of the time, so
>> > > there would actually be very little extra I/O
>> > > involved in implementing emit-on-change.
>> > >
>> > > However, we should consider whether my experience
>> > > is likely to be general. Do you have some use
>> > > case in mind for which you'd actually want some
>> > > KTable results to be emit-on-update for semantic
>> > > reasons?
>> > >
>> > > Thanks,
>> > > -John
>> > >
>> > >
>> > > On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
>> > >> Hi Richard,
>> > >>
>> > >> Thank you for the KIP.
>> > >>
>> > >> I agree with John that we should focus on the interface and behavior
>> > >> change in a KIP. We can discuss the implementation later.
>> > >>
>> > >> I am also +1 for the survey.
>> > >>
>> > >> I had a thought about this. Couldn't we consider emit-on-change to be
>> > >> one config of suppress (like `untilWindowCloses`)? What you basically
>> > >> propose is to suppress updates if they do not change the result.
>> > >> Considering emit on change as a flavour of suppress would be more
>> > >> flexible because it would specify the behavior locally for a KTable
>> > >> instead of globally for all KTables. Additionally, specifying the
>> > >> behavior in one place instead of multiple places feels more intuitive
>> > >> and consistent to me.
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org>
>> > >> wrote:
>> > >>>
>> > >>> Hi Richard,
>> > >>>
>> > >>> Thanks for picking this up! I know of at least one large community
>> > >>> member for which this feature is absolutely essential.
>> > >>>
>> > >>> If I understand your two options, it seems like the proposal is to
>> > >>> implement it as a behavior change regardless, and the question is
>> > >>> whether to provide an opt-out config or not.
>> > >>>
>> > >>> Given that any implementation of this feature would have some
>> > >>> performance impact under some workloads, and also that we don't know
>> > >>> if anyone really depends on emit-on-update time semantics, it seems
>> > >>> like we should propose to add an opt-out config. Can you update the
>> > >>> KIP to mention the exact config key and value(s) you'd propose?
>> > >>>
>> > >>> Just to move the discussion forward, maybe something like:
>> > >>>     emit.on := change|update
>> > >>> with the new default being "change"
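
For illustration only, a sketch of how such a setting could be read. The key `emit.on` and the values `change` and `update` are the ones floated above; they are a proposal in this thread, not an existing Kafka Streams config, and `EmitOnConfigSketch` is a hypothetical helper.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical parser for the config floated in this thread; NOT a Kafka Streams API.
final class EmitOnConfigSketch {

    enum EmitOn { CHANGE, UPDATE }

    // Key as suggested in the thread; it does not exist in StreamsConfig.
    static final String EMIT_ON_CONFIG = "emit.on";

    // Reads the setting, falling back to "change" as the suggested new default.
    static EmitOn parse(Properties props) {
        String raw = props.getProperty(EMIT_ON_CONFIG, "change");
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "change":
                return EmitOn.CHANGE;
            case "update":
                return EmitOn.UPDATE;
            default:
                throw new IllegalArgumentException(
                    "Invalid value for " + EMIT_ON_CONFIG + ": " + raw);
        }
    }
}
```
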
>> > >>>
>> > >>> Thanks for pointing out the timestamp issue in particular. I agree
>> > >>> that if we discard the latter update as a no-op, then we also have to
>> > >>> discard its timestamp (obviously, we don't forward the timestamp
>> > >>> update, as that's the whole point, but we also can't update the
>> > >>> timestamp in the store, as the store must remain consistent with what
>> > >>> has been emitted).
>> > >>>
>> > >>> I have to confess that I disagree with your implementation proposal,
>> > >>> but it's also not necessary to discuss implementation in the KIP.
>> > >>> Maybe it would be less controversial if you just drop that section for
>> > >>> now, so that the KIP discussion can focus on the behavior change and
>> > >>> config.
>> > >>>
>> > >>> Just for reference, there is some research into this domain. For
>> > >>> example, see the "Report" section (3.2.3) of the SECRET paper:
>> > >>>
>> > >>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
>> > >>>
>> > >>> It might help to round out the proposal if you take a brief survey of
>> > >>> the behaviors of other systems, along with pros and cons if any are
>> > >>> reported.
>> > >>>
>> > >>> Thanks,
>> > >>> -John
>> > >>>
>> > >>>
>> > >>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
>> > >>>> Hi everybody!
>> > >>>>
>> > >>>> I'd like to propose a change that we probably should have made a
>> > >>>> long time ago.
>> > >>>>
>> > >>>> The key benefit of this KIP would be reduced traffic in Kafka Streams,
>> > >>>> since a lot of no-op results would no longer be sent downstream.
>> > >>>> Here is the KIP for reference.
>> > >>>>
>> > >>>>
>> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> > >>>>
>> > >>>> Currently, I seek to formalize our approach for this KIP first, before
>> > >>>> we determine concrete API additions / configurations.
>> > >>>> Some configs might warrant adding, while others are not necessary,
>> > >>>> since adding them would only increase the complexity of Kafka Streams.
>> > >>>>
>> > >>>> Cheers,
>> > >>>> Richard
>> > >>>>
>> > >>
>> >
>>
>
