Thanks for the update. I did a first pass over the updated KIP and think
it makes sense.

-Matthias

On 7/11/18 5:47 PM, John Roesler wrote:
> Hi all,
> 
> I have updated KIP-328 with all the feedback I've gotten so far. Please
> take another look and let me know what you think!
> 
> Thanks,
> -John
> 
> On Wed, Jul 11, 2018 at 12:28 AM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> That is a good point..
>>
>> I cannot think of a better option than documentation and warning, and also
>> given that we'd probably better not reusing the function name `until` for
>> close time.
>>
>>
>> Guozhang
>>
>>
>> On Tue, Jul 10, 2018 at 3:31 PM, John Roesler <j...@confluent.io> wrote:
>>
>>> I had some opportunity to reflect on the default for close time today...
>>>
>>> Note that the current "close time" is equal to the retention time, and
>>> therefore "close" today shares the default retention of 24h.
>>>
>>> It would definitely break any application that today specifies a
>> retention
>>> time to set close shorter than that time. It's also likely to break apps
>> if
>>> they *don't* set the retention time and rely on the 24h default. So it's
>>> unfortunate, but I think if "close" isn't set, we should use the
>> retention
>>> time instead of a fixed default.
>>>
>>> When we ultimately remove the retention time parameter ("until"), we will
>>> have to set "close" to a default of 24h.
>>>
>>> Of course, this has a negative impact on the user of "final results",
>> since
>>> they won't see any output at all for retentionTime/24h, and may find this
>>> confusing. What can we do about this except document it well? Maybe log a
>>> warning if we see that close wasn't explicitly set while using "final
>>> results"?
>>>
>>> Thanks,
>>> -John
>>>
>>> On Tue, Jul 10, 2018 at 10:46 AM John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> That sounds good to me. I'll include that in the KIP.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>>
>>>>> Let me clarify a bit on what I meant about moving `retentionPeriod` to
>>>>> WindowStoreBuilder:
>>>>>
>>>>> In another discussion we had around KIP-319 / 330, that the "retention
>>>>> period" should not really be a window spec, but only a window store
>>> spec,
>>>>> as it only affects how long to retain each window to be queryable
>> along
>>>>> with the storage cost.
>>>>>
>>>>> More specifically, today the "maintainMs" returned from Windows is
>> used
>>> in
>>>>> three places:
>>>>>
>>>>> 1) for windowed aggregations, they are passed in directly into
>>>>> `Stores.persistentWindows()` as the retention period parameters. For
>>> this
>>>>> use case we should just let the WindowStoreBuilder to specify this
>> value
>>>>> itself.
>>>>>
>>>>> NOTE: It is also returned in the KStreamWindowAggregate processor, to
>>>>> determine if a received record should be dropped due to its lateness.
>> We
>>>>> may need to think of another way to get this value inside the
>> processor
>>>>>
>>>>> 2) for windowed stream-stream join, it is used as the join range
>>> parameter
>>>>> but only to check that "windowSizeMs <= retentionPeriodMs". We can do
>>> this
>>>>> check at the store builder lever instead of at the processor level.
>>>>>
>>>>>
>>>>> If we can remove its usage in both 1) and 2), then we should be able
>> to
>>>>> safely remove this from the `Windows` spec.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io>
>> wrote:
>>>>>
>>>>>> Thanks for the reply, Guozhang,
>>>>>>
>>>>>> Good! I agree, that is also a good reason, and I actually made use
>> of
>>>>> that
>>>>>> in my tests. I'll update the KIP.
>>>>>>
>>>>>> By the way, I chose "allowedLateness" as I was trying to pick a
>> better
>>>>> name
>>>>>> than "close", but I think it's actually the wrong name. We don't
>> want
>>> to
>>>>>> bound the lateness of events in general, only with respect to the
>> end
>>> of
>>>>>> their window.
>>>>>>
>>>>>> If we have a window [0,10), with "allowedLateness" of 5, then if we
>>> get
>>>>> an
>>>>>> event with timestamp 3 at time 9, the name implies we'd reject it,
>>> which
>>>>>> seems silly. Really, we'd only want to start rejecting that event at
>>>>> stream
>>>>>> time 15.
>>>>>>
>>>>>> What I meant was more like "allowedLatenessAfterWindowEnd", but
>>> that's
>>>>> too
>>>>>> verbose. I think that "close" + some documentation about what it
>> means
>>>>> will
>>>>>> be better.
>>>>>>
>>>>>> 1: "Close" would be measured from the end of the window, so a
>>> reasonable
>>>>>> default would be "0". Recall that "close" really only needs to be
>>>>> specified
>>>>>> for final results, and a default of 0 would produce the most
>> intuitive
>>>>>> results. If folks later discover that they are missing some late
>>> events,
>>>>>> they can adjust the parameter accordingly. IMHO, any other value
>> would
>>>>> just
>>>>>> be a guess on our part.
>>>>>>
>>>>>> 2a:
>>>>>> I think you're saying to re-use "until" instead of adding "close" to
>>> the
>>>>>> window.
>>>>>>
>>>>>> The downside here would be that the semantic change could be more
>>>>> confusing
>>>>>> than deprecating "until" and introducing window "close" and a
>>>>>> "retentionTime" on the store builder. The deprecation is a good,
>>>>> controlled
>>>>>> way for us to make sure people are getting the semantics they think
>>>>> they're
>>>>>> getting, as well as giving us an opportunity to link people to the
>> API
>>>>> they
>>>>>> should use instead.
>>>>>>
>>>>>> I didn't fully understand the second part, but it sounds like you're
>>>>>> suggesting to add a new "retentionTime" setter to Windows to bridge
>>> the
>>>>> gap
>>>>>> until we add it to the store builder? That seems kind of roundabout
>> to
>>>>> me,
>>>>>> if that's what you meant. We could just immediately add it to the
>>> store
>>>>>> builders in the same PR.
>>>>>>
>>>>>> 2b: Sounds good to me!
>>>>>>
>>>>>> Thanks again,
>>>>>> -John
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>>> John,
>>>>>>>
>>>>>>> Thanks for your replies. As for the two options of the API, I
>> think
>>>>> I'm
>>>>>>> slightly inclined to the first option as well. My motivation is a
>>> bit
>>>>>>> different, as I think of the first one maybe more flexible, for
>>>>> example:
>>>>>>>
>>>>>>> KTable<Windowed<..>> table = ... count();
>>>>>>>
>>>>>>> table.toStream().peek(..);   // want to peek at the changelog
>>> stream,
>>>>> do
>>>>>>> not care about final results.
>>>>>>>
>>>>>>> table.suppress().toStream().to("topic");    // sending to a topic,
>>>>> want
>>>>>> to
>>>>>>> only send the final results.
>>>>>>>
>>>>>>> --------------
>>>>>>>
>>>>>>> Besides that, I have a few more minor questions:
>>>>>>>
>>>>>>> 1. For "allowedLateness", what should be the default value? I.e.
>> if
>>>>> user
>>>>>> do
>>>>>>> not specify "allowedLateness" in TimeWindows, what value should we
>>>>> set?
>>>>>>>
>>>>>>> 2. For API names, some personal suggestions here:
>>>>>>>
>>>>>>> 2.a) "allowedLateness"  -> "until" (semantics changed, and also
>>> value
>>>>> is
>>>>>>> defined as delta on top of window length), where "until" ->
>>>>>>> "retentionPeriod", and the latter will be removed from `Windows`
>> to
>>> `
>>>>>>> WindowStoreBuilder` in the future.
>>>>>>>
>>>>>>> 2.b) "BufferConfig" -> "Buffered" ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io>
>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Matthias and Guozhang,
>>>>>>>>
>>>>>>>> Sorry for the slow reply. I was mulling about your feedback and
>>>>>> weighing
>>>>>>>> some ideas in a sketchbook PR: https://github.com/apache/
>>>>>> kafka/pull/5337
>>>>>>> .
>>>>>>>>
>>>>>>>> Your thought about keeping suppression independent of business
>>> logic
>>>>>> is a
>>>>>>>> very good one. I agree that it would make more sense to add some
>>>>> kind
>>>>>> of
>>>>>>>> "window close" concept to the window definition.
>>>>>>>>
>>>>>>>> In fact, doing that immediately solves the inconsistency problem
>>>>>> Guozhang
>>>>>>>> brought up. There's no need to add a "final results" or
>> "emission"
>>>>>> option
>>>>>>>> to the windowed aggregation.
>>>>>>>>
>>>>>>>> What do you think about an API more like this:
>>>>>>>>
>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
>>>>>>>>
>>>>>>>> builder
>>>>>>>>   .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
>>>>>>>>   .groupBy(
>>>>>>>>     (String k1, String v1) -> k1,
>>>>>>>>     Serialized.with(STRING_SERDE, STRING_SERDE)
>>>>>>>>   )
>>>>>>>>   .windowedBy(TimeWindows
>>>>>>>>     .of(scaledTime(2L))
>>>>>>>>     .until(scaledTime(3L))
>>>>>>>>     .allowedLateness(scaledTime(1L))
>>>>>>>>   )
>>>>>>>>   .count(Materialized.as("counts"))
>>>>>>>>   .suppress(
>>>>>>>>     emitFinalResultsOnly(
>>>>>>>>       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(
>>>>>> SHUT_DOWN)
>>>>>>>>     )
>>>>>>>>   )
>>>>>>>>   .toStream()
>>>>>>>>   .to("output-suppressed", Produced.with(STRING_SERDE,
>>> LONG_SERDE));
>>>>>>>>
>>>>>>>> Note that:
>>>>>>>>  * "emitFinalResultsOnly" is available *only* on windowed tables
>>>>>>> (enforced
>>>>>>>> by the type system at compile time), and it determines the time
>> to
>>>>> wait
>>>>>>> by
>>>>>>>> looking at "allowedLateness" on the TimeWindows config.
>>>>>>>>  * querying "counts" will produce results (eventually)
>> consistent
>>>>> with
>>>>>>>> what's observable in "output-suppressed".
>>>>>>>>  * in all cases, "suppress" has no effect on business logic,
>> just
>>> on
>>>>>>> event
>>>>>>>> suppression.
>>>>>>>>
>>>>>>>> Is this API straightforward? Or do you still prefer the version
>>> that
>>>>>> both
>>>>>>>> proposed:
>>>>>>>>
>>>>>>>>   ...
>>>>>>>>   .windowedBy(TimeWindows
>>>>>>>>     .of(scaledTime(2L))
>>>>>>>>     .until(scaledTime(3L))
>>>>>>>>     .allowedLateness(scaledTime(1L))
>>>>>>>>   )
>>>>>>>>   .count(
>>>>>>>>     Materialized.as("counts"),
>>>>>>>>     emitFinalResultsOnly(
>>>>>>>>       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(
>>>>>> SHUT_DOWN)
>>>>>>>>     )
>>>>>>>>   )
>>>>>>>>   ...
>>>>>>>>
>>>>>>>> To me, these two are practically identical, and I still vaguely
>>>>> prefer
>>>>>>> the
>>>>>>>> first one.
>>>>>>>>
>>>>>>>> The prototype has made clearer to me that users of "final
>> results
>>>>> for
>>>>>>>> windows" and users of "suppression for table events" both need
>> to
>>>>>>> configure
>>>>>>>> the suppression buffer.
>>>>>>>>
>>>>>>>> This buffer configuration consists of:
>>>>>>>> 1. how many keys or bytes to keep in memory
>>>>>>>> 2. what to do if memory runs out (shut down, start using disk,
>>> ...)
>>>>>>>>
>>>>>>>> So it's not as simple as setting a "final results" flag. We'll
>>>>> either
>>>>>>> have
>>>>>>>> an "Emit" config object on the windowed aggregators that takes
>> the
>>>>> same
>>>>>>>> BufferConfig that the "Suppress" config on the suppression
>>>>> operator, or
>>>>>>> we
>>>>>>>> just use the suppression operator for both.
>>>>>>>>
>>>>>>>> Perhaps it would sweeten the deal a little to point out that we
>>>>> have 2
>>>>>>>> overloads already for each windowed aggregator (with and without
>>>>>>>> Materialized). Adding "Emitted" or something would mean that
>> we'd
>>>>> add a
>>>>>>> new
>>>>>>>> overload for each one, taking us up to 4 overloads each for
>>> "count",
>>>>>>>> "aggregate" and "reduce". Using "suppress" means that we don't
>> add
>>>>> any
>>>>>>> new
>>>>>>>> overloads.
>>>>>>>>
>>>>>>>> Thanks again for helping to hash this out,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <
>> wangg...@gmail.com>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think I agree with Matthias for having dedicated APIs for
>>>>> windowed
>>>>>>>>> operation final output scenario, PLUS separating the window
>>> close
>>>>>> which
>>>>>>>> the
>>>>>>>>> "final output" would rely on, from the window retention time
>>>>> itself
>>>>>>>>> (admittedly it would make this KIP effort larger, but if we
>>>>> believe
>>>>>> we
>>>>>>>> need
>>>>>>>>> to do this separation anyways we could just do it now).
>>>>>>>>>
>>>>>>>>> And then we can have the `KTable#suppress()` for
>>>>>>> intermediate-suppression
>>>>>>>>> only, not for late-record-suppression, until we've seen that
>>>>> becomes
>>>>>> a
>>>>>>>>> common feature request because our current design still allows
>>> to
>>>>> be
>>>>>>>>> extended for that purpose.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <
>>>>>>> matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the discussion. I am just catching up.
>>>>>>>>>>
>>>>>>>>>> In general, I think we have different uses cases and
>>>>> non-windowed
>>>>>> and
>>>>>>>>>> windowed is quite different. For the non-windowed case,
>>>>> suppress()
>>>>>>> has
>>>>>>>>>> no (useful) close or retention time, no final semantics, and
>>>>> also
>>>>>> no
>>>>>>>>>> business logic impact.
>>>>>>>>>>
>>>>>>>>>> On the other hand, for windowed aggregations, close time and
>>>>> final
>>>>>>>>>> result do have a meaning. IMHO, `close()` is part of
>> business
>>>>> logic
>>>>>>>>>> while retention time is not. Also, suppression of
>> intermediate
>>>>>> result
>>>>>>>> is
>>>>>>>>>> not a business rule and there might be use case for which
>>> either
>>>>>>> "early
>>>>>>>>>> intermediate" (before window end time) are suppressed only,
>> or
>>>>> all
>>>>>>>>>> intermediates are suppressed (maybe also something in the
>>>>> middle,
>>>>>> ie,
>>>>>>>>>> just reduce the load of intermediate updates). Thus,
>>>>>>> window-suppression
>>>>>>>>>> is much richer.
>>>>>>>>>>
>>>>>>>>>> IMHO, a generic `suppress()` operator that can be inserted
>>> into
>>>>> the
>>>>>>>> data
>>>>>>>>>> flow at any point is useful. Maybe we should keep is as
>>> generic
>>>>> as
>>>>>>>>>> possible. However, it might be difficult to use with regard
>> to
>>>>>>>>>> windowing, as the mental effort to use it is high.
>>>>>>>>>>
>>>>>>>>>> With regard to Guozhang's comment:
>>>>>>>>>>
>>>>>>>>>>> we will actually
>>>>>>>>>>> process data as old as 30 days as well, while most of the
>>> late
>>>>>>>> updates
>>>>>>>>>>> beyond 5 minutes would be discarded anyways.
>>>>>>>>>>
>>>>>>>>>> If we use `suppress()` as a standalone operator, this is
>>> correct
>>>>>> and
>>>>>>>>>> intended IMHO. To address the issue if the behavior is
>>>>> unwanted, I
>>>>>>>> would
>>>>>>>>>> suggest to add a "suppress option" directly to
>>>>>>>>>> `count()/reduce()/aggregate()` window operator similar to
>>>>>>>>>> `Materialized`. This would be an "embedded suppress" and
>> avoid
>>>>> the
>>>>>>>>>> issue. It would also address the issue about mental effort
>> for
>>>>>>> "single
>>>>>>>>>> final window result" use case.
>>>>>>>>>>
>>>>>>>>>> I also think that a shorter close-time than retention time
>> is
>>>>>> useful
>>>>>>>> for
>>>>>>>>>> window aggregation. If we add close() to the window
>> definition
>>>>> and
>>>>>>>>>> until() to `Materialized`, we can separate both correctly
>>> IMHO.
>>>>>>>>>>
>>>>>>>>>> About setting `close = min(close,retention)` I am not sure.
>> We
>>>>>> might
>>>>>>>>>> rather throw an exception than reducing the close time
>>>>>> automatically.
>>>>>>>>>> Otherwise, I see many user question about "I set close to X
>>> but
>>>>> it
>>>>>>> does
>>>>>>>>>> not get updated for some data that is with delay of X".
>>>>>>>>>>
>>>>>>>>>> The tricky question might be to design the API in a backward
>>>>>>> compatible
>>>>>>>>>> way though.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 7/3/18 5:38 AM, John Roesler wrote:
>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>
>>>>>>>>>>> I see. It seems like if we want to decouple 1) and 2), we
>>>>> need to
>>>>>>>> alter
>>>>>>>>>> the
>>>>>>>>>>> definition of the window. Do you think it would close the
>>> gap
>>>>> if
>>>>>> we
>>>>>>>>>> added a
>>>>>>>>>>> "window close" time to the window definition?
>>>>>>>>>>>
>>>>>>>>>>> Such as:
>>>>>>>>>>>
>>>>>>>>>>> builder.stream("input")
>>>>>>>>>>> .groupByKey()
>>>>>>>>>>> .windowedBy(
>>>>>>>>>>>   TimeWindows
>>>>>>>>>>>     .of(60_000)
>>>>>>>>>>>     .closeAfter(10 * 60)
>>>>>>>>>>>     .until(30L * 24 * 60 * 60 * 1000)
>>>>>>>>>>> )
>>>>>>>>>>> .count()
>>>>>>>>>>> .suppress(Suppression.finalResultsOnly());
>>>>>>>>>>>
>>>>>>>>>>> Possibly called "finalResultsAtWindowClose" or something?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <
>>>>> wangg...@gmail.com
>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey John,
>>>>>>>>>>>>
>>>>>>>>>>>> Obviously I'm too lazy on email replying diligence
>> compared
>>>>> with
>>>>>>> you
>>>>>>>>> :)
>>>>>>>>>>>> Will try to reply them separately:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ------------------------------
>>> ------------------------------
>>>>>>>>>> -----------------
>>>>>>>>>>>>
>>>>>>>>>>>> To reply your email on "Mon, Jul 2, 2018 at 8:23 AM":
>>>>>>>>>>>>
>>>>>>>>>>>> I'm aware of this use case, but again, the concern is
>> that,
>>>>> in
>>>>>>> this
>>>>>>>>>> setting
>>>>>>>>>>>> in order to let the window be queryable for 30 days, we
>>> will
>>>>>>>> actually
>>>>>>>>>>>> process data as old as 30 days as well, while most of the
>>>>> late
>>>>>>>> updates
>>>>>>>>>>>> beyond 5 minutes would be discarded anyways. Personally I
>>>>> think
>>>>>>> for
>>>>>>>>> the
>>>>>>>>>>>> final update scenario, the ideal situation users would
>> want
>>>>> is
>>>>>>> that
>>>>>>>>> "do
>>>>>>>>>> not
>>>>>>>>>>>> process any data that is less than 5 minutes, and of
>> course
>>>>> no
>>>>>>>> update
>>>>>>>>>>>> records to the downstream later than 5 minutes either;
>> but
>>>>>> retain
>>>>>>>> the
>>>>>>>>>>>> window to be queryable for 30 days". And by doing that
>> the
>>>>> final
>>>>>>>>> window
>>>>>>>>>>>> snapshot would also be aligned with the update stream as
>>>>> well.
>>>>>> In
>>>>>>>>> other
>>>>>>>>>>>> words, among these three periods:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) the retention length of the window / table.
>>>>>>>>>>>> 2) the late records acceptance for updating the window.
>>>>>>>>>>>> 3) the late records update to be sent downstream.
>>>>>>>>>>>>
>>>>>>>>>>>> Final update use cases would naturally want 2) = 3),
>> while
>>> 1)
>>>>>> may
>>>>>>> be
>>>>>>>>>>>> different and larger, while what we provide now is that
>> 1)
>>> =
>>>>> 2),
>>>>>>>> which
>>>>>>>>>>>> could be different and in practice larger than 3), hence
>>> not
>>>>> the
>>>>>>>> most
>>>>>>>>>>>> intuitive for their needs.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ------------------------------
>>> ------------------------------
>>>>>>>>>> -----------------
>>>>>>>>>>>>
>>>>>>>>>>>> To reply your email on "Mon, Jul 2, 2018 at 10:27 AM":
>>>>>>>>>>>>
>>>>>>>>>>>> I'd like option 2) over option 1) better as well from
>>>>>> programming
>>>>>>>> pov.
>>>>>>>>>> But
>>>>>>>>>>>> I'm wondering if option 2) would provide the above
>>> semantics
>>>>> or
>>>>>> it
>>>>>>>> is
>>>>>>>>>> still
>>>>>>>>>>>> coupling 1) with 2) as well ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <
>>>>> j...@confluent.io
>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In fact, to push the idea further (which IIRC is what
>>>>> Matthias
>>>>>>>>>> originally
>>>>>>>>>>>>> proposed), if we can accept
>> "Suppression#finalResultsOnly"
>>>>> in
>>>>>> my
>>>>>>>> last
>>>>>>>>>>>>> email, then we could also consider whether to eliminate
>>>>>>>>>>>>> "suppressLateEvents" entirely.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We could always add it later, but you've both expressed
>>>>> doubt
>>>>>>> that
>>>>>>>>>> there
>>>>>>>>>>>>> are practical use cases for it outside of final-results.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <
>>>>>> j...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi again, Guozhang ;) Here's the second part of my
>>>>> response...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It seems like your main concern is: "if I'm a user who
>>>>> wants
>>>>>>> final
>>>>>>>>>>>> update
>>>>>>>>>>>>>> semantics, how complicated is it for me to get it?"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think we have to assume that people don't always have
>>>>> time
>>>>>> to
>>>>>>>>> become
>>>>>>>>>>>>>> deeply familiar with all the nuances of a programming
>>>>>>> environment
>>>>>>>>>>>> before
>>>>>>>>>>>>>> they use it. Especially if they're evaluating several
>>>>>> frameworks
>>>>>>>> for
>>>>>>>>>>>>> their
>>>>>>>>>>>>>> use case, it's very valuable to make it as obvious as
>>>>> possible
>>>>>>> how
>>>>>>>>> to
>>>>>>>>>>>>>> accomplish various computations with Streams.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To me the biggest question is whether with a fresh
>>>>>> perspective,
>>>>>>>>> people
>>>>>>>>>>>>>> would say "oh, I get it, I have to bound my lateness
>> and
>>>>>>> suppress
>>>>>>>>>>>>>> intermediate updates, and of course I'll get only the
>>> final
>>>>>>>>> result!",
>>>>>>>>>>>> or
>>>>>>>>>>>>> if
>>>>>>>>>>>>>> it's more like "wtf? all I want is the final result,
>> what
>>>>> are
>>>>>>> all
>>>>>>>>>> these
>>>>>>>>>>>>>> parameters?".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was talking with Matthias a while back, and he had an
>>>>> idea
>>>>>>> that
>>>>>>>> I
>>>>>>>>>>>> think
>>>>>>>>>>>>>> can help, which is to essentially set up a final-result
>>>>> recipe
>>>>>>> in
>>>>>>>>>>>>> addition
>>>>>>>>>>>>>> to the raw parameters. I previously thought that it
>>>>> wouldn't
>>>>>> be
>>>>>>>>>>>> possible
>>>>>>>>>>>>> to
>>>>>>>>>>>>>> restrict its usage to Windowed KTables, but thinking
>>> about
>>>>> it
>>>>>>>> again
>>>>>>>>>>>> this
>>>>>>>>>>>>>> weekend, I have a couple of ideas:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ================
>>>>>>>>>>>>>> = 1. Static Wrapper =
>>>>>>>>>>>>>> ================
>>>>>>>>>>>>>> We can define an extra static function that "wraps" a
>>>>> KTable
>>>>>>> with
>>>>>>>>>>>>>> final-result semantics.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K extends Windowed, V> KTable<K, V>
>>>>>>>> finalResultsOnly(
>>>>>>>>>>>>>>   final KTable<K, V> windowedKTable,
>>>>>>>>>>>>>>   final Duration maxAllowedLateness,
>>>>>>>>>>>>>>   final Suppression.BufferFullStrategy
>>> bufferFullStrategy)
>>>>> {
>>>>>>>>>>>>>>     return windowedKTable.suppress(
>>>>>>>>>>>>>>         Suppression.suppressLateEvents(
>>> maxAllowedLateness)
>>>>>>>>>>>>>>                    .suppressIntermediateEvents(
>>>>>>>>>>>>>>                      IntermediateSuppression
>>>>>>>>>>>>>>                        .emitAfter(maxAllowedLateness)
>>>>>>>>>>>>>>                        .bufferFullStrategy(
>>>>>> bufferFullStrategy)
>>>>>>>>>>>>>>                    )
>>>>>>>>>>>>>>     );
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Because windowedKTable is a parameter, the static
>>> function
>>>>> can
>>>>>>>>> easily
>>>>>>>>>>>>>> impose an extra bound on the key type, that it extends
>>>>>> Windowed.
>>>>>>>>> This
>>>>>>>>>>>>> would
>>>>>>>>>>>>>> make "final results only" only available on windowed
>>>>> ktables.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's how it would look to use:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> windowCounts =
>> ...
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> finalCounts =
>>>>>>>>>>>>>>   finalResultsOnly(
>>>>>>>>>>>>>>     windowCounts,
>>>>>>>>>>>>>>     Duration.ofMinutes(10),
>>>>>>>>>>>>>>     Suppression.BufferFullStrategy.SHUT_DOWN
>>>>>>>>>>>>>>   );
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Trying to use it on a non-windowed KTable yields:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Error:(129, 35) java: method finalResultsOnly in class
>>>>>>>>>>>>>>> org.apache.kafka.streams.kstream.internals.
>>>>>> KTableAggregateTest
>>>>>>>>>> cannot
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> applied to given types;
>>>>>>>>>>>>>>>   required:
>>>>>>>>>>>>>>>
>> org.apache.kafka.streams.kstream.KTable<K,V>,java.time.
>>>>>>>>>>>>> Duration,org.apache.kafka.streams.kstream.Suppression.
>>>>>>>>>> BufferFullStrategy
>>>>>>>>>>>>>>>   found:
>>>>>>>>>>>>>>> org.apache.kafka.streams.kstream.KTable<java.lang.
>>>>>>>>>>>>> String,java.lang.String>,java.time.Duration,org.apache.
>>>>>>>>>>>>> kafka.streams.kstream.Suppression.BufferFullStrategy
>>>>>>>>>>>>>>>   reason: inference variable K has incompatible bounds
>>>>>>>>>>>>>>>     equality constraints: java.lang.String
>>>>>>>>>>>>>>>     upper bounds:
>>>>> org.apache.kafka.streams.kstream.Windowed
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> =================================================
>>>>>>>>>>>>>> = 2. Add <K,V> parameters and recipe method to
>>> Suppression
>>>>> =
>>>>>>>>>>>>>> =================================================
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By adding K,V parameters to Suppression, we can
>> provide a
>>>>>>>> similarly
>>>>>>>>>>>>>> bounded config method directly on the Suppression
>> class:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K extends Windowed, V> Suppression<K, V>
>>>>>>>>>>>>>> finalResultsOnly(final Duration maxAllowedLateness,
>> final
>>>>>>>>>>>>>> BufferFullStrategy bufferFullStrategy) {
>>>>>>>>>>>>>>     return Suppression
>>>>>>>>>>>>>>         .<K, V>suppressLateEvents(maxAllowedLateness)
>>>>>>>>>>>>>>         .suppressIntermediateEvents(
>>> IntermediateSuppression
>>>>>>>>>>>>>>             .emitAfter(maxAllowedLateness)
>>>>>>>>>>>>>>             .bufferFullStrategy(bufferFullStrategy)
>>>>>>>>>>>>>>         );
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Then, here's how it would look to use it:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> windowCounts =
>> ...
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> finalCounts =
>>>>>>>>>>>>>>   windowCounts.suppress(
>>>>>>>>>>>>>>     Suppression.finalResultsOnly(
>>>>>>>>>>>>>>       Duration.ofMinutes(10)
>>>>>>>>>>>>>>       Suppression.BufferFullStrategy.SHUT_DOWN
>>>>>>>>>>>>>>     )
>>>>>>>>>>>>>>   );
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Trying to use it on a non-windowed ktable yields:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Error:(127, 35) java: method finalResultsOnly in class
>>>>>>>>>>>>>>> org.apache.kafka.streams.kstream.Suppression<K,V>
>>> cannot
>>>>> be
>>>>>>>> applied
>>>>>>>>>> to
>>>>>>>>>>>>>>> given types;
>>>>>>>>>>>>>>>   required:
>>>>>>>>>>>>>>> java.time.Duration,org.apache.kafka.streams.kstream.
>>>>>>>>>>>>> Suppression.BufferFullStrategy
>>>>>>>>>>>>>>>   found:
>>>>>>>>>>>>>>> java.time.Duration,org.apache.kafka.streams.kstream.
>>>>>>>>>>>>> Suppression.BufferFullStrategy
>>>>>>>>>>>>>>>   reason: explicit type argument java.lang.String does
>>> not
>>>>>>>> conform
>>>>>>>>> to
>>>>>>>>>>>>>>> declared bound(s)
>>>>> org.apache.kafka.streams.kstream.Windowed
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ============
>>>>>>>>>>>>>> = Downsides =
>>>>>>>>>>>>>> ============
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Of course, there's a downside either way:
>>>>>>>>>>>>>> * for 1:  this "wrapper" interaction would be the first
>>> in
>>>>> the
>>>>>>>> DSL.
>>>>>>>>> Is
>>>>>>>>>>>> it
>>>>>>>>>>>>>> too strange, and how discoverable would it be?
>>>>>>>>>>>>>> * for 2: adding those type parameters to Suppression
>> will
>>>>>> force
>>>>>>>> all
>>>>>>>>>>>>>> callers to provide them in the event of a chained
>>>>> construction
>>>>>>>>> because
>>>>>>>>>>>>> Java
>>>>>>>>>>>>>> doesn't do RHS recursive type inference. This is
>> already
>>>>>> visible
>>>>>>>> in
>>>>>>>>>>>> other
>>>>>>>>>>>>>> parts of the Streams DSL. For example, often calls to
>>>>>>> Materialized
>>>>>>>>>>>>> builders
>>>>>>>>>>>>>> have to provide seemingly obvious type bounds.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ============
>>>>>>>>>>>>>> = Conclusion =
>>>>>>>>>>>>>> ============
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think option 2 is more "normal" and discoverable. It
>>> does
>>>>>>> have a
>>>>>>>>>>>>>> downside, but it's one that's pre-existing elsewhere in
>>> the
>>>>>> DSL.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> WDYT? Would the addition of this "recipe" method to
>>>>>> Suppression
>>>>>>>>>> resolve
>>>>>>>>>>>>>> your concern?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again,
>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <
>>>>>>> wangg...@gmail.com
>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the metrics: yeah I think I'm with you that
>>> the
>>>>>>> dropped
>>>>>>>>>>>>> records
>>>>>>>>>>>>>>> due to window retention or emit suppression policies
>>>>> should
>>>>>> be
>>>>>>>>>>>> recorded
>>>>>>>>>>>>>>> differently, and using this KIP's proposed metric
>> would
>>> be
>>>>>>> fine.
>>>>>>>> If
>>>>>>>>>>>> you
>>>>>>>>>>>>>>> also think we can use this KIP's proposed metrics to
>>> cover
>>>>>> the
>>>>>>>>> window
>>>>>>>>>>>>>>> retention cased skipping records, then we can include
>>> the
>>>>>>> changes
>>>>>>>>> in
>>>>>>>>>>>>> this
>>>>>>>>>>>>>>> KIP as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the current proposal, I'm actually not too
>>>>> worried
>>>>>>>> about
>>>>>>>>>> the
>>>>>>>>>>>>>>> inconsistency between query semantics and downstream
>>> emit
>>>>>>>>> semantics.
>>>>>>>>>>>> For
>>>>>>>>>>>>>>> queries, we will always return the current running
>>>>> results of
>>>>>>> the
>>>>>>>>>>>>> windows,
>>>>>>>>>>>>>>> being it partial or final results depending on the
>>> window
>>>>>>>> retention
>>>>>>>>>>>> time
>>>>>>>>>>>>>>> anyways, which has nothing to do whether the emitted
>>>>> stream
>>>>>>>> should
>>>>>>>>> be
>>>>>>>>>>>>> one
>>>>>>>>>>>>>>> final output per key or not. I also agree that having
>> a
>>>>>> unified
>>>>>>>>>>>>> operation
>>>>>>>>>>>>>>> is generally better for users to focus on leveraging
>>> that
>>>>> one
>>>>>>>> only
>>>>>>>>>>>> than
>>>>>>>>>>>>>>> learning about two set of operations. The only
>> question
>>> I
>>>>> had
>>>>>>> is,
>>>>>>>>> for
>>>>>>>>>>>>>>> final
>>>>>>>>>>>>>>> updates of window stores, if it is a bit awkward to
>>>>>> understand
>>>>>>>> the
>>>>>>>>>>>>>>> configuration combo. Thinking about this more, I think
>>> my
>>>>>> root
>>>>>>>>> worry
>>>>>>>>>>>> in
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> "suppressLateEvents" call for windowed tables, since
>>> from
>>>>> a
>>>>>>> user
>>>>>>>>>>>>>>> perspective: if my retention time is X which means
>> "pay
>>>>> the
>>>>>>> cost
>>>>>>>> to
>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>> late records up to X to still be applied updating the
>>>>>> tables",
>>>>>>>> why
>>>>>>>>>>>>> would I
>>>>>>>>>>>>>>> ever want to suppressLateEvents by Y ( < X), to say
>> "do
>>>>> not
>>>>>>> send
>>>>>>>>> the
>>>>>>>>>>>>>>> updates up to Y, which means the downstream operator
>> or
>>>>> sink
>>>>>>>> topic
>>>>>>>>>> for
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>> stream would actually see a truncated update stream
>>> while
>>>>>> I've
>>>>>>>> paid
>>>>>>>>>>>>> larger
>>>>>>>>>>>>>>> cost for that"; and of course, Y > X would not make
>>> sense
>>>>>>> either
>>>>>>>> as
>>>>>>>>>>>> you
>>>>>>>>>>>>>>> would not see any updates later than X anyways. So in
>>>>> all, my
>>>>>>>>> feeling
>>>>>>>>>>>> is
>>>>>>>>>>>>>>> that it makes less sense for windowed table's
>>>>>>>> "suppressLateEvents"
>>>>>>>>>>>> with
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>> parameter that is not equal to the window retention,
>> and
>>>>>>> opening
>>>>>>>>> the
>>>>>>>>>>>>> door
>>>>>>>>>>>>>>> in the current proposal may confuse people with that.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Again, above is just a subjective opinion and probably
>>> we
>>>>> can
>>>>>>>> also
>>>>>>>>>>>> bring
>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>> some scenarios that users does want to set X != Y..
>> but
>>>>>>>> personally
>>>>>>>>> I
>>>>>>>>>>>>> feel
>>>>>>>>>>>>>>> that even if the semantics for this scenario if
>>> intuitive
>>>>> for
>>>>>>>> user
>>>>>>>>> to
>>>>>>>>>>>>>>> understand, doe that really make sense and should we
>>>>> really
>>>>>>> open
>>>>>>>>> the
>>>>>>>>>>>>> door
>>>>>>>>>>>>>>> for it. So I think maybe separating the final update
>> in
>>> a
>>>>>>>> separate
>>>>>>>>>>>> API's
>>>>>>>>>>>>>>> benefits may overwhelm the advantage of having one
>>> uniform
>>>>>>>>>> definition.
>>>>>>>>>>>>> And
>>>>>>>>>>>>>>> for my alternative proposal, the rationale was from
>> both
>>>>> my
>>>>>>>> concern
>>>>>>>>>>>>> about
>>>>>>>>>>>>>>> "suppressLateEvents" for windowed store, and Matthias'
>>>>>> question
>>>>>>>>> about
>>>>>>>>>>>>>>> "suppressLateEvents" for non-windowed stores, that if
>> it
>>>>> is
>>>>>>> less
>>>>>>>>>>>>>>> meaningful
>>>>>>>>>>>>>>> for both, we can consider removing it completely and
>>> only
>>>>> do
>>>>>>>>>>>>>>> "IntermediateSuppression" in Suppress instead.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So I'd summarize my thoughts in the following
>> questions:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Does "suppressLateEvents" with parameter Y != X
>>> (window
>>>>>>>>> retention
>>>>>>>>>>>>> time)
>>>>>>>>>>>>>>> for windowed stores make sense in practice?
>>>>>>>>>>>>>>> 2. Does "suppressLateEvents" with any parameter Y for
>>>>>>>> non-windowed
>>>>>>>>>>>>> stores
>>>>>>>>>>>>>>> make sense in practice?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <
>>>>>>> bbej...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the explanation, that does make sense.  I
>>> have
>>>>>> some
>>>>>>>>>>>>>>> questions on
>>>>>>>>>>>>>>>> operations, but I'll just wait for the PR and tests.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <
>>>>>>> j...@confluent.io
>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the review!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Your question is very much applicable to the KIP and
>>>>> not at
>>>>>>> all
>>>>>>>>> an
>>>>>>>>>>>>>>>>> implementation detail. Thanks for bringing it up.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm proposing not to change the existing caches and
>>>>>>>>> configurations
>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>> (for now).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Imagine you have a topology like this:
>>>>>>>>>>>>>>>>> commit.interval.ms = 100
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (ktable1 (cached)) -> (suppress emitAfter 200)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The first ktable (ktable1) will respect the commit
>>>>> interval
>>>>>>> and
>>>>>>>>>>>>> buffer
>>>>>>>>>>>>>>>>> events for 100ms before logging, storing, or
>>> forwarding
>>>>>> them
>>>>>>>>>>>> (IIRC).
>>>>>>>>>>>>>>>>> Therefore, the second ktable (suppress) will only
>> see
>>>>> the
>>>>>>>> events
>>>>>>>>>>>> at
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> rate
>>>>>>>>>>>>>>>>> of once per 100ms. It will apply its own buffering,
>>> and
>>>>>> emit
>>>>>>>> once
>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>> 200ms
>>>>>>>>>>>>>>>>> This case is pretty trivial because the suppress
>> time
>>>>> is a
>>>>>>>>>>>> multiple
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> commit interval.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When it's not an integer multiple, you'll get
>> behavior
>>>>> like
>>>>>>> in
>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> marble
>>>>>>>>>>>>>>>>> diagram:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [ KTable caching with commit interval = 2 ]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> <--------(k:2)---------(k:4)---------(k:6)->
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>       [ suppress with emitAfter = 3 ]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> <---------------(k:2)----------------(k:6)->
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If this behavior isn't desired (for example, if you
>>>>> wanted
>>>>>> to
>>>>>>>>> emit
>>>>>>>>>>>>>>> (k:3)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> time 3, I'd recommend setting the
>>>>>> "cache.max.bytes.buffering"
>>>>>>>> to
>>>>>>>>> 0
>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>> modifying the topology to disable caching. Then, the
>>>>>> behavior
>>>>>>>> is
>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>> simply determined just by the suppress operator.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Does that seem right to you?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding the changelogs, because the suppression
>>>>> operator
>>>>>>>> hangs
>>>>>>>>>>>>> onto
>>>>>>>>>>>>>>>>> events for a while, it will need its own changelog.
>>> The
>>>>>>>> changelog
>>>>>>>>>>>>>>>>> should represent the current state of the buffer at
>>> all
>>>>>>> times.
>>>>>>>> So
>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> suppress operator sees (k:2), for example, it will
>> log
>>>>>> (k:2).
>>>>>>>>> When
>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> later gets to time 3, it's time to emit (k:2)
>>>>> downstream.
>>>>>>>> Because
>>>>>>>>>>>> k
>>>>>>>>>>>>>>> is no
>>>>>>>>>>>>>>>>> longer buffered, the suppress operator will log
>>>>> (k:null).
>>>>>>> Thus,
>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>> recovering,
>>>>>>>>>>>>>>>>> it can rebuild the buffer by reading its changelog.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you think about this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <
>>>>>>> bbej...@gmail.com
>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi John,  thanks for the KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Early on in the KIP, you mention the current
>>> approaches
>>>>>> for
>>>>>>>>>>>>>>> controlling
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> rate of downstream records from a KTable, cache
>> size
>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> commit time.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Will these configuration parameters still be in
>>> effect
>>>>> for
>>>>>>>>>>>> tables
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> don't use suppression?  For tables taking advantage
>>> of
>>>>>>>>>>>>> suppression,
>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>> these configurations have no impact?
>>>>>>>>>>>>>>>>>> This last question may be to implementation
>> specific
>>>>> but
>>>>>> if
>>>>>>>> the
>>>>>>>>>>>>>>>> requested
>>>>>>>>>>>>>>>>>> suppression time is longer than the specified
>> commit
>>>>> time,
>>>>>>>> will
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> latest
>>>>>>>>>>>>>>>>>> record in the suppression buffer get stored in a
>>>>>> changelog?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <
>>>>>>>> j...@confluent.io
>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the feedback, Matthias,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It seems like in straightforward relational
>>> processing
>>>>>>> cases,
>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>> make sense to bound the lateness of KTables. In
>>>>> general,
>>>>>> it
>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>> better
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> have "guard rails" in place that make it easier to
>>>>> write
>>>>>>>>>>>>> sensible
>>>>>>>>>>>>>>>>>> programs
>>>>>>>>>>>>>>>>>>> than insensible ones.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> But I'm still going to argue in favor of keeping
>> it
>>>>> for
>>>>>> all
>>>>>>>>>>>>>>> KTables
>>>>>>>>>>>>>>>> ;)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. I believe it is simpler to understand the
>>> operator
>>>>> if
>>>>>> it
>>>>>>>>>>>> has
>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>> uniform
>>>>>>>>>>>>>>>>>>> definition, regardless of context. It's well
>> defined
>>>>> and
>>>>>>>>>>>>> intuitive
>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>> will happen when you use late-event suppression
>> on a
>>>>>>> KTable,
>>>>>>>>>>>> so
>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>> nothing surprising or dangerous will happen in
>> that
>>>>> case.
>>>>>>>> From
>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>> perspective, having two sets of allowed operations
>>> is
>>>>>>>> actually
>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>> increase
>>>>>>>>>>>>>>>>>>> in cognitive complexity.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. To me, it's not crazy to use the operator this
>>> way.
>>>>>> For
>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> lieu
>>>>>>>>>>>>>>>>>>> of full-featured timestamp semantics, I can
>>> implement
>>>>>> MVCC
>>>>>>>>>>>>>>> behavior
>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>> building a KTable by
>>>>> "suppressLateEvents(Duration.ZERO)".
>>>>>> I
>>>>>>>>>>>>>>> suspect
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> there are other, non-obvious applications of
>>>>> suppressing
>>>>>>> late
>>>>>>>>>>>>>>> events
>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>> KTables.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. Not to get too much into implementation details
>>> in
>>>>> a
>>>>>> KIP
>>>>>>>>>>>>>>>> discussion,
>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>> if we did want to make late-event suppression
>>>>> available
>>>>>>> only
>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>> windowed
>>>>>>>>>>>>>>>>>>> KTables, we have two enforcement options:
>>>>>>>>>>>>>>>>>>>   a. check when we build the topology - this would
>>> be
>>>>>>> simple
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> implement,
>>>>>>>>>>>>>>>>>>> but would be a runtime check. Hopefully, people
>>> write
>>>>>> tests
>>>>>>>>>>>> for
>>>>>>>>>>>>>>> their
>>>>>>>>>>>>>>>>>>> topology before deploying them, so the feedback
>> loop
>>>>>> isn't
>>>>>>>>>>>>>>>>> instantaneous,
>>>>>>>>>>>>>>>>>>> but it's not too long either.
>>>>>>>>>>>>>>>>>>>   b. add a new WindowedKTable type - this would
>> be a
>>>>>>> compile
>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>>> check,
>>>>>>>>>>>>>>>>>>> but would also be substantial increase of both
>>>>> interface
>>>>>>> and
>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>> complexity.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We should definitely strive to have guard rails
>>>>>> protecting
>>>>>>>>>>>>> against
>>>>>>>>>>>>>>>>>>> surprising or dangerous behavior. Protecting
>> against
>>>>>>> programs
>>>>>>>>>>>>>>> that we
>>>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>>> currently predict is a lesser benefit, and I think
>>> we
>>>>> can
>>>>>>> put
>>>>>>>>>>>> up
>>>>>>>>>>>>>>>> guard
>>>>>>>>>>>>>>>>>>> rails on a case-by-case basis for that. It seems
>>> like
>>>>> the
>>>>>>>>>>>>>>> increase in
>>>>>>>>>>>>>>>>>>> cognitive (and potentially code and interface)
>>>>> complexity
>>>>>>>>>>>> makes
>>>>>>>>>>>>> me
>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> should skip this case.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <
>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP John.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> One initial comments about the last example
>>> "Bounded
>>>>>>>>>>>>> lateness":
>>>>>>>>>>>>>>>> For a
>>>>>>>>>>>>>>>>>>>> non-windowed KTable bounding the lateness does
>> not
>>>>>> really
>>>>>>>>>>>> make
>>>>>>>>>>>>>>>> sense,
>>>>>>>>>>>>>>>>>>>> does it?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thus, I am wondering if we should allow
>>>>>>>>>>>> `suppressLateEvents()`
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>> case? It seems to be better to only allow it for
>>>>>>>>>>>>>>> windowed-KTables.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
>>>>>>>>>>>>>>>>>>>>> I noticed this (lack of primary parameter) as
>>> well.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What you gave as new example is semantically the
>>>>> same
>>>>>> as
>>>>>>>>>>>>> what
>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>> suggested.
>>>>>>>>>>>>>>>>>>>>> So it is good by me.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <
>>>>>>>>>>>>>>> j...@confluent.io
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for taking look, Ted,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I agree this is a departure from the
>> conventions
>>> of
>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>> DSL.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Most of our config objects have one or two
>>>>> "required"
>>>>>>>>>>>>>>>> parameters,
>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>> fit
>>>>>>>>>>>>>>>>>>>>>> naturally with the static factory method
>>> approach.
>>>>>>>>>>>>>>> TimeWindow,
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>>>>>>>> requires a size parameter, so we can naturally
>>> say
>>>>>>>>>>>>>>>>>>> TimeWindows.of(size).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think in the case of a suppression, there's
>>>>> really
>>>>>> no
>>>>>>>>>>>>>>> "core"
>>>>>>>>>>>>>>>>>>>> parameter,
>>>>>>>>>>>>>>>>>>>>>> and "Suppression.of()" seems sillier than "new
>>>>>>>>>>>>>>> Suppression()". I
>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> Suppression.of(duration) would be ambiguous,
>>> since
>>>>>> there
>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> many
>>>>>>>>>>>>>>>>>>>> durations
>>>>>>>>>>>>>>>>>>>>>> that we can configure.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> However, thinking about it again, I suppose
>> that
>>> I
>>>>> can
>>>>>>>>>>>> give
>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>> configuration method a static version, which
>>> would
>>>>> let
>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> replace
>>>>>>>>>>>>>>>>>>> "new
>>>>>>>>>>>>>>>>>>>>>> Suppression()." with "Suppression." in all the
>>>>>> examples.
>>>>>>>>>>>>>>>>> Basically,
>>>>>>>>>>>>>>>>>>>> instead
>>>>>>>>>>>>>>>>>>>>>> of "of()", we'd support any of the methods I
>>>>> listed.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> windowCounts
>>>>>>>>>>>>>>>>>>>>>>     .suppress(
>>>>>>>>>>>>>>>>>>>>>>         Suppression
>>>>>>>>>>>>>>>>>>>>>>             .suppressLateEvents(Duration.
>>>>>> ofMinutes(10))
>>>>>>>>>>>>>>>>>>>>>>             .suppressIntermediateEvents(
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>> IntermediateSuppression.emitAfter(Duration.ofMinutes(
>>>>>> 10))
>>>>>>>>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>>>>>>>>     );
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Does that seem better?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <
>>>>>>>>>>>>> yuzhih...@gmail.com
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I started to read this KIP which contains a
>> lot
>>> of
>>>>>>>>>>>>>>> materials.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> One suggestion:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>     .suppress(
>>>>>>>>>>>>>>>>>>>>>>>         new Suppression()
>>>>>>>>>>>>>>>>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to