John,

Thanks for your replies. As for the two API options, I'm slightly inclined
toward the first one as well. My motivation is a bit different, though: I
think the first one may be more flexible. For example:

KTable<Windowed<..>> table = ... count();

table.toStream().peek(..);   // want to peek at the changelog stream,
                             // do not care about final results.

table.suppress().toStream().to("topic");    // sending to a topic, want to
                                            // send only the final results.
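
To make the difference between the two consumers concrete, here is a toy
sketch in plain Java. It is not the Streams API: plain collections stand in
for the table and its streams, and the class and method names
(SuppressSketch, changelog, finalResults) are made up for illustration. It
only shows that the same table can feed both a full changelog view and a
final-results-only view.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain collections stand in for KTable/KStream.
final class SuppressSketch {

    // Counts records per windowed key and emits every intermediate update,
    // like peeking at the changelog stream of the unsuppressed table.
    static List<String> changelog(List<String> windowedKeys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String key : windowedKeys) {
            long count = counts.merge(key, 1L, Long::sum);
            updates.add(key + "=" + count);
        }
        return updates;
    }

    // Emits one record per windowed key, carrying only its last count,
    // like the suppressed view once every window has closed.
    static List<String> finalResults(List<String> windowedKeys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : windowedKeys) {
            counts.merge(key, 1L, Long::sum);
        }
        List<String> results = new ArrayList<>();
        counts.forEach((key, count) -> results.add(key + "=" + count));
        return results;
    }
}
```

For three records on one windowed key, the first method yields three
updates and the second yields a single record with the last count.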

--------------

Besides that, I have a few more minor questions:

1. For "allowedLateness", what should be the default value? I.e. if the user
does not specify "allowedLateness" in TimeWindows, what value should we set?

2. For API names, some personal suggestions here:

2.a) "allowedLateness"  -> "until" (the semantics have changed, and the value
is defined as a delta on top of the window length), where "until" ->
"retentionPeriod", and the latter will be moved from `Windows` to
`WindowStoreBuilder` in the future.

2.b) "BufferConfig" -> "Buffered" ?
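
To illustrate what I mean by "delta on top of window length", here is a
small sketch in plain Java. It is only a toy model under my own assumptions
(tumbling windows, non-negative millisecond timestamps); WindowCloseSketch
and its methods are hypothetical names, not part of TimeWindows. It also
shows what a default of zero would imply: the window would close exactly at
its end time.

```java
import java.time.Duration;

// Toy model only; not the TimeWindows API.
final class WindowCloseSketch {
    private final long sizeMs;
    private final long allowedLatenessMs;

    WindowCloseSketch(Duration size, Duration allowedLateness) {
        this.sizeMs = size.toMillis();
        this.allowedLatenessMs = allowedLateness.toMillis();
    }

    // Start of the tumbling window containing the timestamp
    // (non-negative timestamps assumed).
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Lateness is a delta on top of the window length:
    // close = start + size + lateness.
    long closeTime(long windowStartMs) {
        return windowStartMs + sizeMs + allowedLatenessMs;
    }

    // A record still updates its window only while stream time is
    // strictly before that window's close time.
    boolean accepts(long recordTimestampMs, long streamTimeMs) {
        return streamTimeMs < closeTime(windowStart(recordTimestampMs));
    }
}
```

With a 2-minute window and 1 minute of lateness, a record belonging to the
window starting at 0 is accepted until stream time reaches 3 minutes.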



Guozhang


On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:

> Hey Matthias and Guozhang,
>
> Sorry for the slow reply. I was mulling over your feedback and weighing
> some ideas in a sketchbook PR: https://github.com/apache/kafka/pull/5337.
>
> Your thought about keeping suppression independent of business logic is a
> very good one. I agree that it would make more sense to add some kind of
> "window close" concept to the window definition.
>
> In fact, doing that immediately solves the inconsistency problem Guozhang
> brought up. There's no need to add a "final results" or "emission" option
> to the windowed aggregation.
>
> What do you think about an API more like this:
>
> final StreamsBuilder builder = new StreamsBuilder();
>
> builder
>   .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
>   .groupBy(
>     (String k1, String v1) -> k1,
>     Serialized.with(STRING_SERDE, STRING_SERDE)
>   )
>   .windowedBy(TimeWindows
>     .of(scaledTime(2L))
>     .until(scaledTime(3L))
>     .allowedLateness(scaledTime(1L))
>   )
>   .count(Materialized.as("counts"))
>   .suppress(
>     emitFinalResultsOnly(
>       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
>     )
>   )
>   .toStream()
>   .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));
>
> Note that:
>  * "emitFinalResultsOnly" is available *only* on windowed tables (enforced
> by the type system at compile time), and it determines the time to wait by
> looking at "allowedLateness" on the TimeWindows config.
>  * querying "counts" will produce results (eventually) consistent with
> what's observable in "output-suppressed".
>  * in all cases, "suppress" has no effect on business logic, just on event
> suppression.
>
> Is this API straightforward? Or do you still prefer the version that you
> both proposed:
>
>   ...
>   .windowedBy(TimeWindows
>     .of(scaledTime(2L))
>     .until(scaledTime(3L))
>     .allowedLateness(scaledTime(1L))
>   )
>   .count(
>     Materialized.as("counts"),
>     emitFinalResultsOnly(
>       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
>     )
>   )
>   ...
>
> To me, these two are practically identical, and I still vaguely prefer the
> first one.
>
> The prototype has made it clearer to me that users of "final results for
> windows" and users of "suppression for table events" both need to configure
> the suppression buffer.
>
> This buffer configuration consists of:
> 1. how many keys or bytes to keep in memory
> 2. what to do if memory runs out (shut down, start using disk, ...)
>
> So it's not as simple as setting a "final results" flag. We'll either have
> an "Emit" config object on the windowed aggregators that takes the same
> BufferConfig as the "Suppress" config on the suppression operator does, or
> we just use the suppression operator for both.
>
> Perhaps it would sweeten the deal a little to point out that we have 2
> overloads already for each windowed aggregator (with and without
> Materialized). Adding "Emitted" or something would mean that we'd add a new
> overload for each one, taking us up to 4 overloads each for "count",
> "aggregate" and "reduce". Using "suppress" means that we don't add any new
> overloads.
>
> Thanks again for helping to hash this out,
> -John
>
> On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I think I agree with Matthias on having dedicated APIs for the windowed
> > operation final-output scenario, PLUS separating the window close, which
> > the "final output" would rely on, from the window retention time itself
> > (admittedly it would make this KIP effort larger, but if we believe we
> > need to do this separation anyway we could just do it now).
> >
> > And then we can have the `KTable#suppress()` for intermediate-suppression
> > only, not for late-record-suppression, until we've seen that it becomes a
> > common feature request, because our current design can still be
> > extended for that purpose.
> >
> >
> > Guozhang
> >
> > On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the discussion. I am just catching up.
> > >
> > > In general, I think we have different use cases, and non-windowed and
> > > windowed are quite different. For the non-windowed case, suppress() has
> > > no (useful) close or retention time, no final semantics, and also no
> > > business logic impact.
> > >
> > > On the other hand, for windowed aggregations, close time and final
> > > result do have a meaning. IMHO, `close()` is part of business logic
> > > while retention time is not. Also, suppression of intermediate results
> > > is not a business rule, and there might be use cases for which either
> > > only "early intermediate" results (before window end time) are
> > > suppressed, or all intermediates are suppressed (maybe also something in
> > > the middle, i.e., just reducing the load of intermediate updates). Thus,
> > > window-suppression is much richer.
> > >
> > > IMHO, a generic `suppress()` operator that can be inserted into the data
> > > flow at any point is useful. Maybe we should keep it as generic as
> > > possible. However, it might be difficult to use with regard to
> > > windowing, as the mental effort to use it is high.
> > >
> > > With regard to Guozhang's comment:
> > >
> > > > we will actually
> > > > process data as old as 30 days as well, while most of the late
> > > > updates beyond 5 minutes would be discarded anyways.
> > >
> > > If we use `suppress()` as a standalone operator, this is correct and
> > > intended IMHO. To address the issue if the behavior is unwanted, I would
> > > suggest adding a "suppress option" directly to the
> > > `count()/reduce()/aggregate()` window operators, similar to
> > > `Materialized`. This would be an "embedded suppress" and avoid the
> > > issue. It would also address the issue about mental effort for the
> > > "single final window result" use case.
> > >
> > > I also think that a shorter close-time than retention time is useful for
> > > window aggregation. If we add close() to the window definition and
> > > until() to `Materialized`, we can separate both correctly IMHO.
> > >
> > > About setting `close = min(close,retention)` I am not sure. We might
> > > rather throw an exception than reduce the close time automatically.
> > > Otherwise, I see many user questions like "I set close to X but it does
> > > not get updated for some data that is delayed by X".
> > >
> > > The tricky question might be how to design the API in a backward
> > > compatible way though.
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 7/3/18 5:38 AM, John Roesler wrote:
> > > > Hi Guozhang,
> > > >
> > > > I see. It seems like if we want to decouple 1) and 2), we need to
> > > > alter the definition of the window. Do you think it would close the
> > > > gap if we added a "window close" time to the window definition?
> > > >
> > > > Such as:
> > > >
> > > > builder.stream("input")
> > > > .groupByKey()
> > > > .windowedBy(
> > > >   TimeWindows
> > > >     .of(60_000)
> > > >     .closeAfter(10 * 60 * 1000)
> > > >     .until(30L * 24 * 60 * 60 * 1000)
> > > > )
> > > > .count()
> > > > .suppress(Suppression.finalResultsOnly());
> > > >
> > > > Possibly called "finalResultsAtWindowClose" or something?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > >> Hey John,
> > > >>
> > > > >> Obviously I'm too lazy on email replying diligence compared with you :)
> > > > >> Will try to reply to them separately:
> > > >>
> > > >>
> > > >> ------------------------------------------------------------
> > > -----------------
> > > >>
> > > > >> To reply to your email from "Mon, Jul 2, 2018 at 8:23 AM":
> > > > >>
> > > > >> I'm aware of this use case, but again, the concern is that, in this
> > > > >> setting, in order to let the window be queryable for 30 days, we will
> > > > >> actually process data as old as 30 days as well, while most of the late
> > > > >> updates beyond 5 minutes would be discarded anyway. Personally I think
> > > > >> that for the final update scenario, the ideal situation users would want
> > > > >> is "do not process any data that is more than 5 minutes late, and of
> > > > >> course send no update records downstream later than 5 minutes either;
> > > > >> but retain the window to be queryable for 30 days". By doing that, the
> > > > >> final window snapshot would also be aligned with the update stream. In
> > > > >> other words, among these three periods:
> > > >>
> > > >> 1) the retention length of the window / table.
> > > >> 2) the late records acceptance for updating the window.
> > > >> 3) the late records update to be sent downstream.
> > > >>
> > > > >> Final update use cases would naturally want 2) = 3), while 1) may be
> > > > >> different and larger. What we provide now is 1) = 2), which could be
> > > > >> different from, and in practice larger than, 3), hence not the most
> > > > >> intuitive for their needs.
> > > >>
> > > >>
> > > >>
> > > >> ------------------------------------------------------------
> > > -----------------
> > > >>
> > > >> To reply your email on "Mon, Jul 2, 2018 at 10:27 AM":
> > > >>
> > > >> I'd like option 2) over option 1) better as well from programming
> pov.
> > > But
> > > >> I'm wondering if option 2) would provide the above semantics or it
> is
> > > still
> > > >> coupling 1) with 2) as well ?
> > > >>
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >>
> > > >>
> > > > >> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:
> > > > >>
> > > > >>> In fact, to push the idea further (which IIRC is what Matthias
> > > > >>> originally proposed), if we can accept "Suppression#finalResultsOnly"
> > > > >>> in my last email, then we could also consider whether to eliminate
> > > > >>> "suppressLateEvents" entirely.
> > > > >>>
> > > > >>> We could always add it later, but you've both expressed doubt that
> > > > >>> there are practical use cases for it outside of final-results.
> > > >>>
> > > >>> -John
> > > >>>
> > > > >>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:
> > > >>>
> > > >>>> Hi again, Guozhang ;) Here's the second part of my response...
> > > >>>>
> > > > >>>> It seems like your main concern is: "if I'm a user who wants final
> > > > >>>> update semantics, how complicated is it for me to get it?"
> > > > >>>>
> > > > >>>> I think we have to assume that people don't always have time to
> > > > >>>> become deeply familiar with all the nuances of a programming
> > > > >>>> environment before they use it. Especially if they're evaluating
> > > > >>>> several frameworks for their use case, it's very valuable to make it
> > > > >>>> as obvious as possible how to accomplish various computations with
> > > > >>>> Streams.
> > > > >>>>
> > > > >>>> To me the biggest question is whether with a fresh perspective,
> > > > >>>> people would say "oh, I get it, I have to bound my lateness and
> > > > >>>> suppress intermediate updates, and of course I'll get only the final
> > > > >>>> result!", or if it's more like "wtf? all I want is the final result,
> > > > >>>> what are all these parameters?".
> > > > >>>>
> > > > >>>> I was talking with Matthias a while back, and he had an idea that I
> > > > >>>> think can help, which is to essentially set up a final-result recipe
> > > > >>>> in addition to the raw parameters. I previously thought that it
> > > > >>>> wouldn't be possible to restrict its usage to Windowed KTables, but
> > > > >>>> thinking about it again this weekend, I have a couple of ideas:
> > > >>>>
> > > >>>> ================
> > > >>>> = 1. Static Wrapper =
> > > >>>> ================
> > > >>>> We can define an extra static function that "wraps" a KTable with
> > > >>>> final-result semantics.
> > > >>>>
> > > > >>>> public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
> > > >>>>   final KTable<K, V> windowedKTable,
> > > >>>>   final Duration maxAllowedLateness,
> > > >>>>   final Suppression.BufferFullStrategy bufferFullStrategy) {
> > > >>>>     return windowedKTable.suppress(
> > > >>>>         Suppression.suppressLateEvents(maxAllowedLateness)
> > > >>>>                    .suppressIntermediateEvents(
> > > >>>>                      IntermediateSuppression
> > > >>>>                        .emitAfter(maxAllowedLateness)
> > > >>>>                        .bufferFullStrategy(bufferFullStrategy)
> > > >>>>                    )
> > > >>>>     );
> > > >>>> }
> > > >>>>
> > > > >>>> Because windowedKTable is a parameter, the static function can
> > > > >>>> easily impose an extra bound on the key type, that it extends
> > > > >>>> Windowed. This would make "final results only" available only on
> > > > >>>> windowed ktables.
> > > >>>>
> > > >>>> Here's how it would look to use:
> > > >>>>
> > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > > >>>>   finalResultsOnly(
> > > >>>>     windowCounts,
> > > >>>>     Duration.ofMinutes(10),
> > > >>>>     Suppression.BufferFullStrategy.SHUT_DOWN
> > > >>>>   );
> > > >>>>
> > > >>>> Trying to use it on a non-windowed KTable yields:
> > > >>>>
> > > > >>>>> Error:(129, 35) java: method finalResultsOnly in class
> > > > >>>>> org.apache.kafka.streams.kstream.internals.KTableAggregateTest
> > > > >>>>> cannot be applied to given types;
> > > > >>>>>   required:
> > > > >>>>> org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > >>>>>   found:
> > > > >>>>> org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > >>>>>   reason: inference variable K has incompatible bounds
> > > > >>>>>     equality constraints: java.lang.String
> > > > >>>>>     upper bounds: org.apache.kafka.streams.kstream.Windowed
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> =================================================
> > > >>>> = 2. Add <K,V> parameters and recipe method to Suppression =
> > > >>>> =================================================
> > > >>>>
> > > > >>>> By adding K,V parameters to Suppression, we can provide a
> > > > >>>> similarly bounded config method directly on the Suppression class:
> > > > >>>>
> > > > >>>> public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
> > > > >>>>   final Duration maxAllowedLateness,
> > > > >>>>   final BufferFullStrategy bufferFullStrategy) {
> > > >>>>     return Suppression
> > > >>>>         .<K, V>suppressLateEvents(maxAllowedLateness)
> > > >>>>         .suppressIntermediateEvents(IntermediateSuppression
> > > >>>>             .emitAfter(maxAllowedLateness)
> > > >>>>             .bufferFullStrategy(bufferFullStrategy)
> > > >>>>         );
> > > >>>> }
> > > >>>>
> > > >>>> Then, here's how it would look to use it:
> > > >>>>
> > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > > >>>>   windowCounts.suppress(
> > > >>>>     Suppression.finalResultsOnly(
> > > > >>>>       Duration.ofMinutes(10),
> > > >>>>       Suppression.BufferFullStrategy.SHUT_DOWN
> > > >>>>     )
> > > >>>>   );
> > > >>>>
> > > >>>> Trying to use it on a non-windowed ktable yields:
> > > >>>>
> > > > >>>>> Error:(127, 35) java: method finalResultsOnly in class
> > > > >>>>> org.apache.kafka.streams.kstream.Suppression<K,V> cannot be
> > > > >>>>> applied to given types;
> > > > >>>>>   required:
> > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > >>>>>   found:
> > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > >>>>>   reason: explicit type argument java.lang.String does not
> > > > >>>>> conform to declared bound(s) org.apache.kafka.streams.kstream.Windowed
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> ============
> > > >>>> = Downsides =
> > > >>>> ============
> > > >>>>
> > > > >>>> Of course, there's a downside either way:
> > > > >>>> * for 1: this "wrapper" interaction would be the first in the DSL.
> > > > >>>> Is it too strange, and how discoverable would it be?
> > > > >>>> * for 2: adding those type parameters to Suppression will force all
> > > > >>>> callers to provide them in the event of a chained construction,
> > > > >>>> because Java doesn't do RHS recursive type inference. This is already
> > > > >>>> visible in other parts of the Streams DSL. For example, calls to
> > > > >>>> Materialized builders often have to provide seemingly obvious type
> > > > >>>> bounds.
> > > >>>>
> > > >>>> ============
> > > >>>> = Conclusion =
> > > >>>> ============
> > > >>>>
> > > >>>> I think option 2 is more "normal" and discoverable. It does have a
> > > >>>> downside, but it's one that's pre-existing elsewhere in the DSL.
> > > >>>>
> > > > >>>> WDYT? Would the addition of this "recipe" method to Suppression
> > > > >>>> resolve your concern?
> > > >>>>
> > > >>>> Thanks again,
> > > >>>> -John
> > > >>>>
> > > > >>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi John,
> > > >>>>>
> > > > >>>>> Regarding the metrics: yeah, I think I'm with you that the records
> > > > >>>>> dropped due to window retention or emit suppression policies should
> > > > >>>>> be recorded differently, and using this KIP's proposed metric would
> > > > >>>>> be fine. If you also think we can use this KIP's proposed metrics to
> > > > >>>>> cover records skipped because of window retention, then we can
> > > > >>>>> include the changes in this KIP as well.
> > > >>>>>
> > > > >>>>> Regarding the current proposal, I'm actually not too worried about
> > > > >>>>> the inconsistency between query semantics and downstream emit
> > > > >>>>> semantics. For queries, we will always return the current running
> > > > >>>>> results of the windows, be they partial or final results depending
> > > > >>>>> on the window retention time anyway, which has nothing to do with
> > > > >>>>> whether the emitted stream should be one final output per key or
> > > > >>>>> not. I also agree that having a unified operation is generally
> > > > >>>>> better, since users can focus on leveraging that one operation
> > > > >>>>> rather than learning about two sets of operations. The only question
> > > > >>>>> I had is whether, for final updates of window stores, it is a bit
> > > > >>>>> awkward to understand the configuration combo. Thinking about this
> > > > >>>>> more, I think my root worry is the "suppressLateEvents" call for
> > > > >>>>> windowed tables. From a user perspective: if my retention time is X,
> > > > >>>>> which means "pay the cost to allow late records up to X to still be
> > > > >>>>> applied as updates to the tables", why would I ever want to
> > > > >>>>> suppressLateEvents by Y ( < X), which says "do not send updates for
> > > > >>>>> records later than Y", meaning the downstream operator or sink topic
> > > > >>>>> for this stream would actually see a truncated update stream while
> > > > >>>>> I've paid a larger cost for that? And of course, Y > X would not
> > > > >>>>> make sense either, as you would not see any updates later than X
> > > > >>>>> anyway. So in all, my feeling is that a windowed table's
> > > > >>>>> "suppressLateEvents" makes less sense with a parameter that is not
> > > > >>>>> equal to the window retention, and opening the door in the current
> > > > >>>>> proposal may confuse people.
> > > >>>>>
> > > > >>>>> Again, the above is just a subjective opinion, and we can probably
> > > > >>>>> also bring up some scenarios where users do want to set X != Y. But
> > > > >>>>> personally I wonder, even if the semantics for this scenario are
> > > > >>>>> intuitive for users to understand, whether it really makes sense and
> > > > >>>>> whether we should really open the door for it. So I think the
> > > > >>>>> benefits of separating the final update into a separate API may
> > > > >>>>> outweigh the advantage of having one uniform definition. And for my
> > > > >>>>> alternative proposal, the rationale came from both my concern about
> > > > >>>>> "suppressLateEvents" for windowed stores and Matthias' question
> > > > >>>>> about "suppressLateEvents" for non-windowed stores: if it is less
> > > > >>>>> meaningful for both, we can consider removing it completely and only
> > > > >>>>> do "IntermediateSuppression" in Suppress instead.
> > > >>>>>
> > > >>>>> So I'd summarize my thoughts in the following questions:
> > > >>>>>
> > > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X (window
> > > > >>>>> retention time) for windowed stores make sense in practice?
> > > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for non-windowed
> > > > >>>>> stores make sense in practice?
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Guozhang
> > > >>>>>
> > > >>>>>
> > > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>>> Thanks for the explanation, that does make sense.  I have some
> > > > >>>>>> questions on operations, but I'll just wait for the PR and tests.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> Bill
> > > >>>>>>
> > > > >>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Bill,
> > > >>>>>>>
> > > >>>>>>> Thanks for the review!
> > > >>>>>>>
> > > > >>>>>>> Your question is very much applicable to the KIP and not at all an
> > > > >>>>>>> implementation detail. Thanks for bringing it up.
> > > > >>>>>>>
> > > > >>>>>>> I'm proposing not to change the existing caches and configurations
> > > > >>>>>>> at all (for now).
> > > >>>>>>>
> > > >>>>>>> Imagine you have a topology like this:
> > > >>>>>>> commit.interval.ms = 100
> > > >>>>>>>
> > > >>>>>>> (ktable1 (cached)) -> (suppress emitAfter 200)
> > > >>>>>>>
> > > > >>>>>>> The first ktable (ktable1) will respect the commit interval and
> > > > >>>>>>> buffer events for 100ms before logging, storing, or forwarding them
> > > > >>>>>>> (IIRC). Therefore, the second ktable (suppress) will only see the
> > > > >>>>>>> events at a rate of once per 100ms. It will apply its own buffering
> > > > >>>>>>> and emit once per 200ms. This case is pretty trivial because the
> > > > >>>>>>> suppress time is a multiple of the commit interval.
> > > > >>>>>>>
> > > > >>>>>>> When it's not an integer multiple, you'll get behavior like in this
> > > > >>>>>>> marble diagram:
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> > > >>>>>>>
> > > >>>>>>> [ KTable caching with commit interval = 2 ]
> > > >>>>>>>
> > > >>>>>>> <--------(k:2)---------(k:4)---------(k:6)->
> > > >>>>>>>
> > > >>>>>>>       [ suppress with emitAfter = 3 ]
> > > >>>>>>>
> > > >>>>>>> <---------------(k:2)----------------(k:6)->
> > > >>>>>>>
> > > >>>>>>>
> > > > >>>>>>> If this behavior isn't desired (for example, if you wanted to emit
> > > > >>>>>>> (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering"
> > > > >>>>>>> to 0 or modifying the topology to disable caching. Then the behavior
> > > > >>>>>>> is determined more simply, just by the suppress operator.
> > > >>>>>>>
> > > >>>>>>> Does that seem right to you?
> > > >>>>>>>
> > > >>>>>>>
> > > > >>>>>>> Regarding the changelogs, because the suppression operator hangs
> > > > >>>>>>> onto events for a while, it will need its own changelog. The
> > > > >>>>>>> changelog should represent the current state of the buffer at all
> > > > >>>>>>> times. So when the suppress operator sees (k:2), for example, it
> > > > >>>>>>> will log (k:2). When it later gets to time 3, it's time to emit
> > > > >>>>>>> (k:2) downstream. Because k is no longer buffered, the suppress
> > > > >>>>>>> operator will log (k:null). Thus, when recovering, it can rebuild
> > > > >>>>>>> the buffer by reading its changelog.
> > > >>>>>>>
> > > >>>>>>> What do you think about this?
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> -John
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > > >>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi John,  thanks for the KIP.
> > > > >>>>>>>>
> > > > >>>>>>>> Early on in the KIP, you mention the current approaches for
> > > > >>>>>>>> controlling the rate of downstream records from a KTable: cache size
> > > > >>>>>>>> configuration and commit time.
> > > > >>>>>>>>
> > > > >>>>>>>> Will these configuration parameters still be in effect for tables
> > > > >>>>>>>> that don't use suppression?  For tables taking advantage of
> > > > >>>>>>>> suppression, will these configurations have no impact?
> > > > >>>>>>>> This last question may be too implementation-specific, but if the
> > > > >>>>>>>> requested suppression time is longer than the specified commit time,
> > > > >>>>>>>> will the latest record in the suppression buffer get stored in a
> > > > >>>>>>>> changelog?
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Bill
> > > >>>>>>>>
> > > > >>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Thanks for the feedback, Matthias,
> > > > >>>>>>>>>
> > > > >>>>>>>>> It seems like in straightforward relational processing cases, it
> > > > >>>>>>>>> would not make sense to bound the lateness of KTables. In general,
> > > > >>>>>>>>> it seems better to have "guard rails" in place that make it easier
> > > > >>>>>>>>> to write sensible programs than insensible ones.
> > > > >>>>>>>>>
> > > > >>>>>>>>> But I'm still going to argue in favor of keeping it for all
> > > > >>>>>>>>> KTables ;)
> > > >>>>>>>>>
> > > > >>>>>>>>> 1. I believe it is simpler to understand the operator if it has one
> > > > >>>>>>>>> uniform definition, regardless of context. It's well defined and
> > > > >>>>>>>>> intuitive what will happen when you use late-event suppression on a
> > > > >>>>>>>>> KTable, so I think nothing surprising or dangerous will happen in
> > > > >>>>>>>>> that case. From my perspective, having two sets of allowed
> > > > >>>>>>>>> operations is actually an increase in cognitive complexity.
> > > > >>>>>>>>>
> > > > >>>>>>>>> 2. To me, it's not crazy to use the operator this way. For example,
> > > > >>>>>>>>> in lieu of full-featured timestamp semantics, I can implement MVCC
> > > > >>>>>>>>> behavior when building a KTable by
> > > > >>>>>>>>> "suppressLateEvents(Duration.ZERO)". I suspect that there are other,
> > > > >>>>>>>>> non-obvious applications of suppressing late events on KTables.
> > > >>>>>>>>>
> > > > >>>>>>>>> 3. Not to get too much into implementation details in a KIP
> > > > >>>>>>>>> discussion, but if we did want to make late-event suppression
> > > > >>>>>>>>> available only on windowed KTables, we have two enforcement options:
> > > > >>>>>>>>>   a. check when we build the topology - this would be simple to
> > > > >>>>>>>>> implement, but would be a runtime check. Hopefully, people write
> > > > >>>>>>>>> tests for their topology before deploying them, so the feedback loop
> > > > >>>>>>>>> isn't instantaneous, but it's not too long either.
> > > > >>>>>>>>>   b. add a new WindowedKTable type - this would be a compile time
> > > > >>>>>>>>> check, but would also be a substantial increase of both interface
> > > > >>>>>>>>> and code complexity.
> > > >>>>>>>>>
> > > > >>>>>>>>> We should definitely strive to have guard rails protecting against
> > > > >>>>>>>>> surprising or dangerous behavior. Protecting against programs that
> > > > >>>>>>>>> we don't currently predict is a lesser benefit, and I think we can
> > > > >>>>>>>>> put up guard rails on a case-by-case basis for that. The increase in
> > > > >>>>>>>>> cognitive (and potentially code and interface) complexity makes me
> > > > >>>>>>>>> think we should skip this case.
> > > >>>>>>>>>
> > > >>>>>>>>> What do you think?
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>> -John
> > > >>>>>>>>>
> > > > >>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Thanks for the KIP John.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> One initial comment about the last example "Bounded lateness": For
> > > > >>>>>>>>>> a non-windowed KTable bounding the lateness does not really make
> > > > >>>>>>>>>> sense, does it?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thus, I am wondering if we should allow `suppressLateEvents()` for
> > > > >>>>>>>>>> this case? It seems to be better to only allow it for
> > > > >>>>>>>>>> windowed-KTables.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
> > > >>>>>>>>>>> I noticed this (lack of primary parameter) as well.
> > > >>>>>>>>>>>
> > > > >>>>>>>>>>> What you gave as new example is semantically the same as what I
> > > > >>>>>>>>>>> suggested.
> > > >>>>>>>>>>> So it is good by me.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks
> > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io>
> > > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks for taking look, Ted,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I agree this is a departure from the conventions of
> > > >> Streams
> > > >>>>> DSL.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Most of our config objects have one or two "required"
> > > >>>>>> parameters,
> > > >>>>>>>>> which
> > > >>>>>>>>>> fit
> > > >>>>>>>>>>>> naturally with the static factory method approach.
> > > >>>>> TimeWindows,
> > > >>>>>> for
> > > >>>>>>>>>> example,
> > > >>>>>>>>>>>> requires a size parameter, so we can naturally say
> > > >>>>>>>>> TimeWindows.of(size).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I think in the case of a suppression, there's really no
> > > >>>>> "core"
> > > >>>>>>>>>> parameter,
> > > >>>>>>>>>>>> and "Suppression.of()" seems sillier than "new
> > > >>>>> Suppression()". I
> > > >>>>>>>> think
> > > >>>>>>>>>> that
> > > >>>>>>>>>>>> Suppression.of(duration) would be ambiguous, since there
> > > >>> are
> > > >>>>>> many
> > > >>>>>>>>>> durations
> > > >>>>>>>>>>>> that we can configure.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> However, thinking about it again, I suppose that I can
> > > >> give
> > > >>>>> each
> > > >>>>>>>>>>>> configuration method a static version, which would let
> > > >> you
> > > >>>>>> replace
> > > >>>>>>>>> "new
> > > >>>>>>>>>>>> Suppression()." with "Suppression." in all the examples.
> > > >>>>>>> Basically,
> > > >>>>>>>>>> instead
> > > >>>>>>>>>>>> of "of()", we'd support any of the methods I listed.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> For example:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> windowCounts
> > > >>>>>>>>>>>>     .suppress(
> > > >>>>>>>>>>>>         Suppression
> > > >>>>>>>>>>>>             .suppressLateEvents(Duration.ofMinutes(10))
> > > >>>>>>>>>>>>             .suppressIntermediateEvents(
> > > >>>>>>>>>>>>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > >>>>>>>>>>>>             )
> > > >>>>>>>>>>>>     );
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Does that seem better?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>> -John
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <
> > > >>> yuzhih...@gmail.com
> > > >>>>>>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> I started to read this KIP which contains a lot of
> > > >>>>> material.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> One suggestion:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>     .suppress(
> > > >>>>>>>>>>>>>         new Suppression()
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Do you think it would be more consistent with the rest
> > > >> of
> > > >>>>>> Streams
> > > >>>>>>>>> data
> > > >>>>>>>>>>>>> structures to support `of`?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Suppression.of(Duration.ofMinutes(10))
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Cheers
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <
> > > >>>>>> j...@confluent.io
> > > >>>>>>>>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hello devs and users,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Please take some time to consider this proposal for
> > > >> Kafka
> > > >>>>>>> Streams:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> KIP-328: Ability to suppress updates for KTables
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> The basic idea is to provide:
> > > >>>>>>>>>>>>>> * more usable control over update rate (vs the current
> > > >>>>> state
> > > >>>>>>> store
> > > >>>>>>>>>>>>> caches)
> > > >>>>>>>>>>>>>> * the final-result-for-windowed-computations feature
> > > >>> which
> > > >>>>>>> several
> > > >>>>>>>>>>>> people
> > > >>>>>>>>>>>>>> have requested
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I look forward to your feedback!
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>> -John
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> --
> > > >>>>> -- Guozhang
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
