John,

Thanks for your replies. As for the two API options, I'm also slightly inclined toward the first one. My motivation is a bit different: I think the first one may be more flexible, for example:
KTable<Windowed<..>, Long> table = ... .count();
table.toStream().peek(..);               // want to peek at the changelog stream, do not care about final results
table.suppress().toStream().to("topic"); // sending to a topic, want to send only the final results

--------------

Besides that, I have a few more minor questions:

1. For "allowedLateness", what should the default value be? I.e., if users do not specify "allowedLateness" in TimeWindows, what value should we set?

2. For API names, some personal suggestions:

2.a) "allowedLateness" -> "until" (semantics changed, and the value is defined as a delta on top of the window length), where the current "until" -> "retentionPeriod", and the latter will be moved from `Windows` to `WindowStoreBuilder` in the future.

2.b) "BufferConfig" -> "Buffered"?

Guozhang

On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:

> Hey Matthias and Guozhang,
>
> Sorry for the slow reply. I was mulling over your feedback and weighing some ideas in a sketchbook PR: https://github.com/apache/kafka/pull/5337.
>
> Your thought about keeping suppression independent of business logic is a very good one. I agree that it would make more sense to add some kind of "window close" concept to the window definition.
>
> In fact, doing that immediately solves the inconsistency problem Guozhang brought up. There's no need to add a "final results" or "emission" option to the windowed aggregation.
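Guozhang's peek-versus-suppress example above contrasts two views of the same windowed count: the changelog, which carries every intermediate update, and a suppressed stream with one final result per window. Below is a minimal stdlib-only Java sketch of that difference. It is not the Kafka Streams implementation: the class and method names are hypothetical, windows are assumed to be tumbling, and a window's result is assumed to become final once stream time reaches the window end plus `allowedLateness`, the rule John's proposal below relies on.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model of a tumbling-window count; names are hypothetical, not Kafka Streams APIs.
final class FinalResultsSketch {

    /** Two views of one windowed count; entries are formatted "windowStart=count". */
    static final class Views {
        final List<String> changelog = new ArrayList<>(); // every update, as peek() would see it
        final List<String> finals = new ArrayList<>();    // one entry per window, once it closes
    }

    static Views count(long[] eventTimes, long windowSize, long allowedLateness) {
        Views views = new Views();
        Map<Long, Long> counts = new LinkedHashMap<>();  // window start -> running count
        Map<Long, Long> pending = new LinkedHashMap<>(); // updated windows not yet emitted as final
        long streamTime = Long.MIN_VALUE;

        for (long t : eventTimes) {
            streamTime = Math.max(streamTime, t);
            long start = t - Math.floorMod(t, windowSize);
            if (start + windowSize + allowedLateness <= streamTime) {
                continue; // window already closed: drop the late record
            }
            long c = counts.merge(start, 1L, Long::sum);
            pending.put(start, c);
            views.changelog.add(start + "=" + c);

            // Emit the final count of every window whose end + allowedLateness has been reached.
            Iterator<Map.Entry<Long, Long>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> e = it.next();
                if (e.getKey() + windowSize + allowedLateness <= streamTime) {
                    views.finals.add(e.getKey() + "=" + e.getValue());
                    it.remove();
                }
            }
        }
        return views;
    }

    public static void main(String[] args) {
        Views v = count(new long[] {1, 3, 12, 8, 16, 2, 21}, 10, 5);
        System.out.println(v.changelog); // [0=1, 0=2, 10=1, 0=3, 10=2, 20=1]
        System.out.println(v.finals);    // [0=3]
    }
}
```

With windows of 10 time units and `allowedLateness = 5`, the record at time 2 is dropped because stream time (16) has already passed window 0's close time (15). Windows 10 and 20 have no final result yet because stream time has not reached their close times; that lag is inherent to final-results-only output.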
>
> What do you think about an API more like this:
>
> final StreamsBuilder builder = new StreamsBuilder();
>
> builder
>     .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
>     .groupBy(
>         (String k1, String v1) -> k1,
>         Serialized.with(STRING_SERDE, STRING_SERDE)
>     )
>     .windowedBy(TimeWindows
>         .of(scaledTime(2L))
>         .until(scaledTime(3L))
>         .allowedLateness(scaledTime(1L))
>     )
>     .count(Materialized.as("counts"))
>     .suppress(
>         emitFinalResultsOnly(
>             BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
>         )
>     )
>     .toStream()
>     .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));
>
> Note that:
> * "emitFinalResultsOnly" is available *only* on windowed tables (enforced by the type system at compile time), and it determines the time to wait by looking at "allowedLateness" on the TimeWindows config.
> * querying "counts" will produce results (eventually) consistent with what's observable in "output-suppressed".
> * in all cases, "suppress" has no effect on business logic, just on event suppression.
>
> Is this API straightforward? Or do you still prefer the version that you both proposed:
>
> ...
> .windowedBy(TimeWindows
>     .of(scaledTime(2L))
>     .until(scaledTime(3L))
>     .allowedLateness(scaledTime(1L))
> )
> .count(
>     Materialized.as("counts"),
>     emitFinalResultsOnly(
>         BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
>     )
> )
> ...
>
> To me, these two are practically identical, and I still vaguely prefer the first one.
>
> The prototype has made it clearer to me that users of "final results for windows" and users of "suppression for table events" both need to configure the suppression buffer.
>
> This buffer configuration consists of:
> 1. how many keys or bytes to keep in memory
> 2. what to do if memory runs out (shut down, start using disk, ...)
>
> So it's not as simple as setting a "final results" flag.
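The two buffer settings John lists, a size bound and a strategy for when that bound is hit, can be sketched in plain Java as below. This is a hypothetical model rather than the `BufferConfig` from the PR: it bounds keys only (not bytes), keeps only the latest value per key, and simulates "start using disk" with a second in-memory map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a suppression buffer bounded by key count. The class, the enum, and
// SPILL_TO_DISK (simulated by a second in-memory map) are assumptions, not Kafka Streams APIs.
final class SuppressionBufferSketch<K, V> {

    enum BufferFullStrategy { SHUT_DOWN, SPILL_TO_DISK }

    private final long maxKeys;
    private final BufferFullStrategy strategy;
    private final Map<K, V> memory = new LinkedHashMap<>();
    private final Map<K, V> disk = new LinkedHashMap<>(); // stand-in for an on-disk store

    SuppressionBufferSketch(long maxKeys, BufferFullStrategy strategy) {
        this.maxKeys = maxKeys;
        this.strategy = strategy;
    }

    /** Buffers the latest value for key; an update to a buffered key never grows the buffer. */
    void put(K key, V value) {
        if (disk.containsKey(key)) {
            disk.put(key, value);
        } else if (memory.containsKey(key) || memory.size() < maxKeys) {
            memory.put(key, value);
        } else if (strategy == BufferFullStrategy.SHUT_DOWN) {
            throw new IllegalStateException("suppression buffer full at " + maxKeys + " keys");
        } else {
            disk.put(key, value); // SPILL_TO_DISK
        }
    }

    int inMemory() {
        return memory.size();
    }

    int spilled() {
        return disk.size();
    }

    /** Emits the latest value of every buffered key, then empties the buffer. */
    Map<K, V> drain() {
        Map<K, V> out = new LinkedHashMap<>(memory);
        out.putAll(disk);
        memory.clear();
        disk.clear();
        return out;
    }
}
```

Repeated updates to one key only replace its buffered value, so the bound limits distinct keys; the user still has to pick both the bound and the overflow behaviour.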
> We'll either have an "Emit" config object on the windowed aggregators that takes the same BufferConfig as the "Suppress" config on the suppression operator, or we just use the suppression operator for both.
>
> Perhaps it would sweeten the deal a little to point out that we already have 2 overloads for each windowed aggregator (with and without Materialized). Adding "Emitted" or something would mean adding a new overload for each one, taking us up to 4 overloads each for "count", "aggregate" and "reduce". Using "suppress" means that we don't add any new overloads.
>
> Thanks again for helping to hash this out,
> -John
>
> On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I think I agree with Matthias on having dedicated APIs for the final-output scenario of windowed operations, PLUS separating the window close, which the "final output" would rely on, from the window retention time itself (admittedly this would make the KIP effort larger, but if we believe we need to do this separation anyway, we could just do it now).
> >
> > And then we can have `KTable#suppress()` for intermediate suppression only, not for late-record suppression, until we've seen that become a common feature request, since our current design can still be extended for that purpose.
> >
> >
> > Guozhang
> >
> > On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > Thanks for the discussion. I am just catching up.
> > >
> > > In general, I think we have different use cases, and non-windowed and windowed are quite different. For the non-windowed case, suppress() has no (useful) close or retention time, no final semantics, and also no business logic impact.
> > >
> > > On the other hand, for windowed aggregations, close time and final result do have a meaning. IMHO, `close()` is part of business logic while retention time is not.
> > > Also, suppression of intermediate results is not a business rule, and there might be use cases in which only "early intermediates" (before window end time) are suppressed, or all intermediates are suppressed (maybe also something in the middle, i.e., just reducing the load of intermediate updates). Thus, window suppression is much richer.
> > >
> > > IMHO, a generic `suppress()` operator that can be inserted into the data flow at any point is useful. Maybe we should keep it as generic as possible. However, it might be difficult to use with regard to windowing, as the mental effort to use it is high.
> > >
> > > With regard to Guozhang's comment:
> > >
> > > > we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyways.
> > >
> > > If we use `suppress()` as a standalone operator, this is correct and intended IMHO. To address the issue if the behavior is unwanted, I would suggest adding a "suppress option" directly to the `count()/reduce()/aggregate()` window operators, similar to `Materialized`. This would be an "embedded suppress" and avoid the issue. It would also address the mental-effort issue for the "single final window result" use case.
> > >
> > > I also think that a close time shorter than the retention time is useful for window aggregation. If we add close() to the window definition and until() to `Materialized`, we can separate both correctly IMHO.
> > >
> > > About setting `close = min(close, retention)`, I am not sure. We might rather throw an exception than reduce the close time automatically. Otherwise, I expect many user questions like "I set close to X, but the window does not get updated for some data that arrives with a delay of X".
> > >
> > > The tricky question might be how to design the API in a backward-compatible way, though.
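Matthias' two options for a close time that exceeds retention, clamping to `min(close, retention)` or rejecting the configuration, differ mainly in when the user finds out. A hypothetical helper contrasting them (the names are invented, and both durations are assumed to be measured from the same reference point):

```java
import java.time.Duration;

// Hypothetical helper contrasting the two options for a close time that exceeds retention.
// Both durations are assumed to share a reference point; these are not Kafka Streams APIs.
final class WindowCloseCheck {

    /** Silently use min(close, retention): late data the user expected to count is dropped. */
    static Duration clamp(Duration close, Duration retention) {
        return close.compareTo(retention) <= 0 ? close : retention;
    }

    /** Matthias' preference: reject the configuration so the conflict surfaces immediately. */
    static Duration validate(Duration close, Duration retention) {
        if (close.isNegative()) {
            throw new IllegalArgumentException("close must not be negative: " + close);
        }
        if (close.compareTo(retention) > 0) {
            throw new IllegalArgumentException(
                "close (" + close + ") is longer than retention (" + retention + ")");
        }
        return close;
    }
}
```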
> > >
> > >
> > > -Matthias
> > >
> > > On 7/3/18 5:38 AM, John Roesler wrote:
> > > > Hi Guozhang,
> > > >
> > > > I see. It seems like if we want to decouple 1) and 2), we need to alter the definition of the window. Do you think it would close the gap if we added a "window close" time to the window definition?
> > > >
> > > > Such as:
> > > >
> > > > builder.stream("input")
> > > >     .groupByKey()
> > > >     .windowedBy(
> > > >         TimeWindows
> > > >             .of(60_000)
> > > >             .closeAfter(10L * 60 * 1000) // 10 minutes, assuming ms like the other arguments
> > > >             .until(30L * 24 * 60 * 60 * 1000)
> > > >     )
> > > >     .count()
> > > >     .suppress(Suppression.finalResultsOnly());
> > > >
> > > > Possibly called "finalResultsAtWindowClose" or something?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > >> Hey John,
> > > >>
> > > >> Obviously I'm far less diligent about replying to email than you :) I will try to reply to them separately:
> > > >>
> > > >> -----------------------------------------------------------------------------
> > > >>
> > > >> To reply to your email on "Mon, Jul 2, 2018 at 8:23 AM":
> > > >>
> > > >> I'm aware of this use case, but again, the concern is that, in this setting, in order to let the window be queryable for 30 days, we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyways. Personally, I think for the final update scenario, the ideal situation users would want is "do not process any data that is more than 5 minutes late, and of course send no update records downstream later than 5 minutes either; but retain the window to be queryable for 30 days". By doing that, the final window snapshot would also be aligned with the update stream.
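Guozhang's ideal setup (accept and emit updates for at most 5 minutes of lateness, but keep the window queryable for 30 days) separates retention, late-record acceptance, and downstream emission. The stdlib-only model below keeps the three apart; it is a sketch, not Kafka Streams code. The class name, the tumbling windows, measuring retention from the window start and the grace period from the window end, and the check that retention covers size plus grace are all assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Stdlib-only model of a tumbling-window count that keeps retention, grace (late-record
// acceptance), and emission apart. Names are hypothetical; all times share one unit.
final class ThreePeriodsSketch {
    private final long size;      // window size
    private final long grace;     // how late a record may be and still update its window
    private final long retention; // how long a window stays queryable, from its start (assumed)
    private final TreeMap<Long, Long> store = new TreeMap<>(); // window start -> count
    private final TreeSet<Long> notYetEmitted = new TreeSet<>();
    final List<String> emitted = new ArrayList<>(); // final results, one per window
    private long streamTime = Long.MIN_VALUE;

    ThreePeriodsSketch(long size, long grace, long retention) {
        if (retention < size + grace) {
            throw new IllegalArgumentException("retention must cover size + grace");
        }
        this.size = size;
        this.grace = grace;
        this.retention = retention;
    }

    void process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long start = timestamp - Math.floorMod(timestamp, size);
        if (start + size + grace > streamTime) { // window still open: apply the update
            store.merge(start, 1L, Long::sum);
            notYetEmitted.add(start);
        } // otherwise the record is later than the grace period and is dropped

        // Downstream sees each window exactly once, when its grace period ends.
        Iterator<Long> it = notYetEmitted.iterator();
        while (it.hasNext()) {
            long s = it.next();
            if (s + size + grace > streamTime) {
                break; // sorted by start, so every later window is still open too
            }
            emitted.add(s + "=" + store.get(s));
            it.remove();
        }
        // Windows past retention are evicted and can no longer be queried.
        store.headMap(streamTime - retention, true).clear();
    }

    Long query(long windowStart) {
        return store.get(windowStart);
    }
}
```

With size 10, grace 5, and retention 100 as small stand-ins for minutes and days, records at 1 and 4 count toward window 0, its final count of 2 is emitted when stream time reaches 16, a record at 3 arriving after that is dropped, and window 0 stays queryable until a record at 115 pushes it out of retention.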
> > > >> In other words, among these three periods:
> > > >>
> > > >> 1) the retention length of the window / table;
> > > >> 2) the late-record acceptance period for updating the window;
> > > >> 3) the late-record update period for sending downstream,
> > > >>
> > > >> final-update use cases would naturally want 2) = 3), while 1) may be different and larger. What we provide now is 1) = 2), which could be different from, and in practice larger than, 3), hence not the most intuitive for their needs.
> > > >>
> > > >> -----------------------------------------------------------------------------
> > > >>
> > > >> To reply to your email on "Mon, Jul 2, 2018 at 10:27 AM":
> > > >>
> > > >> I like option 2) better than option 1) as well from a programming point of view. But I'm wondering whether option 2) would provide the above semantics, or whether it still couples 1) with 2)?
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:
> > > >>
> > > >>> In fact, to push the idea further (which IIRC is what Matthias originally proposed), if we can accept "Suppression#finalResultsOnly" from my last email, then we could also consider eliminating "suppressLateEvents" entirely.
> > > >>>
> > > >>> We could always add it later, but you've both expressed doubt that there are practical use cases for it outside of final results.
> > > >>>
> > > >>> -John
> > > >>>
> > > >>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:
> > > >>>
> > > >>>> Hi again, Guozhang ;) Here's the second part of my response...
> > > >>>>
> > > >>>> It seems like your main concern is: "if I'm a user who wants final update semantics, how complicated is it for me to get it?"
> > > >>>>
> > > >>>> I think we have to assume that people don't always have time to become deeply familiar with all the nuances of a programming environment before they use it. Especially if they're evaluating several frameworks for their use case, it's very valuable to make it as obvious as possible how to accomplish various computations with Streams.
> > > >>>>
> > > >>>> To me, the biggest question is whether, with a fresh perspective, people would say "oh, I get it, I have to bound my lateness and suppress intermediate updates, and of course I'll get only the final result!", or if it's more like "wtf? all I want is the final result, what are all these parameters?".
> > > >>>>
> > > >>>> I was talking with Matthias a while back, and he had an idea that I think can help, which is to essentially set up a final-result recipe in addition to the raw parameters. I previously thought that it wouldn't be possible to restrict its usage to Windowed KTables, but thinking about it again this weekend, I have a couple of ideas:
> > > >>>>
> > > >>>> =====================
> > > >>>> = 1. Static Wrapper =
> > > >>>> =====================
> > > >>>>
> > > >>>> We can define an extra static function that "wraps" a KTable with final-result semantics.
> > > >>>>
> > > >>>> public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
> > > >>>>         final KTable<K, V> windowedKTable,
> > > >>>>         final Duration maxAllowedLateness,
> > > >>>>         final Suppression.BufferFullStrategy bufferFullStrategy) {
> > > >>>>     return windowedKTable.suppress(
> > > >>>>         Suppression.suppressLateEvents(maxAllowedLateness)
> > > >>>>             .suppressIntermediateEvents(
> > > >>>>                 IntermediateSuppression
> > > >>>>                     .emitAfter(maxAllowedLateness)
> > > >>>>                     .bufferFullStrategy(bufferFullStrategy)
> > > >>>>             )
> > > >>>>     );
> > > >>>> }
> > > >>>>
> > > >>>> Because windowedKTable is a parameter, the static function can easily impose an extra bound on the key type: that it extends Windowed. This would make "final results only" available only on windowed KTables.
> > > >>>>
> > > >>>> Here's how it would look to use:
> > > >>>>
> > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > > >>>>     finalResultsOnly(
> > > >>>>         windowCounts,
> > > >>>>         Duration.ofMinutes(10),
> > > >>>>         Suppression.BufferFullStrategy.SHUT_DOWN
> > > >>>>     );
> > > >>>>
> > > >>>> Trying to use it on a non-windowed KTable yields:
> > > >>>>
> > > >>>>     Error:(129, 35) java: method finalResultsOnly in class org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be applied to given types;
> > > >>>>       required: org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > >>>>       found: org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > >>>>       reason: inference variable K has incompatible bounds
> > > >>>>         equality constraints: java.lang.String
> > > >>>>         upper bounds: org.apache.kafka.streams.kstream.Windowed
> > > >>>>
> > > >>>> ============================================================
> > > >>>> = 2. Add <K,V> parameters and recipe method to Suppression =
> > > >>>> ============================================================
> > > >>>>
> > > >>>> By adding K,V parameters to Suppression, we can provide a similarly bounded config method directly on the Suppression class:
> > > >>>>
> > > >>>> public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
> > > >>>>         final Duration maxAllowedLateness,
> > > >>>>         final BufferFullStrategy bufferFullStrategy) {
> > > >>>>     return Suppression
> > > >>>>         .<K, V>suppressLateEvents(maxAllowedLateness)
> > > >>>>         .suppressIntermediateEvents(IntermediateSuppression
> > > >>>>             .emitAfter(maxAllowedLateness)
> > > >>>>             .bufferFullStrategy(bufferFullStrategy)
> > > >>>>         );
> > > >>>> }
> > > >>>>
> > > >>>> Then, here's how it would look to use it:
> > > >>>>
> > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > > >>>>     windowCounts.suppress(
> > > >>>>         Suppression.finalResultsOnly(
> > > >>>>             Duration.ofMinutes(10),
> > > >>>>             Suppression.BufferFullStrategy.SHUT_DOWN
> > > >>>>         )
> > > >>>>     );
> > > >>>>
> > > >>>> Trying to use it on a non-windowed KTable yields:
> > > >>>>
> > > >>>>     Error:(127, 35) java: method finalResultsOnly in class org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to given types;
> > > >>>>       required: java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > >>>>       found: java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > >>>>       reason: explicit type argument java.lang.String does not conform to declared bound(s) org.apache.kafka.streams.kstream.Windowed
> > > >>>>
> > > >>>> =============
> > > >>>> = Downsides =
> > > >>>> =============
> > > >>>>
> > > >>>> Of course, there's a downside either way:
> > > >>>> * for 1: this "wrapper" interaction would be the first in the DSL. Is it too strange, and how discoverable would it be?
> > > >>>> * for 2: adding those type parameters to Suppression will force all callers to provide them in the event of a chained construction, because Java doesn't do RHS recursive type inference. This is already visible in other parts of the Streams DSL. For example, calls to Materialized builders often have to provide seemingly obvious type bounds.
> > > >>>>
> > > >>>> ==============
> > > >>>> = Conclusion =
> > > >>>> ==============
> > > >>>>
> > > >>>> I think option 2 is more "normal" and discoverable. It does have a downside, but it's one that's pre-existing elsewhere in the DSL.
> > > >>>>
> > > >>>> WDYT? Would the addition of this "recipe" method to Suppression resolve your concern?
> > > >>>>
> > > >>>> Thanks again,
> > > >>>> -John
> > > >>>>
> > > >>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi John,
> > > >>>>>
> > > >>>>> Regarding the metrics: yeah, I think I'm with you that records dropped due to window retention or emit-suppression policies should be recorded differently, and using this KIP's proposed metric would be fine.
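John's two options above both rely on a bounded type parameter (`<K extends Windowed>`) so that the compiler rejects non-windowed tables. The stdlib-only sketch below reproduces the mechanism with hypothetical mini-classes named after the thread's types (they are not the `org.apache.kafka.streams` classes, and `Windowed<?>` is used instead of the raw `Windowed`). It also shows the downside John names for option 2: a call used as an argument gets its type arguments from the target type, but the first call in a chain does not, so it needs an explicit type witness.

```java
import java.time.Duration;

// Stdlib-only stand-ins for the types in John's two options. The names mirror the thread, but
// these mini-classes are hypothetical and are not org.apache.kafka.streams types.
final class BoundedGenericsSketch {

    static final class Windowed<K> {
        final K key;
        final long windowStart;

        Windowed(K key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }
    }

    static final class Table<K, V> {
        final String description;

        Table(String description) {
            this.description = description;
        }

        Table<K, V> suppress(Suppression<K, V> suppression) {
            return new Table<>(description + " suppressed(" + suppression.description + ")");
        }
    }

    static final class Suppression<K, V> {
        final String description;

        private Suppression(String description) {
            this.description = description;
        }

        // Option 2: the bound on K rejects non-windowed keys at compile time.
        static <K extends Windowed<?>, V> Suppression<K, V> finalResultsOnly(Duration lateness) {
            return new Suppression<>("finalResultsOnly " + lateness);
        }

        Suppression<K, V> named(String name) {
            return new Suppression<>(description + " as " + name);
        }
    }

    // Option 1: a static wrapper whose parameter type carries the same bound.
    static <K extends Windowed<?>, V> Table<K, V> finalResultsOnly(Table<K, V> windowed, Duration lateness) {
        return windowed.suppress(Suppression.<K, V>finalResultsOnly(lateness));
    }

    public static void main(String[] args) {
        Table<Windowed<Integer>, Long> counts = new Table<>("counts");
        System.out.println(finalResultsOnly(counts, Duration.ofMinutes(10)).description);
        // As an argument, the recipe call takes its type arguments from the target type.
        System.out.println(counts.suppress(Suppression.finalResultsOnly(Duration.ofMinutes(10))).description);
        // As the receiver of a chain it has no target type, so it needs an explicit witness.
        Suppression<Windowed<Integer>, Long> chained =
            Suppression.<Windowed<Integer>, Long>finalResultsOnly(Duration.ofMinutes(10)).named("final");
        System.out.println(chained.description);
        // Neither finalResultsOnly(new Table<String, String>("t"), ...) nor
        // Suppression.<String, String>finalResultsOnly(...) compiles: String is not a Windowed<?>.
    }
}
```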
> > > >>>>> If you also think we can use this KIP's proposed metrics to cover records skipped because of window retention, then we can include those changes in this KIP as well.
> > > >>>>>
> > > >>>>> Regarding the current proposal, I'm actually not too worried about the inconsistency between query semantics and downstream emit semantics. For queries, we will always return the current running results of the windows, whether partial or final depending on the window retention time, which has nothing to do with whether the emitted stream should be one final output per key or not. I also agree that having a unified operation is generally better, since users can focus on leveraging that one operation rather than learning two sets of operations. The only question I had is whether, for final updates of window stores, the configuration combo is a bit awkward to understand. Thinking about this more, I think my root worry is the "suppressLateEvents" call for windowed tables, since from a user's perspective: if my retention time is X, which means "pay the cost to allow late records up to X to still be applied to the tables", why would I ever want to suppressLateEvents by Y (< X), i.e., say "do not send updates that arrive later than Y"? That means the downstream operator or sink topic for this stream would actually see a truncated update stream, while I've paid a larger cost for it. And of course, Y > X would not make sense either, as you would not see any updates later than X anyway.
> > > >>>>> So all in all, my feeling is that "suppressLateEvents" makes less sense for windowed tables with a parameter that is not equal to the window retention, and opening the door to it in the current proposal may confuse people.
> > > >>>>>
> > > >>>>> Again, the above is just a subjective opinion, and we can probably also come up with some scenarios where users do want to set X != Y... but personally I feel that even if the semantics for such a scenario are intuitive for users to understand, does it really make sense, and should we really open the door to it? So I think the benefits of separating the final update into its own API may outweigh the advantage of having one uniform definition. As for my alternative proposal, the rationale came from both my concern about "suppressLateEvents" for windowed stores and Matthias' question about "suppressLateEvents" for non-windowed stores: if it is less meaningful for both, we can consider removing it completely and only do "IntermediateSuppression" in Suppress instead.
> > > >>>>>
> > > >>>>> So I'd summarize my thoughts in the following questions:
> > > >>>>>
> > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X (window retention time) for windowed stores make sense in practice?
> > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for non-windowed stores make sense in practice?
> > > >>>>>
> > > >>>>> Guozhang
> > > >>>>>
> > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Thanks for the explanation, that does make sense.
> > > >>>>>> I have some questions on operations, but I'll just wait for the PR and tests.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> Bill
> > > >>>>>>
> > > >>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Bill,
> > > >>>>>>>
> > > >>>>>>> Thanks for the review!
> > > >>>>>>>
> > > >>>>>>> Your question is very much applicable to the KIP and not at all an implementation detail. Thanks for bringing it up.
> > > >>>>>>>
> > > >>>>>>> I'm proposing not to change the existing caches and configurations at all (for now).
> > > >>>>>>>
> > > >>>>>>> Imagine you have a topology like this:
> > > >>>>>>>
> > > >>>>>>>     commit.interval.ms = 100
> > > >>>>>>>
> > > >>>>>>>     (ktable1 (cached)) -> (suppress emitAfter 200)
> > > >>>>>>>
> > > >>>>>>> The first ktable (ktable1) will respect the commit interval and buffer events for 100ms before logging, storing, or forwarding them (IIRC). Therefore, the second ktable (suppress) will only see the events at a rate of once per 100ms. It will apply its own buffering, and emit once per 200ms. This case is pretty trivial because the suppress time is a multiple of the commit interval.
> > > >>>>>>>
> > > >>>>>>> When it's not an integer multiple, you'll get behavior like in this marble diagram:
> > > >>>>>>>
> > > >>>>>>>     <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> > > >>>>>>>
> > > >>>>>>>     [ KTable caching with commit interval = 2 ]
> > > >>>>>>>
> > > >>>>>>>     <--------(k:2)---------(k:4)---------(k:6)->
> > > >>>>>>>
> > > >>>>>>>     [ suppress with emitAfter = 3 ]
> > > >>>>>>>
> > > >>>>>>>     <---------------(k:2)----------------(k:6)->
> > > >>>>>>>
> > > >>>>>>> If this behavior isn't desired (for example, if you wanted to emit (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or modifying the topology to disable caching. Then, the behavior is determined simply by the suppress operator.
> > > >>>>>>>
> > > >>>>>>> Does that seem right to you?
> > > >>>>>>>
> > > >>>>>>> Regarding the changelogs, because the suppression operator hangs onto events for a while, it will need its own changelog. The changelog should represent the current state of the buffer at all times. So when the suppress operator sees (k:2), for example, it will log (k:2). When it later gets to time 3, it's time to emit (k:2) downstream. Because k is no longer buffered, the suppress operator will log (k:null). Thus, when recovering, it can rebuild the buffer by reading its changelog.
> > > >>>>>>>
> > > >>>>>>> What do you think about this?
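John's marble diagram and changelog description can be replayed with a small stdlib-only simulation. The timing rules are assumptions chosen to match the diagram, not taken from Kafka Streams code: input (k:t) arrives at tick t, the cache forwards its newest value at each multiple of the commit interval, and suppression flushes its buffer at each multiple of `emitAfter`. The buffer's changelog records every buffered value plus a `k:null` tombstone on emit, so replaying it rebuilds the buffer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only replay of the marble diagram. The tick-based timing rules are assumptions
// chosen to match the diagram; MarbleSketch is not a Kafka Streams class.
final class MarbleSketch {
    final List<String> cacheOut = new ArrayList<>();    // what the cached KTable forwards
    final List<String> suppressOut = new ArrayList<>(); // what suppression emits downstream
    final List<String> changelog = new ArrayList<>();   // the suppression buffer's changelog

    void run(int ticks, int commitInterval, int emitAfter) {
        int latest = 0; // the cache keeps only the newest value of key k
        Map<String, Integer> buffer = new LinkedHashMap<>();
        for (int t = 1; t <= ticks; t++) {
            latest = t; // input record (k:t) arrives and overwrites the cached value
            if (t % commitInterval == 0) { // commit: the cache forwards k downstream
                cacheOut.add("t" + t + " k:" + latest);
                buffer.put("k", latest);
                changelog.add("k:" + latest); // log the value the buffer now holds
            }
            if (t % emitAfter == 0) { // suppression flushes its buffer
                for (Map.Entry<String, Integer> e : buffer.entrySet()) {
                    suppressOut.add("t" + t + " " + e.getKey() + ":" + e.getValue());
                    changelog.add(e.getKey() + ":null"); // tombstone: no longer buffered
                }
                buffer.clear();
            }
        }
    }

    /** Rebuilds the buffer by replaying (a prefix of) its changelog, as recovery would. */
    static Map<String, Integer> restore(List<String> log) {
        Map<String, Integer> buffer = new LinkedHashMap<>();
        for (String entry : log) {
            String[] kv = entry.split(":", 2);
            if (kv[1].equals("null")) {
                buffer.remove(kv[0]);
            } else {
                buffer.put(kv[0], Integer.parseInt(kv[1]));
            }
        }
        return buffer;
    }
}
```

`run(6, 2, 3)` reproduces the diagram (suppression emits k:2 at tick 3 and k:6 at tick 6), and restoring the first four changelog entries yields `{k=6}`, the buffer state just before the final flush. A commit interval of 1, standing in for disabling the cache, makes suppression emit k:3 at tick 3 instead.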
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> -John
> > > >>>>>>>
> > > >>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi John, thanks for the KIP.
> > > >>>>>>>>
> > > >>>>>>>> Early on in the KIP, you mention the current approaches for controlling the rate of downstream records from a KTable: cache size configuration and commit time.
> > > >>>>>>>>
> > > >>>>>>>> Will these configuration parameters still be in effect for tables that don't use suppression? For tables taking advantage of suppression, will these configurations have no impact?
> > > >>>>>>>> This last question may be too implementation-specific, but if the requested suppression time is longer than the specified commit time, will the latest record in the suppression buffer get stored in a changelog?
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Bill
> > > >>>>>>>>
> > > >>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Thanks for the feedback, Matthias,
> > > >>>>>>>>>
> > > >>>>>>>>> It seems like in straightforward relational processing cases, it would not make sense to bound the lateness of KTables. In general, it seems better to have "guard rails" in place that make it easier to write sensible programs than insensible ones.
> > > >>>>>>>>>
> > > >>>>>>>>> But I'm still going to argue in favor of keeping it for all KTables ;)
> > > >>>>>>>>>
> > > >>>>>>>>> 1. I believe it is simpler to understand the operator if it has one uniform definition, regardless of context. It's well defined and intuitive what will happen when you use late-event suppression on a KTable, so I think nothing surprising or dangerous will happen in that case. From my perspective, having two sets of allowed operations is actually an increase in cognitive complexity.
> > > >>>>>>>>>
> > > >>>>>>>>> 2. To me, it's not crazy to use the operator this way. For example, in lieu of full-featured timestamp semantics, I can implement MVCC behavior when building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that there are other, non-obvious applications of suppressing late events on KTables.
> > > >>>>>>>>>
> > > >>>>>>>>> 3. Not to get too much into implementation details in a KIP discussion, but if we did want to make late-event suppression available only on windowed KTables, we have two enforcement options:
> > > >>>>>>>>>   a. check when we build the topology - this would be simple to implement, but would be a runtime check. Hopefully, people write tests for their topology before deploying it, so the feedback loop isn't instantaneous, but it's not too long either.
> > > >>>>>>>>>   b. add a new WindowedKTable type - this would be a compile-time check, but would also be a substantial increase in both interface and code complexity.
> > > >>>>>>>>>
> > > >>>>>>>>> We should definitely strive to have guard rails protecting against surprising or dangerous behavior. Protecting against programs that we don't currently predict is a lesser benefit, and I think we can put up guard rails on a case-by-case basis for that. The increase in cognitive (and potentially code and interface) complexity makes me think we should skip this case.
> > > >>>>>>>>>
> > > >>>>>>>>> What do you think?
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>> -John
> > > >>>>>>>>>
> > > >>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Thanks for the KIP, John.
> > > >>>>>>>>>>
> > > >>>>>>>>>> One initial comment about the last example, "Bounded lateness": for a non-windowed KTable, bounding the lateness does not really make sense, does it?
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thus, I am wondering whether we should allow `suppressLateEvents()` for this case. It seems better to only allow it for windowed KTables.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
> > > >>>>>>>>>>> I noticed this (lack of primary parameter) as well.
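Enforcement option (a) from John's reply above, a check when the topology is built, can be sketched as follows. `TopologySketch` and its methods are hypothetical; the point is only that the error surfaces from `build()` (for example in a unit test) rather than from the compiler, as option (b) would provide.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of enforcement option (a): validate when the topology is built instead
// of through the type system. TopologySketch and its methods are invented for illustration.
final class TopologySketch {
    private final List<String> nodes = new ArrayList<>();
    private final List<String> errors = new ArrayList<>();

    final class TableNode {
        final String name;
        final boolean windowed; // whether the table's keys are windowed

        private TableNode(String name, boolean windowed) {
            this.name = name;
            this.windowed = windowed;
            nodes.add(name);
        }

        TableNode suppressLateEvents(Duration maxLateness) {
            if (!windowed) {
                errors.add(name + ": suppressLateEvents(" + maxLateness + ") needs a windowed table");
            }
            return new TableNode(name + "-suppressed", windowed);
        }
    }

    TableNode table(String name, boolean windowed) {
        return new TableNode(name, windowed);
    }

    /** The check runs here, so a misuse is reported when building, not when compiling. */
    List<String> build() {
        if (!errors.isEmpty()) {
            throw new IllegalStateException(String.join("; ", errors));
        }
        return new ArrayList<>(nodes);
    }
}
```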
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> What you gave as a new example is semantically the same as what I suggested, so it is good by me.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks for taking a look, Ted,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I agree this is a departure from the conventions of the Streams DSL.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Most of our config objects have one or two "required" parameters, which fit naturally with the static factory method approach. TimeWindows, for example, requires a size parameter, so we can naturally say TimeWindows.of(size).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I think in the case of a suppression, there's really no "core" parameter, and "Suppression.of()" seems sillier than "new Suppression()". I think that Suppression.of(duration) would be ambiguous, since there are many durations that we can configure.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> However, thinking about it again, I suppose that I can give each configuration method a static version, which would let you replace "new Suppression()." with "Suppression." in all the examples. Basically, instead of "of()", we'd support any of the methods I listed.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> For example:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> windowCounts
> > > >>>>>>>>>>>>     .suppress(
> > > >>>>>>>>>>>>         Suppression
> > > >>>>>>>>>>>>             .suppressLateEvents(Duration.ofMinutes(10))
> > > >>>>>>>>>>>>             .suppressIntermediateEvents(
> > > >>>>>>>>>>>>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > >>>>>>>>>>>>             )
> > > >>>>>>>>>>>>     );
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Does that seem better?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>> -John
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> I started to read this KIP, which contains a lot of material.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> One suggestion:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> .suppress(
> > > >>>>>>>>>>>>>     new Suppression()
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Do you think it would be more consistent with the rest of the Streams data structures to support `of`?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Suppression.of(Duration.ofMinutes(10))
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Cheers
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hello devs and users,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Please take some time to consider this proposal for Kafka Streams:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> KIP-328: Ability to suppress updates for KTables
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> The basic idea is to provide:
> > > >>>>>>>>>>>>>> * more usable control over the update rate (vs. the current state store caches)
> > > >>>>>>>>>>>>>> * the final-result-for-windowed-computations feature, which several people have requested
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I look forward to your feedback!
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>> -John

--
-- Guozhang
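John's idea (June 27, 7:31 AM) of giving each configuration method a static version, so that a chain can start with `Suppression.` instead of `new Suppression().`, runs into a Java rule: a class cannot declare a static and an instance method with override-equivalent signatures (JLS §8.4.2). The hypothetical sketch below shows one way around it, in which static entry points and chainable instance methods use different names; those names are assumptions, not the KIP's API.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical immutable config with static entry points in place of `new Suppression()`.
// Java forbids a static and an instance method with the same signature in one class, so the
// static entry points and chainable instance methods here have different (assumed) names.
final class SuppressionSketch {
    private final Duration lateEvents;         // null means "not configured"
    private final Duration intermediateEvents; // null means "not configured"

    private SuppressionSketch(Duration lateEvents, Duration intermediateEvents) {
        this.lateEvents = lateEvents;
        this.intermediateEvents = intermediateEvents;
    }

    // Static entry points: a chain can start from either setting.
    static SuppressionSketch suppressLateEvents(Duration d) {
        return new SuppressionSketch(d, null);
    }

    static SuppressionSketch suppressIntermediateEvents(Duration d) {
        return new SuppressionSketch(null, d);
    }

    // Chainable instance methods return a modified copy; the receiver is left unchanged.
    SuppressionSketch andLateEvents(Duration d) {
        return new SuppressionSketch(d, intermediateEvents);
    }

    SuppressionSketch andIntermediateEvents(Duration d) {
        return new SuppressionSketch(lateEvents, d);
    }

    Optional<Duration> lateEvents() {
        return Optional.ofNullable(lateEvents);
    }

    Optional<Duration> intermediateEvents() {
        return Optional.ofNullable(intermediateEvents);
    }
}
```

Either entry point yields the same configuration, so `SuppressionSketch.suppressLateEvents(d).andIntermediateEvents(d)` and `SuppressionSketch.suppressIntermediateEvents(d).andLateEvents(d)` are interchangeable.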