That is a good point. I cannot think of a better option than documentation plus a warning, especially since we should probably not reuse the function name `until` for the close time.
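John proposes a fallback in the message quoted below: use the retention time when close is not set, and log a warning when "final results" are requested without an explicit close. A minimal sketch of that fallback follows. `CloseTimeDefaults` and `effectiveClose` are hypothetical names, not Kafka Streams API, and the sketch assumes close and retention are both plain `Duration`s.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical sketch: CloseTimeDefaults and effectiveClose are illustrative names,
// not Kafka Streams API.
class CloseTimeDefaults {
    private static final Logger LOG = Logger.getLogger(CloseTimeDefaults.class.getName());

    // The default retention ("until") described in the thread: 24 hours.
    static final Duration DEFAULT_RETENTION = Duration.ofHours(24);

    // Returns the close time to use for a window. An explicit close wins; otherwise fall
    // back to the (explicit or default) retention time, which matches today's behavior.
    static Duration effectiveClose(Optional<Duration> explicitClose,
                                   Optional<Duration> explicitRetention,
                                   boolean finalResultsRequested) {
        if (explicitClose.isPresent()) {
            return explicitClose.get();
        }
        Duration fallback = explicitRetention.orElse(DEFAULT_RETENTION);
        if (finalResultsRequested) {
            // Final results are held back until the window closes, so an implicit
            // 24h close can look like "no output at all".
            LOG.warning("Window close not set; final results will be delayed by the retention time ("
                    + fallback + "). Set close explicitly to emit results sooner.");
        }
        return fallback;
    }

    public static void main(String[] args) {
        System.out.println(effectiveClose(Optional.empty(), Optional.empty(), true));                 // PT24H (and a warning)
        System.out.println(effectiveClose(Optional.empty(), Optional.of(Duration.ofDays(7)), false)); // PT168H
        System.out.println(effectiveClose(Optional.of(Duration.ZERO), Optional.empty(), true));       // PT0S
    }
}
```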
Guozhang

On Tue, Jul 10, 2018 at 3:31 PM, John Roesler <j...@confluent.io> wrote:

I had some opportunity to reflect on the default for close time today...

Note that the current "close time" is equal to the retention time, and therefore "close" today shares the default retention of 24h.

Defaulting close to anything shorter than the retention time would definitely break any application that specifies a retention time today. It's also likely to break apps that *don't* set the retention time and rely on the 24h default. So it's unfortunate, but I think that if "close" isn't set, we should use the retention time instead of a fixed default.

When we ultimately remove the retention time parameter ("until"), we will have to set "close" to a default of 24h.

Of course, this has a negative impact on users of "final results", since they won't see any output at all until the retention time (24h by default) has passed, and they may find this confusing. What can we do about this except document it well? Maybe log a warning if we see that close wasn't explicitly set while using "final results"?

Thanks,
-John

On Tue, Jul 10, 2018 at 10:46 AM John Roesler <j...@confluent.io> wrote:

Hi Guozhang,

That sounds good to me. I'll include that in the KIP.

Thanks,
-John

On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

Let me clarify a bit what I meant about moving `retentionPeriod` to WindowStoreBuilder:

In another discussion we had around KIP-319 / KIP-330, we noted that the "retention period" should not really be part of the window spec, but only of the window store spec, as it only affects how long each window is retained for querying, along with the storage cost.

More specifically, today the "maintainMs" returned from Windows is used in three places:

1) For windowed aggregations, it is passed directly into `Stores.persistentWindows()` as the retention period parameter.
For this use case, we should just let the WindowStoreBuilder specify this value itself.

NOTE: It is also used in the KStreamWindowAggregate processor to determine whether a received record should be dropped due to its lateness. We may need to think of another way to get this value inside the processor.

2) For windowed stream-stream joins, it is used as the join range parameter, but only to check that "windowSizeMs <= retentionPeriodMs". We can do this check at the store builder level instead of at the processor level.

If we can remove its usage in both 1) and 2), then we should be able to safely remove it from the `Windows` spec.

Guozhang

On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io> wrote:

Thanks for the reply, Guozhang,

Good! I agree, that is also a good reason, and I actually made use of that in my tests. I'll update the KIP.

By the way, I chose "allowedLateness" as I was trying to pick a better name than "close", but I think it's actually the wrong name. We don't want to bound the lateness of events in general, only with respect to the end of their window.

If we have a window [0,10) with an "allowedLateness" of 5, and we get an event with timestamp 3 at stream time 9, the name implies we'd reject it, which seems silly. Really, we'd only want to start rejecting that event at stream time 15.

What I meant was more like "allowedLatenessAfterWindowEnd", but that's too verbose. I think that "close" plus some documentation about what it means will be better.

1: "Close" would be measured from the end of the window, so a reasonable default would be "0".
Recall that "close" really only needs to be specified for final results, and a default of 0 would produce the most intuitive results. If folks later discover that they are missing some late events, they can adjust the parameter accordingly. IMHO, any other value would just be a guess on our part.

2a: I think you're saying to re-use "until" instead of adding "close" to the window.

The downside here is that the semantic change could be more confusing than deprecating "until" and introducing a window "close" and a "retentionTime" on the store builder. Deprecation is a good, controlled way for us to make sure people are getting the semantics they think they're getting, and it also gives us an opportunity to point people to the API they should use instead.

I didn't fully understand the second part, but it sounds like you're suggesting adding a new "retentionTime" setter to Windows to bridge the gap until we add it to the store builder? That seems kind of roundabout to me, if that's what you meant. We could just immediately add it to the store builders in the same PR.

2b: Sounds good to me!

Thanks again,
-John

On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com> wrote:

John,

Thanks for your replies. As for the two API options, I'm also slightly inclined toward the first option. My motivation is a bit different: I think the first one may be more flexible, for example:

    KTable<Windowed<..>> table = ... count();

    table.toStream().peek(..); // want to peek at the changelog stream, do not care about final results.
    table.suppress().toStream().to("topic"); // sending to a topic, want to only send the final results.

--------------

Besides that, I have a few more minor questions:

1. For "allowedLateness", what should the default value be? I.e., if the user does not specify "allowedLateness" in TimeWindows, what value should we set?

2. For API names, some personal suggestions:

2.a) "allowedLateness" -> "until" (semantics changed, and the value is defined as a delta on top of the window length), where the old "until" -> "retentionPeriod", and the latter will be moved from `Windows` to `WindowStoreBuilder` in the future.

2.b) "BufferConfig" -> "Buffered"?

Guozhang

On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:

Hey Matthias and Guozhang,

Sorry for the slow reply. I was mulling over your feedback and weighing some ideas in a sketchbook PR: https://github.com/apache/kafka/pull/5337.

Your thought about keeping suppression independent of business logic is a very good one. I agree that it would make more sense to add some kind of "window close" concept to the window definition.

In fact, doing that immediately solves the inconsistency problem Guozhang brought up. There's no need to add a "final results" or "emission" option to the windowed aggregation.
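A "window close" measured from the window end (as John explains in his later 3:53 PM message, above) behaves differently from a bound on each record's lateness. The hypothetical sketch below contrasts the two readings by replaying his example: window [0,10), a bound of 5, and an event with timestamp 3. The class and method names are illustrative only, and timestamps and stream time are plain `long`s.

```java
// Hypothetical sketch contrasting two readings of "allowed lateness"; not Kafka Streams API.
class WindowCloseCheck {

    // Naive reading: reject any record that is more than allowedLateness behind stream time.
    static boolean rejectedByRecordLateness(long recordTs, long streamTime, long allowedLateness) {
        return streamTime - recordTs > allowedLateness;
    }

    // Reading proposed in the thread: close is measured from the window end, so a record
    // for window [start, end) is rejected only once stream time reaches end + close.
    static boolean rejectedByWindowClose(long windowEnd, long streamTime, long close) {
        return streamTime >= windowEnd + close;
    }

    public static void main(String[] args) {
        long windowEnd = 10; // window [0, 10)
        long close = 5;
        long recordTs = 3;
        // At stream time 9 the naive reading rejects (9 - 3 = 6 > 5); the window-close reading accepts.
        System.out.println(rejectedByRecordLateness(recordTs, 9, close)); // true
        System.out.println(rejectedByWindowClose(windowEnd, 9, close));   // false
        // The window closes at stream time 10 + 5 = 15.
        System.out.println(rejectedByWindowClose(windowEnd, 14, close));  // false
        System.out.println(rejectedByWindowClose(windowEnd, 15, close));  // true
    }
}
```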
What do you think about an API more like this:

    final StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
        .groupBy(
            (String k1, String v1) -> k1,
            Serialized.with(STRING_SERDE, STRING_SERDE)
        )
        .windowedBy(TimeWindows
            .of(scaledTime(2L))
            .until(scaledTime(3L))
            .allowedLateness(scaledTime(1L))
        )
        .count(Materialized.as("counts"))
        .suppress(
            emitFinalResultsOnly(
                BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
            )
        )
        .toStream()
        .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));

Note that:
* "emitFinalResultsOnly" is available *only* on windowed tables (enforced by the type system at compile time), and it determines the time to wait by looking at "allowedLateness" on the TimeWindows config.
* querying "counts" will produce results (eventually) consistent with what's observable in "output-suppressed".
* in all cases, "suppress" has no effect on business logic, just on event suppression.

Is this API straightforward? Or do you still prefer the version that you both proposed:

    ...
    .windowedBy(TimeWindows
        .of(scaledTime(2L))
        .until(scaledTime(3L))
        .allowedLateness(scaledTime(1L))
    )
    .count(
        Materialized.as("counts"),
        emitFinalResultsOnly(
            BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
        )
    )
    ...
To me, these two are practically identical, and I still vaguely prefer the first one.

The prototype has made it clearer to me that users of "final results for windows" and users of "suppression for table events" both need to configure the suppression buffer.

This buffer configuration consists of:
1. how many keys or bytes to keep in memory
2. what to do if memory runs out (shut down, start using disk, ...)

So it's not as simple as setting a "final results" flag. We'll either have an "Emit" config object on the windowed aggregators that takes the same BufferConfig as the "Suppress" config on the suppression operator, or we just use the suppression operator for both.

Perhaps it would sweeten the deal a little to point out that we already have 2 overloads for each windowed aggregator (with and without Materialized). Adding "Emitted" or something would mean adding a new overload for each of those, taking us up to 4 overloads each for "count", "aggregate", and "reduce". Using "suppress" means that we don't add any new overloads.
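The two buffer settings listed above (a memory bound, and what to do when it is exceeded) can be modeled in a small hypothetical sketch. `BufferConfig`, `withBufferKeys`, `bufferFullStrategy`, and `SHUT_DOWN` mirror names used in the thread, but this is a stand-alone model, not the Kafka implementation; only the key-count limit and the shut-down strategy are modeled (no byte limit and no spilling to disk).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone model: BufferConfig, withBufferKeys, bufferFullStrategy and
// SHUT_DOWN mirror names from the thread, but this is not the Kafka Streams implementation.
class SuppressionBufferSketch {

    // Only SHUT_DOWN is modeled; a "start using disk" strategy is left out of this sketch.
    enum BufferFullStrategy { SHUT_DOWN }

    static final class BufferConfig {
        final long maxKeys;
        final BufferFullStrategy strategy;

        private BufferConfig(long maxKeys, BufferFullStrategy strategy) {
            this.maxKeys = maxKeys;
            this.strategy = strategy;
        }

        static BufferConfig withBufferKeys(long maxKeys) {
            return new BufferConfig(maxKeys, BufferFullStrategy.SHUT_DOWN);
        }

        BufferConfig bufferFullStrategy(BufferFullStrategy strategy) {
            return new BufferConfig(maxKeys, strategy);
        }
    }

    // Holds only the latest value per key until it is released downstream.
    static final class SuppressionBuffer<K, V> {
        private final Map<K, V> latestByKey = new LinkedHashMap<>();
        private final BufferConfig config;

        SuppressionBuffer(BufferConfig config) {
            this.config = config;
        }

        void put(K key, V value) {
            boolean newKey = !latestByKey.containsKey(key);
            if (newKey && latestByKey.size() >= config.maxKeys
                    && config.strategy == BufferFullStrategy.SHUT_DOWN) {
                // Fail loudly rather than emit a result that might not be final.
                throw new IllegalStateException(
                        "Suppression buffer is full (" + config.maxKeys + " keys); shutting down");
            }
            latestByKey.put(key, value); // an update to a buffered key replaces the intermediate value
        }

        V latest(K key) {
            return latestByKey.get(key);
        }

        int size() {
            return latestByKey.size();
        }
    }

    public static void main(String[] args) {
        SuppressionBuffer<String, Long> buffer = new SuppressionBuffer<>(
                BufferConfig.withBufferKeys(2L).bufferFullStrategy(BufferFullStrategy.SHUT_DOWN));
        buffer.put("a", 1L);
        buffer.put("a", 2L); // same key: replaces the value, the buffer does not grow
        buffer.put("b", 1L);
        System.out.println(buffer.size() + " " + buffer.latest("a")); // 2 2
        try {
            buffer.put("c", 1L); // a third distinct key exceeds the limit
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Counting keys keeps the sketch short; a byte-based bound would need a serializer to size each entry.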
Thanks again for helping to hash this out,
-John

On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think I agree with Matthias about having dedicated APIs for the windowed-operation final-output scenario, PLUS separating the window close, which the "final output" would rely on, from the window retention time itself (admittedly this would make this KIP effort larger, but if we believe we need to do this separation anyway, we could just do it now).

And then we can have `KTable#suppress()` for intermediate suppression only, not for late-record suppression, until we see it become a common feature request, since our current design still allows it to be extended for that purpose.

Guozhang

On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the discussion. I am just catching up.

In general, I think we have different use cases, and the non-windowed and windowed cases are quite different. For the non-windowed case, suppress() has no (useful) close or retention time, no final semantics, and also no business logic impact.

On the other hand, for windowed aggregations, close time and final result do have a meaning. IMHO, `close()` is part of business logic while retention time is not.
Also, suppression of intermediate results is not a business rule, and there might be use cases in which only "early" intermediates (before window end time) are suppressed, or all intermediates are suppressed (or maybe something in between, i.e., just reducing the load of intermediate updates). Thus, window suppression is much richer.

IMHO, a generic `suppress()` operator that can be inserted into the data flow at any point is useful. Maybe we should keep it as generic as possible. However, it might be difficult to use with regard to windowing, as the mental effort to use it is high.

With regard to Guozhang's comment:

> we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyways.

If we use `suppress()` as a standalone operator, this is correct and intended IMHO. To address the issue if the behavior is unwanted, I would suggest adding a "suppress option" directly to the `count()/reduce()/aggregate()` window operators, similar to `Materialized`. This would be an "embedded suppress" and would avoid the issue. It would also address the issue about mental effort for the "single final window result" use case.

I also think that a close time shorter than the retention time is useful for window aggregation. If we add close() to the window definition and until() to `Materialized`, we can separate the two correctly IMHO.
About setting `close = min(close, retention)`, I am not sure. We might rather throw an exception than reduce the close time automatically. Otherwise, I foresee many user questions like "I set close to X, but my window does not get updated for some data with a delay of X".

The tricky question might be how to design the API in a backward-compatible way, though.

-Matthias

On 7/3/18 5:38 AM, John Roesler wrote:

Hi Guozhang,

I see. It seems like if we want to decouple 1) and 2), we need to alter the definition of the window. Do you think it would close the gap if we added a "window close" time to the window definition?

Such as:

    builder.stream("input")
        .groupByKey()
        .windowedBy(
            TimeWindows
                .of(60_000)
                .closeAfter(10 * 60)
                .until(30L * 24 * 60 * 60 * 1000)
        )
        .count()
        .suppress(Suppression.finalResultsOnly());

Possibly called "finalResultsAtWindowClose" or something?
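Guozhang suggests checking window size against retention at the store-builder level. Combining that with Matthias's preference for throwing an exception over silently clamping `close = min(close, retention)` could look roughly like the hypothetical sketch below. Requiring retention >= size + close is this sketch's own generalization of the existing `windowSizeMs <= retentionPeriodMs` check, and it assumes close is measured from the window end; `WindowSpec` and `validateRetention` are illustrative names.

```java
import java.time.Duration;

// Hypothetical sketch: WindowSpec and validateRetention are illustrative names, not
// Kafka Streams API. Assumes close is measured from the window end, as proposed in the thread.
class WindowStoreValidation {

    static final class WindowSpec {
        final Duration size;
        final Duration close; // how long after the window end late records are still accepted

        WindowSpec(Duration size, Duration close) {
            this.size = size;
            this.close = close;
        }
    }

    // Runs when the window store is built rather than inside the processor. A window must be
    // retained at least until it closes, i.e. for size + close; otherwise fail fast instead
    // of silently shortening close to fit the retention.
    static void validateRetention(WindowSpec window, Duration retention) {
        Duration required = window.size.plus(window.close);
        if (retention.compareTo(required) < 0) {
            throw new IllegalArgumentException("Retention " + retention
                    + " is shorter than window size + close (" + required + ")");
        }
    }

    public static void main(String[] args) {
        WindowSpec oneMinuteWindows = new WindowSpec(Duration.ofMinutes(1), Duration.ofMinutes(10));
        validateRetention(oneMinuteWindows, Duration.ofDays(30)); // fine: queryable long after close
        try {
            validateRetention(oneMinuteWindows, Duration.ofMinutes(5));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Retention PT5M is shorter than window size + close (PT11M)
        }
    }
}
```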
Thanks,
-John

On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hey John,

Obviously I'm far less diligent about replying to email than you :) I will try to reply to your emails separately:

------------------------------------------------------------

To reply to your email on "Mon, Jul 2, 2018 at 8:23 AM":

I'm aware of this use case, but again, the concern is that in this setting, in order to let the window be queryable for 30 days, we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyways. Personally, I think that for the final update scenario, the ideal situation users would want is: "do not process any data that is more than 5 minutes late, and of course do not send any update records downstream later than 5 minutes either; but retain the window so it is queryable for 30 days". By doing that, the final window snapshot would also be aligned with the update stream. In other words, among these three periods:

1) the retention length of the window / table.
2) the late-record acceptance period for updating the window.
3) the late-record update period for sending updates downstream.
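The three periods listed above can be made concrete with a hypothetical model, using Guozhang's numbers (30 days of retention, a 5-minute bound). The class and method names are illustrative only. `divergence()` measures how much lateness can still update the store without the update ever reaching downstream, which is the misalignment between the queried window and the update stream that he describes.

```java
import java.time.Duration;

// Hypothetical model of the three periods in the thread; names are illustrative, not Kafka API.
class LatenessPeriods {
    final Duration retention;  // 1) how long the window stays queryable
    final Duration acceptance; // 2) how late a record may be and still update the window
    final Duration emission;   // 3) how late an update may still be sent downstream

    LatenessPeriods(Duration retention, Duration acceptance, Duration emission) {
        this.retention = retention;
        this.acceptance = acceptance;
        this.emission = emission;
    }

    // The coupling described in the thread: 1) = 2), while 3) may be shorter.
    static LatenessPeriods coupled(Duration retention, Duration emission) {
        return new LatenessPeriods(retention, retention, emission);
    }

    // What final-result users want: 2) = 3), with 1) free to be larger.
    static LatenessPeriods finalResults(Duration retention, Duration close) {
        return new LatenessPeriods(retention, close, close);
    }

    // Span of lateness during which a record still changes the store but the change is
    // never sent downstream, so the queried window and the update stream disagree.
    Duration divergence() {
        return acceptance.compareTo(emission) > 0 ? acceptance.minus(emission) : Duration.ZERO;
    }

    public static void main(String[] args) {
        Duration thirtyDays = Duration.ofDays(30);
        Duration fiveMinutes = Duration.ofMinutes(5);
        System.out.println(coupled(thirtyDays, fiveMinutes).divergence());      // PT719H55M
        System.out.println(finalResults(thirtyDays, fiveMinutes).divergence()); // PT0S
    }
}
```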
Final update use cases would naturally want 2) = 3), while 1) may be different and larger. What we provide now is 1) = 2), which could be different from, and in practice larger than, 3), hence not the most intuitive for their needs.

------------------------------------------------------------

To reply to your email on "Mon, Jul 2, 2018 at 10:27 AM":

I also like option 2) better than option 1) from a programming point of view. But I'm wondering whether option 2) would provide the above semantics, or whether it still couples 1) with 2) as well?

Guozhang

On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:

In fact, to push the idea further (which IIRC is what Matthias originally proposed), if we can accept "Suppression#finalResultsOnly" from my last email, then we could also consider whether to eliminate "suppressLateEvents" entirely.

We could always add it later, but you've both expressed doubt that there are practical use cases for it outside of final results.
-John

On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:

Hi again, Guozhang ;) Here's the second part of my response...

It seems like your main concern is: "if I'm a user who wants final update semantics, how complicated is it for me to get it?"

I think we have to assume that people don't always have time to become deeply familiar with all the nuances of a programming environment before they use it. Especially if they're evaluating several frameworks for their use case, it's very valuable to make it as obvious as possible how to accomplish various computations with Streams.

To me, the biggest question is whether, with a fresh perspective, people would say "oh, I get it, I have to bound my lateness and suppress intermediate updates, and of course I'll get only the final result!", or if it's more like "wtf? all I want is the final result, what are all these parameters?".

I was talking with Matthias a while back, and he had an idea that I think can help, which is essentially to set up a final-result recipe in addition to the raw parameters.
I previously thought that it wouldn't be possible to restrict its usage to windowed KTables, but thinking about it again this weekend, I have a couple of ideas:

======================
= 1. Static Wrapper =
======================

We can define an extra static function that "wraps" a KTable with final-result semantics.

    public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
        final KTable<K, V> windowedKTable,
        final Duration maxAllowedLateness,
        final Suppression.BufferFullStrategy bufferFullStrategy) {
        return windowedKTable.suppress(
            Suppression.suppressLateEvents(maxAllowedLateness)
                .suppressIntermediateEvents(
                    IntermediateSuppression
                        .emitAfter(maxAllowedLateness)
                        .bufferFullStrategy(bufferFullStrategy)
                )
        );
    }

Because windowedKTable is a parameter, the static function can easily impose an extra bound on the key type, namely that it extends Windowed. This would make "final results only" available only on windowed KTables.

Here's how it would look to use:

    final KTable<Windowed<Integer>, Long> windowCounts = ...
    final KTable<Windowed<Integer>, Long> finalCounts =
        finalResultsOnly(
            windowCounts,
            Duration.ofMinutes(10),
            Suppression.BufferFullStrategy.SHUT_DOWN
        );

Trying to use it on a non-windowed KTable yields:

    Error:(129, 35) java: method finalResultsOnly in class
    org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be applied to given types;
      required: org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
      found: org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
      reason: inference variable K has incompatible bounds
        equality constraints: java.lang.String
        upper bounds: org.apache.kafka.streams.kstream.Windowed

=================================================================
= 2. Add <K,V> parameters and recipe method to Suppression =
=================================================================

By adding K,V parameters to Suppression, we can provide a similarly bounded config method directly on the Suppression class:

    public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
        final Duration maxAllowedLateness,
        final BufferFullStrategy bufferFullStrategy) {
        return Suppression
            .<K, V>suppressLateEvents(maxAllowedLateness)
            .suppressIntermediateEvents(
                IntermediateSuppression
                    .emitAfter(maxAllowedLateness)
                    .bufferFullStrategy(bufferFullStrategy)
            );
    }

Then, here's how it would look to use it:

    final KTable<Windowed<Integer>, Long> windowCounts = ...
    final KTable<Windowed<Integer>, Long> finalCounts =
        windowCounts.suppress(
            Suppression.finalResultsOnly(
                Duration.ofMinutes(10),
                Suppression.BufferFullStrategy.SHUT_DOWN
            )
        );

Trying to use it on a non-windowed KTable yields:

    Error:(127, 35) java: method finalResultsOnly in class
    org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to given types;
      required: java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
      found: java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
      reason: explicit type argument java.lang.String does not conform to declared bound(s) org.apache.kafka.streams.kstream.Windowed

=============
= Downsides =
=============

Of course, there's a downside either way:
* for 1: this "wrapper" interaction would be the first in the DSL. Is it too strange, and how discoverable would it be?
* for 2: adding those type parameters to Suppression will force all callers to provide them in the event of a chained construction, because Java doesn't do RHS recursive type inference. This is already visible in other parts of the Streams DSL. For example, calls to Materialized builders often have to provide seemingly obvious type bounds.

==============
= Conclusion =
==============

I think option 2 is more "normal" and discoverable. It does have a downside, but it's one that already exists elsewhere in the DSL.

WDYT? Would the addition of this "recipe" method to Suppression resolve your concern?
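Both options rely on the same trick: a type parameter bounded by `Windowed`. A dependency-free sketch of it follows. The `Windowed`, `KTable`, and `Suppression` classes here are minimal hypothetical stand-ins, not the Kafka Streams types, and they carry just enough state to show that both forms compile for windowed keys only.

```java
import java.time.Duration;

// Hypothetical, dependency-free stand-ins for the types in John's email. They are NOT the
// Kafka Streams classes; they only show how a bounded type parameter limits the
// "final results" recipe to tables with Windowed keys.
class FinalResultsRecipeSketch {

    static final class Windowed<K> {
        final K key;
        final long start;
        final long end;

        Windowed(K key, long start, long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }
    }

    enum BufferFullStrategy { SHUT_DOWN }

    static final class Suppression<K, V> {
        final Duration maxAllowedLateness;
        final BufferFullStrategy bufferFullStrategy;

        private Suppression(Duration maxAllowedLateness, BufferFullStrategy bufferFullStrategy) {
            this.maxAllowedLateness = maxAllowedLateness;
            this.bufferFullStrategy = bufferFullStrategy;
        }

        // Option 2: the recipe lives on Suppression. K is inferred from the call site
        // (e.g. the KTable passed to suppress) and must satisfy K extends Windowed<?>.
        static <K extends Windowed<?>, V> Suppression<K, V> finalResultsOnly(
                Duration maxAllowedLateness, BufferFullStrategy bufferFullStrategy) {
            return new Suppression<>(maxAllowedLateness, bufferFullStrategy);
        }
    }

    static final class KTable<K, V> {
        final String description;

        KTable(String description) {
            this.description = description;
        }

        KTable<K, V> suppress(Suppression<K, V> suppression) {
            return new KTable<>(description + " -> finalResultsOnly(" + suppression.maxAllowedLateness + ")");
        }
    }

    // Option 1: a static wrapper; the bound sits on the wrapped table's key type.
    static <K extends Windowed<?>, V> KTable<K, V> finalResultsOnly(
            KTable<K, V> windowedTable, Duration maxAllowedLateness, BufferFullStrategy strategy) {
        return windowedTable.suppress(Suppression.finalResultsOnly(maxAllowedLateness, strategy));
    }

    public static void main(String[] args) {
        KTable<Windowed<Integer>, Long> windowCounts = new KTable<>("windowCounts");

        // Option 2: reads like any other DSL config object.
        KTable<Windowed<Integer>, Long> viaRecipe = windowCounts.suppress(
                Suppression.finalResultsOnly(Duration.ofMinutes(10), BufferFullStrategy.SHUT_DOWN));

        // Option 1: the wrapper call.
        KTable<Windowed<Integer>, Long> viaWrapper =
                finalResultsOnly(windowCounts, Duration.ofMinutes(10), BufferFullStrategy.SHUT_DOWN);

        System.out.println(viaRecipe.description);  // windowCounts -> finalResultsOnly(PT10M)
        System.out.println(viaWrapper.description); // windowCounts -> finalResultsOnly(PT10M)

        // Neither option compiles for a non-windowed table, e.g.
        //   KTable<String, String> plain = new KTable<>("plain");
        //   plain.suppress(Suppression.finalResultsOnly(Duration.ofMinutes(10), BufferFullStrategy.SHUT_DOWN));
        // fails because String does not satisfy the bound K extends Windowed<?>.
    }
}
```

Because the bound lives on `K`, it is only checked when `K` can be inferred from the target. A chained call that starts from the recipe would need an explicit witness such as `Suppression.<Windowed<Integer>, Long>finalResultsOnly(...)`. This is the option-2 downside described in the Downsides section above.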
Thanks again,
-John

On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi John,

Regarding the metrics: yeah, I think I'm with you that records dropped due to window retention or emit suppression policies should be recorded differently, and using this KIP's proposed metric would be fine. If you also think we can use this KIP's proposed metrics to cover records skipped due to window retention, then we can include those changes in this KIP as well.

Regarding the current proposal, I'm actually not too worried about the inconsistency between query semantics and downstream emit semantics. For queries, we will always return the current running results of the windows, be they partial or final depending on the window retention time, which has nothing to do with whether the emitted stream should have one final output per key or not. I also agree that having a unified operation is generally better for users, since they can focus on leveraging that one operation instead of learning about two sets of operations.
The only question I had is whether, for final updates of window stores, the configuration combo is a bit awkward to understand. Thinking about this more, I think my root worry is the "suppressLateEvents" call for windowed tables, since from a user's perspective: if my retention time is X, which means "pay the cost to allow late records up to X to still be applied to update the tables", why would I ever want to suppressLateEvents by Y (< X), i.e., to say "do not send updates for records that are more than Y late"? That means the downstream operator or sink topic for this stream would actually see a truncated update stream while I've paid a larger cost for it. And of course, Y > X would not make sense either, as you would not see any updates later than X anyway. So, all in all, my feeling is that it makes little sense for a windowed table's "suppressLateEvents" to take a parameter that is not equal to the window retention, and opening that door in the current proposal may confuse people.

Again, the above is just a subjective opinion, and we can probably come up with some scenarios where users do want to set X != Y.
but > >> > > > personally > >> > > > > I > >> > > > > > >>> feel > >> > > > > > >>>>> that even if the semantics for this scenario if > intuitive > >> for > >> > > > user > >> > > > > to > >> > > > > > >>>>> understand, doe that really make sense and should we > >> really > >> > > open > >> > > > > the > >> > > > > > >>> door > >> > > > > > >>>>> for it. So I think maybe separating the final update in > a > >> > > > separate > >> > > > > > >> API's > >> > > > > > >>>>> benefits may overwhelm the advantage of having one > uniform > >> > > > > > definition. > >> > > > > > >>> And > >> > > > > > >>>>> for my alternative proposal, the rationale was from both > >> my > >> > > > concern > >> > > > > > >>> about > >> > > > > > >>>>> "suppressLateEvents" for windowed store, and Matthias' > >> > question > >> > > > > about > >> > > > > > >>>>> "suppressLateEvents" for non-windowed stores, that if it > >> is > >> > > less > >> > > > > > >>>>> meaningful > >> > > > > > >>>>> for both, we can consider removing it completely and > only > >> do > >> > > > > > >>>>> "IntermediateSuppression" in Suppress instead. > >> > > > > > >>>>> > >> > > > > > >>>>> So I'd summarize my thoughts in the following questions: > >> > > > > > >>>>> > >> > > > > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X > (window > >> > > > > retention > >> > > > > > >>> time) > >> > > > > > >>>>> for windowed stores make sense in practice? > >> > > > > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for > >> > > > non-windowed > >> > > > > > >>> stores > >> > > > > > >>>>> make sense in practice? > >> > > > > > >>>>> > >> > > > > > >>>>> > >> > > > > > >>>>> > >> > > > > > >>>>> Guozhang > >> > > > > > >>>>> > >> > > > > > >>>>> > >> > > > > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck < > >> > > bbej...@gmail.com> > >> > > > > > >> wrote: > >> > > > > > >>>>> > >> > > > > > >>>>>> Thanks for the explanation, that does make sense. 
I have some questions on operations, but I'll just wait for the PR and tests.

Thanks,
Bill

On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:

Hi Bill,

Thanks for the review!

Your question is very much applicable to the KIP and not at all an implementation detail. Thanks for bringing it up.

I'm proposing not to change the existing caches and configurations at all (for now).

Imagine you have a topology like this:

commit.interval.ms = 100

(ktable1 (cached)) -> (suppress emitAfter 200)

The first KTable (ktable1) will respect the commit interval and buffer events for 100ms before logging, storing, or forwarding them (IIRC). Therefore, the second KTable (suppress) will only see the events at a rate of once per 100ms. It will apply its own buffering and emit once per 200ms. This case is pretty trivial because the suppress time is a multiple of the commit interval.
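To make the interplay concrete, here is a toy discrete-time model of a cached KTable feeding a suppression buffer. It is a hypothetical class, not Kafka Streams code, and it assumes one input update (k:t) per tick, a cache that forwards its latest value per key every commitInterval ticks, a suppression buffer that emits its contents every emitAfter ticks, and that a flush at tick t happens before an emission at the same tick.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of (cached KTable) -> (suppress emitAfter N).
 * Hypothetical illustration only; this is not Kafka Streams code.
 */
final class SuppressMarbleSim {

    /** Returns downstream emissions as "key:value@time" strings. */
    static List<String> run(String key, int ticks, int commitInterval, int emitAfter) {
        Map<String, Integer> cache = new LinkedHashMap<>();  // KTable cache: latest value per key
        Map<String, Integer> buffer = new LinkedHashMap<>(); // suppression buffer: latest value per key
        List<String> emitted = new ArrayList<>();

        for (int t = 1; t <= ticks; t++) {
            cache.put(key, t);                    // input (k:t) arrives at time t

            if (t % commitInterval == 0) {        // commit: the cache forwards its contents
                buffer.putAll(cache);             // newer values overwrite buffered ones
                cache.clear();
            }
            if (t % emitAfter == 0) {             // suppression period elapsed: emit and empty the buffer
                for (Map.Entry<String, Integer> e : buffer.entrySet()) {
                    emitted.add(e.getKey() + ":" + e.getValue() + "@" + t);
                }
                buffer.clear();
            }
        }
        return emitted;
    }
}
```

With commitInterval = 2 and emitAfter = 3 the model reproduces the marble diagram in this message: (k:2) at time 3 and (k:6) at time 6. With commitInterval = 1, which stands in for disabled caching, it emits (k:3) at time 3 and (k:6) at time 6.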
When it's not an integer multiple, you'll get behavior like in this marble diagram:

<-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->

[ KTable caching with commit interval = 2 ]

<--------(k:2)---------(k:4)---------(k:6)->

[ suppress with emitAfter = 3 ]

<---------------(k:2)----------------(k:6)->

If this behavior isn't desired (for example, if you wanted to emit (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or modifying the topology to disable caching. Then the behavior is determined simply by the suppress operator.

Does that seem right to you?

Regarding the changelogs: because the suppression operator hangs onto events for a while, it will need its own changelog. The changelog should represent the current state of the buffer at all times. So when the suppress operator sees (k:2), for example, it will log (k:2). When it later gets to time 3, it's time to emit (k:2) downstream.
Because k is no longer buffered, the suppress operator will log (k:null). Thus, when recovering, it can rebuild the buffer by reading its changelog.

What do you think about this?

Thanks,
-John

On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:

Hi John, thanks for the KIP.

Early on in the KIP, you mention the current approaches for controlling the rate of downstream records from a KTable: cache size configuration and commit time.

Will these configuration parameters still be in effect for tables that don't use suppression? For tables taking advantage of suppression, will these configurations have no impact? This last question may be too implementation-specific, but if the requested suppression time is longer than the specified commit time, will the latest record in the suppression buffer get stored in a changelog?
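The changelog scheme John describes in his reply above (log (k:v) when an update enters the suppression buffer, log the tombstone (k:null) once it has been emitted) makes recovery a plain replay. A minimal sketch using a hypothetical helper class; it is not the actual Kafka Streams restore code.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: rebuilds a suppression buffer by replaying its changelog in order. */
final class SuppressionBufferRestore {

    /** Builds a changelog record; a null value is a tombstone (SimpleEntry permits null values). */
    static Map.Entry<String, Integer> entry(String key, Integer value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Replays changelog records in order and returns the reconstructed buffer. */
    static Map<String, Integer> restore(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> buffer = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> change : changelog) {
            if (change.getValue() == null) {
                buffer.remove(change.getKey());                 // (k:null): already emitted downstream
            } else {
                buffer.put(change.getKey(), change.getValue()); // (k:v): still buffered
            }
        }
        return buffer;
    }
}
```

Replaying the records logged through time 4 of the marble example, (k:2), (k:null), (k:4), leaves exactly (k:4) in the buffer, which is the buffer's content between times 4 and 6.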
Thanks,
Bill

On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:

Thanks for the feedback, Matthias,

It seems like in straightforward relational processing cases, it would not make sense to bound the lateness of KTables. In general, it seems better to have "guard rails" in place that make it easier to write sensible programs than insensible ones.

But I'm still going to argue in favor of keeping it for all KTables ;)

1. I believe it is simpler to understand the operator if it has one uniform definition, regardless of context. It's well defined and intuitive what will happen when you use late-event suppression on a KTable, so I think nothing surprising or dangerous will happen in that case. From my perspective, having two sets of allowed operations is actually an increase in cognitive complexity.

2. To me, it's not crazy to use the operator this way. For example, in lieu of full-featured timestamp semantics, I can implement MVCC behavior when building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that there are other, non-obvious applications of suppressing late events on KTables.

3. Not to get too much into implementation details in a KIP discussion, but if we did want to make late-event suppression available only on windowed KTables, we have two enforcement options:
   a. Check when we build the topology: this would be simple to implement, but would be a runtime check. Hopefully, people write tests for their topologies before deploying them, so the feedback loop isn't instantaneous, but it's not too long either.
   b. Add a new WindowedKTable type: this would be a compile-time check, but would also be a substantial increase in both interface and code complexity.
We should definitely strive to have guard rails protecting against surprising or dangerous behavior. Protecting against programs that we don't currently predict is a lesser benefit, and I think we can put up guard rails on a case-by-case basis for that. The increase in cognitive (and potentially code and interface) complexity makes me think we should skip this case.

What do you think?

Thanks,
-John

On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the KIP, John.

One initial comment about the last example, "Bounded lateness": for a non-windowed KTable, bounding the lateness does not really make sense, does it?

Thus, I am wondering whether we should allow `suppressLateEvents()` for this case. It seems better to only allow it for windowed KTables.
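For a windowed KTable, a lateness bound has a natural anchor: each window has an end, so an update can be judged against it. A minimal sketch, assuming tumbling windows [start, start + size), stream time equal to the largest timestamp observed so far, and a window that stops accepting updates once stream time reaches its end plus the allowed lateness. WindowCloseTracker and this exact close rule are assumptions for illustration, not the KIP's definition.

```java
/**
 * Hypothetical sketch of bounded lateness for a windowed table.
 * Assumed rule: a window [start, start + size) is closed once streamTime >= end + allowedLateness.
 */
final class WindowCloseTracker {
    private final long windowSizeMs;
    private final long allowedLatenessMs;
    private long streamTimeMs = Long.MIN_VALUE; // largest timestamp observed so far

    WindowCloseTracker(long windowSizeMs, long allowedLatenessMs) {
        if (windowSizeMs <= 0 || allowedLatenessMs < 0) {
            throw new IllegalArgumentException("size must be > 0 and lateness must be >= 0");
        }
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Observes a record and reports whether its window still accepts updates. */
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        return streamTimeMs < windowEnd + allowedLatenessMs;
    }
}
```

With 10ms windows and a 5ms bound, a record at time 3 is still accepted while stream time is 12, but a record at time 4 is rejected once stream time reaches 15, because window [0, 10) has closed.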
-Matthias

On 6/27/18 8:53 AM, Ted Yu wrote:

I noticed this (lack of a primary parameter) as well.

What you gave as the new example is semantically the same as what I suggested. So it is good by me.

Thanks

On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:

Thanks for taking a look, Ted,

I agree this is a departure from the conventions of the Streams DSL.

Most of our config objects have one or two "required" parameters, which fit naturally with the static factory method approach. TimeWindows, for example, requires a size parameter, so we can naturally say TimeWindows.of(size).

I think in the case of a suppression, there's really no "core" parameter, and "Suppression.of()" seems sillier than "new Suppression()".
I > >> > > > > > >>>>>>>> think > >> > > > > > >>>>>>>>>> that > >> > > > > > >>>>>>>>>>>> Suppression.of(duration) would be ambiguous, > since > >> > there > >> > > > > > >>> are > >> > > > > > >>>>>> many > >> > > > > > >>>>>>>>>> durations > >> > > > > > >>>>>>>>>>>> that we can configure. > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> However, thinking about it again, I suppose that > I > >> can > >> > > > > > >> give > >> > > > > > >>>>> each > >> > > > > > >>>>>>>>>>>> configuration method a static version, which > would > >> let > >> > > > > > >> you > >> > > > > > >>>>>> replace > >> > > > > > >>>>>>>>> "new > >> > > > > > >>>>>>>>>>>> Suppression()." with "Suppression." in all the > >> > examples. > >> > > > > > >>>>>>> Basically, > >> > > > > > >>>>>>>>>> instead > >> > > > > > >>>>>>>>>>>> of "of()", we'd support any of the methods I > >> listed. > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> For example: > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> windowCounts > >> > > > > > >>>>>>>>>>>> .suppress( > >> > > > > > >>>>>>>>>>>> Suppression > >> > > > > > >>>>>>>>>>>> .suppressLateEvents(Duration. > >> > ofMinutes(10)) > >> > > > > > >>>>>>>>>>>> .suppressIntermediateEvents( > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>> > >> IntermediateSuppression.emitAfter(Duration.ofMinutes( > >> > 10)) > >> > > > > > >>>>>>>>>>>> ) > >> > > > > > >>>>>>>>>>>> ); > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> Does that seem better? > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> Thanks, > >> > > > > > >>>>>>>>>>>> -John > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu < > >> > > > > > >>> yuzhih...@gmail.com > >> > > > > > >>>>>> > >> > > > > > >>>>>>>> wrote: > >> > > > > > >>>>>>>>>>>> > >> > > > > > >>>>>>>>>>>>> I started to read this KIP which contains a lot > of > >> > > > > > >>>>> materials. 
One suggestion:

.suppress(
    new Suppression()

Do you think it would be more consistent with the rest of the Streams data structures to support `of`?

Suppression.of(Duration.ofMinutes(10))

Cheers

On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io> wrote:

Hello devs and users,

Please take some time to consider this proposal for Kafka Streams:

KIP-328: Ability to suppress updates for KTables

link: https://cwiki.apache.org/confluence/x/sQU0BQ

The basic idea is to provide:
* more usable control over update rate (vs the current state store caches)
* the final-result-for-windowed-computations feature, which several people have requested

I look forward to your feedback!

Thanks,
-John

--
-- Guozhang