Hello to all,

I've finished making some initial modifications to the KIP. I have decided to keep the implementation section in the KIP for record-keeping purposes.
For now, we should focus only on the proposed behavior changes. Let me know if you have any comments!

Cheers,
Richard

On Sat, Jan 25, 2020 at 11:12 AM Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi all,
>
> Thanks for all the discussion!
>
> @John and @Bruno: I will survey other systems and see what I can do. Just one question: by "systems", do you mean the pros and cons of different reporting strategies?
>
> I'm not completely certain on this point, so it would be great if you could clarify.
>
> Here's what I've gathered from the discussion so far:
>
> - Since Matthias and John seem to have reached a consensus on this, we will go for an across-the-board behavioral change for KTables. After some thought, I decided that, for now, an opt-out config will not be added. As John has pointed out, no-op changes tend to explode further down the topology as they are forwarded to more and more processor nodes downstream.
> - About using hash codes: after some explanation from John, it looks like hash codes might not be ideal (for the implementation). For now, we will omit that detail and save it for the PR.
> - @Bruno: you do have valid concerns. That said, I am not completely certain whether we want to do emit-on-change only for materialized KTables. I will put it down in the KIP regardless.
>
> I will do my best to address all the points raised so far in the discussion. I hope we can keep this going!
>
> Best,
> Richard
>
> On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Thank you, Matthias, for the use cases!
>>
>> Looking at both use cases, I think you need to elaborate on them in the KIP, Richard.
>>
>> Emit from plain KTable:
>> I agree with Matthias that the lower timestamp makes sense because it marks the start of the validity of the record. Idempotent records with a higher timestamp can be safely ignored.
>> A corner case that I discussed with Matthias offline is when we do not materialize a KTable due to optimization. In that case, we cannot filter out the idempotent records, because we do not keep the first record (with the lower timestamp) to compare against.
>>
>> Emit from KTable with aggregations:
>> If we specify that an aggregation result should have the highest timestamp of the records that participated in the aggregation, we cannot ignore any idempotent records. Admittedly, the result of an aggregation usually changes, but there are aggregations where the result may not change, like min and max, or sum when the incoming records have a value of zero. In those cases, we could benefit from emit-on-change, but only if we define the semantics of the aggregations to not use the highest timestamp of the participating records for the result. In Kafka Streams, we do not have min, max, and sum as explicit aggregations, but we would need to provide an API to define which timestamp should be used for the result of an aggregation if we want to go down this path.
>>
>> None of this blocks the KIP; I just wanted to put these aspects up for discussion. The KIP can limit itself to emit-on-change from materialized KTables. However, the limits should be explicitly stated in the KIP.
>>
>> Best,
>> Bruno
>>
>> On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> > IMHO, the question about semantics depends on the use case, in particular on the origin of a KTable.
>> >
>> > If there is a changelog topic that one reads directly into a KTable, emit-on-change actually makes sense, because the timestamp indicates _when_ the update was _effective_.
>> > For this case, it is semantically sound to _not_ update the timestamp in the store, because the second update is actually idempotent, and advancing the timestamp is not ideal (one could even consider it wrong to advance the timestamp) because the "valid time" of the record pair did not change.
>> >
>> > This reasoning also applies to KTable-KTable joins.
>> >
>> > However, if the KTable is the result of an aggregation, I think emit-on-update is more natural, because the timestamp reflects the _last_ time (i.e., the highest timestamp) of all input records that contributed to the result. Hence, updating the timestamp and emitting a new record actually sounds correct to me. IMHO, this applies to both windowed and non-windowed aggregations.
>> >
>> > That said, considering the argument that the timestamp should not be updated in the store in the first case to begin with, both cases are actually the same, and both can be modeled as emit-on-change: if a `table()` operator does not update the timestamp when the value does not change, there is _no_ change, and thus nothing is emitted. At the same time, if an aggregation operator does update the timestamp (even if the value does not change), there _is_ a change, and we emit.
>> >
>> > Note that handling out-of-order data for aggregations would also work seamlessly with this approach: for out-of-order records, the timestamp never changes, and thus we only emit if the result itself changes.
>> >
>> > Therefore, I would argue that we might not even need any config, because the emit-on-change behavior is simply correct and reduces the downstream load, while our current behavior is not ideal (even if it is also correct).
>> >
>> > Thoughts?
>> >
>> > -Matthias
>> >
>> > On 1/24/20 9:37 AM, John Roesler wrote:
>> > > Hi Bruno,
>> > >
>> > > Thanks for that idea.
>> > > I hadn't considered that option before, and it does seem like that would be the right place to put it if we think it might be semantically important to control this on a table-by-table basis.
>> > >
>> > > I had been thinking of it less semantically and more practically. In the context of a large topology, or more generally, a large software system that contains many topologies and other event-driven systems, each no-op result becomes an input that is destined to itself become a no-op result, and so on, all the way through the system. Thus, a single pointless processing result is amplified into a large number of pointless computations, cache perturbations, and network and disk I/O operations. If you also consider operations with fan-out implications, like branching or foreign-key joins, the wasted resources are amplified not just in proportion to the size of the system, but in proportion to the size of the system times the average fan-out (raised to the power of the number of fan-out operations on the path(s) through the system).
>> > >
>> > > In my time operating such systems, I've observed these effects to be very real, and actually, the system and use case don't have to be very large before the amplification poses an existential threat to the system as a whole.
>> > >
>> > > This is the basis of my advocating for a simple behavior change, rather than an opt-in config of any kind. It seems like Streams should "do the right thing" for the majority use case. My theory (which may be wrong) is that the majority use case is more like "relational queries" than "CEP queries". Even if you were doing some event-sensitive computation, wouldn't you do it with stream operations (where this feature is inapplicable anyway)?
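A rough way to quantify the amplification John describes (an editorial model with assumed symbols, not a formula from the thread): let N be the number of downstream processing steps a no-op result passes through, f the average fan-out of operations such as branching or foreign-key joins, and k the number of fan-out operations on the path. The wasted work W caused by one no-op update then grows roughly as:

```latex
W \approx N \cdot \bar{f}^{\,k}
\qquad \text{e.g. } N = 20,\ \bar{f} = 3,\ k = 2 \;\Rightarrow\; W \approx 20 \cdot 3^{2} = 180
```

Under these illustrative numbers, a single pointless update turns into roughly 180 pointless computations, each potentially touching caches, the network, and disk. This is why filtering no-ops as early as possible pays off.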
>> > >
>> > > In keeping with the "practical" perspective, I suggested the opt-out config only for the (I think unlikely) event that filtering out pointless updates actually harms performance. I'd also be perfectly fine without the opt-out config. I really think that, because of the timestamp semantics work already underway, we're already pre-fetching the prior result most of the time, so there would be very little extra I/O involved in implementing emit-on-change.
>> > >
>> > > However, we should consider whether my experience is likely to be general. Do you have some use case in mind for which you'd actually want some KTable results to be emit-on-update for semantic reasons?
>> > >
>> > > Thanks,
>> > > -John
>> > >
>> > >
>> > > On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
>> > >> Hi Richard,
>> > >>
>> > >> Thank you for the KIP.
>> > >>
>> > >> I agree with John that we should focus on the interface and behavior change in a KIP. We can discuss the implementation later.
>> > >>
>> > >> I am also +1 for the survey.
>> > >>
>> > >> I had a thought about this. Couldn't we consider emit-on-change to be one config of suppress (like `untilWindowCloses`)? What you are basically proposing is to suppress updates if they do not change the result. Treating emit-on-change as a flavour of suppress would be more flexible, because it would specify the behavior locally for a KTable instead of globally for all KTables. Additionally, specifying the behavior in one place instead of multiple places feels more intuitive and consistent to me.
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
>> > >>>
>> > >>> Hi Richard,
>> > >>>
>> > >>> Thanks for picking this up!
>> > >>> I know of at least one large community member for whom this feature is absolutely essential.
>> > >>>
>> > >>> If I understand your two options, it seems like the proposal is to implement this as a behavior change regardless, and the question is whether or not to provide an opt-out config.
>> > >>>
>> > >>> Given that any implementation of this feature would have some performance impact under some workloads, and also that we don't know if anyone really depends on emit-on-update time semantics, it seems like we should propose adding an opt-out config. Can you update the KIP to mention the exact config key and value(s) you'd propose?
>> > >>>
>> > >>> Just to move the discussion forward, maybe something like:
>> > >>> emit.on := change|update
>> > >>> with the new default being "change".
>> > >>>
>> > >>> Thanks for pointing out the timestamp issue in particular. I agree that if we discard the latter update as a no-op, then we also have to discard its timestamp (obviously, we don't forward the timestamp update, as that's the whole point, but we also can't update the timestamp in the store, as the store must remain consistent with what has been emitted).
>> > >>>
>> > >>> I have to confess that I disagree with your implementation proposal, but it's also not necessary to discuss implementation in the KIP. Maybe it would be less controversial if you just dropped that section for now, so that the KIP discussion can focus on the behavior change and config.
>> > >>>
>> > >>> Just for reference, there is some research in this domain.
For >> example, >> > >>> see the "Report" section (3.2.3) of the SECRET paper: >> > >>> >> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf >> > >>> >> > >>> It might help to round out the proposal if you take a brief survey >> of the >> > >>> behaviors of other systems, along with pros and cons if any are >> reported. >> > >>> >> > >>> Thanks, >> > >>> -John >> > >>> >> > >>> >> > >>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote: >> > >>>> Hi everybody! >> > >>>> >> > >>>> I'd like to propose a change that we probably should've added for >> a long >> > >>>> time now. >> > >>>> >> > >>>> The key benefit of this KIP would be reduced traffic in Kafka >> Streams since >> > >>>> a lot of no-op results would no longer be sent downstream. >> > >>>> Here is the KIP for reference. >> > >>>> >> > >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams >> > >>>> >> > >>>> Currently, I seek to formalize our approach for this KIP first >> before we >> > >>>> determine concrete API additions / configurations. >> > >>>> Some configs might warrant adding, whiles others are not necessary >> since >> > >>>> adding them would only increase complexity of Kafka Streams. >> > >>>> >> > >>>> Cheers, >> > >>>> Richard >> > >>>> >> > >> >> > >> >