Hey Jonathan,

What's the status of this KIP? I was just in a discussion about
suppress, and it sparked my memory of this idea.

Thanks,
-John

On Mon, Mar 11, 2019 at 4:39 PM John Roesler <j...@confluent.io> wrote:
>
> Thanks for the response, Matthias. I agree on both of these points.
>
> I didn't mean to question whether we should discuss it; we should, since, as
> you point out, both points affect the behavior of the API.
>
> Regarding checking system time:
> After reviewing the motivation of the KIP, it seems like a lower bound on how 
> long to suppress updates should be sufficient. This is reinforced by the fact 
> that the proposed behavior is to emit only when processing new data. Since 
> this is the proposed behavior, it should be fine to use the system time we 
> already checked at the start of the processing loop. Practically speaking, a 
> "best effort lower bound"-type guarantee might be a good starting point. It 
> gives us the flexibility to implement it efficiently, and we can always 
> tighten the bound later, if there are requests to do so.
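A minimal Java sketch of the "best effort lower bound" idea, to make the contract concrete. `WallClockSuppressBuffer` is a hypothetical class, not Kafka Streams code. It assumes that the caller passes in a wall-clock time read once per processing-loop iteration and that this time never decreases. An update for a key is held for at least `delayMs`, but it is only released when some new record is processed, so the actual delay can be longer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical wall-clock suppression buffer (not Kafka Streams code).
 * Each key's latest value is held until at least delayMs of wall-clock time
 * has passed since the key was first buffered. Emission is only checked when
 * a record is processed, so delayMs is a lower bound, not an exact deadline.
 */
final class WallClockSuppressBuffer<K, V> {
    private final long delayMs;
    private final Map<K, V> latest = new HashMap<>();            // latest value per buffered key
    private final Map<K, Long> firstBufferedMs = new HashMap<>(); // when each key was first buffered
    // Keys in first-buffered order; with a fixed delay and a non-decreasing
    // loop time, this is also deadline order.
    private final Deque<K> order = new ArrayDeque<>();

    WallClockSuppressBuffer(long delayMs) {
        this.delayMs = delayMs;
    }

    /**
     * Buffers an update and returns every value whose delay has elapsed as of
     * loopTimeMs, the time cached at the start of the current loop iteration.
     */
    List<V> process(K key, V value, long loopTimeMs) {
        if (!latest.containsKey(key)) {
            order.addLast(key);
            firstBufferedMs.put(key, loopTimeMs);
        }
        latest.put(key, value); // later updates overwrite, keeping only the newest value
        List<V> emitted = new ArrayList<>();
        while (!order.isEmpty()
                && loopTimeMs - firstBufferedMs.get(order.peekFirst()) >= delayMs) {
            K expired = order.pollFirst();
            firstBufferedMs.remove(expired);
            emitted.add(latest.remove(expired));
        }
        return emitted;
    }
}
```

For example, with `delayMs = 1000`, an update buffered at time 0 and overwritten at 500 is released, with its latest value, only when another record is processed at or after time 1000.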
>
> Regarding flushing on shutdown, can you elaborate on the motivation for doing
> so?
>
> Thanks,
> -John
>
> On Mon, Mar 11, 2019 at 3:13 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> I agree that there are multiple ways to avoid calling
>> `System.currentTimeMillis()`. However, the KIP needs to define the
>> public contract to explain to users what behavior they can expect (the
>> simplest thing might be to say it's based on the `punctuation()` schedule
>> -- not sure if this is desired or not).
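One way to read a punctuation-based contract: if the buffer were flushed only from a periodic wall-clock timer (in Kafka Streams, presumably one registered through `ProcessorContext#schedule` with `PunctuationType.WALL_CLOCK_TIME`), then, assuming the timer fires on schedule, a record would be emitted no earlier than its deadline and no later than one punctuation interval after it. The helper below is a hypothetical sketch of that bound, not part of any existing API.

```java
/**
 * Hypothetical contract sketch: a buffer flushed only by a timer that fires
 * every intervalMs starting at firstTickMs emits a record whose delay expires
 * at deadlineMs on the first tick at or after that deadline.
 */
final class PunctuationSchedule {
    private PunctuationSchedule() {}

    static long firstTickAtOrAfter(long deadlineMs, long firstTickMs, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        if (deadlineMs <= firstTickMs) {
            return firstTickMs;
        }
        // Ceiling division: whole intervals needed to reach the deadline.
        long ticks = (deadlineMs - firstTickMs + intervalMs - 1) / intervalMs;
        return firstTickMs + ticks * intervalMs;
    }
}
```

With a 1000 ms interval starting at 0, a deadline of 1500 is served at 2000, so the worst-case lateness users would need to be told about is one full interval.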
>>
>> Similarly, the question of "why should we flush on shutdown" is part
>> of the contract, and multiple ways to design it seem possible.
>>
>>
>>
>> -Matthias
>>
>> On 3/11/19 8:30 AM, John Roesler wrote:
>> > Hey, all, just chiming in to keep the discussion moving...
>> >
>> > Regarding whether to flush or not on shutdown, I'm curious why we *would*
>> > flush...
>> > The record cache does this, but that's because it's not durable. The
>> > suppression buffer is already backed by a changelog specifically so that it
>> > can provide exactly the timing you configure, and not have to emit early
>> > just because the commit interval is short or the task is migrated. So,
>> > regardless of the commit interval or application lifecycle, if I tell
>> > suppression to wait 5 minutes before emitting, it'll wait 5 minutes. It
>> > seems asymmetric for wall-clock suppression to behave differently.
>> >
>> > Regarding checking wall-clock time, yes, it can be expensive, but there are
>> > a number of ways we can cope with it without introducing a complicated
>> > algorithm:
>> > * use nano time
>> > * check the wall-clock once per batch and set it on the processor context
>> > in org.apache.kafka.streams.processor.internals.StreamThread#runOnce (we
>> > already check system time here anyway)
>> > * maybe just do the naive thing and measure the overhead. I.e., maybe we
>> > should benchmark the implementation anyway to look for this or other
>> > bottlenecks, and fix performance problems in the order they appear.
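The second option in the list above can be sketched as a cached clock. `CachedClock` is hypothetical (Kafka Streams has no such class): the stream thread would call `refresh()` once per loop iteration, where it already reads system time, and processors would call `nowMs()` per record. The time source is injected, so `System::currentTimeMillis` could be used, or, for the first option, a monotonic source such as `() -> System.nanoTime() / 1_000_000`, which is only meaningful for measuring elapsed time within one JVM.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical cached clock: refreshed once per processing-loop iteration,
 * read per record without a system call.
 */
final class CachedClock {
    private final LongSupplier source; // e.g. System::currentTimeMillis
    private long cachedMs;

    CachedClock(LongSupplier source) {
        this.source = source;
        this.cachedMs = source.getAsLong();
    }

    /** Called by the thread once per loop iteration, before processing a batch. */
    void refresh() {
        cachedMs = source.getAsLong();
    }

    /** Called per record; a field read rather than a clock call. */
    long nowMs() {
        return cachedMs;
    }
}
```

The trade-off is staleness: a processor may see a time up to one loop iteration old. Because that time is never later than the real time, a suppression deadline checked against it can only fire late, never early, which is compatible with a lower-bound guarantee.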
>> >
>> > Thoughts?
>> >
>> > Thanks,
>> > -John
>> >
>> > On Mon, Feb 25, 2019 at 4:36 PM jonathangor...@newrelic.com <
>> > jonathangor...@newrelic.com> wrote:
>> >
>> >> On 2019/02/21 02:19:27, "Matthias J. Sax" <matth...@confluent.io> wrote:
>> >>> thanks for the KIP. Corner case question:
>> >>>
>> >>> What happens if an application is stopped and restarted?
>> >>>
>> >>>  - Should suppress() flush all records (which would happen _before_ the time
>> >>> has elapsed)?
>> >>>  - Or should it preserve buffered records and reload on restart? For
>> >>> this case, should the record be flushed on reload (elapsed time is
>> >>> unknown) or should we reset the timer to zero?
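The restore-time options in the question above differ only in when a reloaded record becomes eligible to emit. The enum below is a hypothetical sketch, not an existing API. `PRESERVE_ELAPSED` is an added third option and assumes the wall-clock time at which each record was first buffered is persisted with it (for example in the changelog), which is what would make the elapsed time known after a restart. Flushing at shutdown, the first option, happens before any restore and is not modeled.

```java
/**
 * Hypothetical restart policies for a wall-clock suppression buffer. Returns
 * the wall-clock time at which a restored record becomes eligible to emit.
 */
enum RestartPolicy {
    /** Emit as soon as the record is reloaded (elapsed time treated as unknown). */
    EMIT_ON_RELOAD,
    /** Start the full delay over from the moment of restore. */
    RESET_TIMER,
    /** Keep the original deadline; assumes firstBufferedMs was persisted. */
    PRESERVE_ELAPSED;

    long eligibleAtMs(long firstBufferedMs, long restoredAtMs, long delayMs) {
        switch (this) {
            case EMIT_ON_RELOAD:
                return restoredAtMs;
            case RESET_TIMER:
                return restoredAtMs + delayMs;
            case PRESERVE_ELAPSED:
                // May already be in the past, i.e. eligible immediately.
                return firstBufferedMs + delayMs;
            default:
                throw new AssertionError(this);
        }
    }
}
```

With a 3 s delay, a record first buffered at t = 1 s and restored at t = 5 s would become eligible at 5 s, 8 s, or 4 s (that is, immediately) under the three policies respectively.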
>> >>
>> >> My opinion is that we should aim for simplicity in the first
>> >> implementation of this feature: flush all the records on shutdown. If
>> >> there's demand in the future for strict adherence on shutdown, we can
>> >> implement it as extra parameters to the Suppressed API.
>> >>
>> >>> What is unclear to me at the moment is the use case you anticipate. If you assume
>> >>> a live run of an application, event-time and processing-time should be
>> >>> fairly identical (at least with regard to data rates). Thus, suppress()
>> >>> on event-time should give you about the same behavior as wall-clock
>> >>> time? If you disagree, can you elaborate?
>> >>
>> >> Imagine a session window where you aggregate 10K events that usually occur
>> >> within 2-3 seconds of each other (event time). However, they are ingested
>> >> in batches of 1000 or so, spread out over 2-3 minutes (ingest time), and
>> >> not necessarily in order. It's important for us to be able to publish this
>> >> aggregate in real-time as we get new data (every 10 seconds) to keep our
>> >> time to glass low, but our data store is non-updateable so we'd like to
>> >> limit the number of aggregates we publish.
>> >>
>> >> If you imagine a case where all the event batches arrive in reverse order
>> >> for one particular session window, then once the stream time advances past
>> >> the suppression threshold, we could publish an aggregate update for each
>> >> newly received event.
>> >>
>> >>> This leaves the case of data reprocessing, for which event-time advances
>> >>> much faster than wall-clock time. Is this the target use-case?
>> >>
>> >> No, see above.
>> >>
>> >>> About the implementation: checking wall-clock time is an expensive
>> >>> system call, so I am a little worried about run-time overhead. This does
>> >>> not seem to be merely an implementation detail, so it might be worth
>> >>> including it in the discussion. The question is how strict the guarantee
>> >>> about when records are flushed should be. Assume you set a timer of 1
>> >>> second, and you have a data rate of 1000 records per second, with each
>> >>> record arriving one ms after the other, each with a different key. To
>> >>> flush this data "correctly" we would need to check wall-clock time every
>> >>> millisecond... Thoughts?
>> >>>
>> >>> (We don't need to dive into all the details, but a high-level discussion
>> >>> about the desired algorithm and guarantees would be good to have IMHO.)
>> >>
>> >> I had never dug into the performance characteristics of
>> >> currentTimeMillis() before:
>> >>
>> >> http://pzemtsov.github.io/2017/07/23/the-slow-currenttimemillis.html
>> >>
>> >> So if we assume the slow Linux average of 640 ns/call, at 1000 calls/sec
>> >> that's 0.64 ms of clock reads per second. That doesn't seem terrible to me,
>> >> but I imagine for some use cases 1M calls/sec might be reasonable, and then
>> >> we're up to 0.64 s per second just in system time checking. Perhaps we add
>> >> some logic that calculates the rate of data input, and if it exceeds some
>> >> threshold we only check the time every n records? The trick there, I
>> >> suppose, is that with very bursty traffic you could exceed the threshold
>> >> and then wait too long to trigger another check. Maybe we store a moving
>> >> average? Or perhaps this is getting too complicated?
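A rough sketch of the "check every n records, tuned by a moving average" idea. `AdaptiveClockSampler` is hypothetical. It assumes an exponential moving average (EMA) of the input rate as the moving average, a target of about one real clock read per `targetResolutionMs`, and a `maxStride` cap so that a burst followed by a lull cannot leave the cached time stale for an unbounded number of records. At the 640 ns figure above, reading the clock once every `stride` records would cut the overhead at 1M records/sec from roughly 0.64 s per second to roughly 0.64 s / `stride`.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical adaptive clock sampler. The real clock is read only every
 * `stride` records; at each read, the observed input rate updates an EMA and
 * the stride is resized to aim for one read per targetResolutionMs.
 */
final class AdaptiveClockSampler {
    private final LongSupplier clock;        // assumed non-decreasing, in ms
    private final double targetResolutionMs; // desired time between real reads
    private final double alpha;              // EMA weight of the newest sample, in (0, 1]
    private final int maxStride;             // cap on records between real reads
    private double emaRecordsPerMs = 0.0;
    private long lastReadMs;
    private long windowStartMs;              // start of the current rate-measurement window
    private long windowCount = 0;            // records seen since windowStartMs
    private int stride = 1;
    private int sinceRead = 0;

    AdaptiveClockSampler(LongSupplier clock, double targetResolutionMs, double alpha, int maxStride) {
        this.clock = clock;
        this.targetResolutionMs = targetResolutionMs;
        this.alpha = alpha;
        this.maxStride = maxStride;
        this.lastReadMs = clock.getAsLong();
        this.windowStartMs = lastReadMs;
    }

    /** Called once per record; returns the current time, possibly cached. */
    long onRecord() {
        windowCount++;
        if (++sinceRead < stride) {
            return lastReadMs; // skip the clock call
        }
        sinceRead = 0;
        lastReadMs = clock.getAsLong();
        long elapsedMs = lastReadMs - windowStartMs;
        if (elapsedMs > 0) { // with ms resolution, wait until time visibly advances
            double rate = (double) windowCount / elapsedMs;
            emaRecordsPerMs = alpha * rate + (1 - alpha) * emaRecordsPerMs;
            long next = Math.round(emaRecordsPerMs * targetResolutionMs);
            stride = (int) Math.max(1L, Math.min((long) maxStride, next));
            windowStartMs = lastReadMs;
            windowCount = 0;
        }
        return lastReadMs;
    }

    int stride() {
        return stride;
    }
}
```

Between reads, callers see the last time read, which (for a non-decreasing clock) is never later than the real time, so deadlines can only fire late. When traffic drops off after a burst, at most `maxStride - 1` records are handled with a stale time before the rate estimate is refreshed and the stride shrinks back toward 1.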
>> >>
>> >>
>> >>
>> >
>>
