Thanks for the response, Matthias. I agree on both of these points. I didn't mean to question whether we should discuss it; we should, since, as you point out, both points affect the behavior of the API.
Regarding checking system time: after reviewing the motivation of the KIP, it seems like a lower bound on how long to suppress updates should be sufficient. This is reinforced by the fact that the proposed behavior is to emit only when processing new data. Given that, it should be fine to use the system time we already check at the start of the processing loop.

Practically speaking, a "best-effort lower bound" guarantee might be a good starting point. It gives us the flexibility to implement it efficiently, and we can always tighten the bound later if there are requests to do so.

Regarding flushing on shutdown, can you elaborate on the motivation for doing so?

Thanks,
-John

On Mon, Mar 11, 2019 at 3:13 PM Matthias J. Sax <matth...@confluent.io> wrote:

> I agree that there are multiple ways to avoid calling `System.currentTimeMillis()`. However, the KIP needs to define the public contract to explain to users what behavior they can expect (the simplest thing might be to say it's based on the `punctuation()` schedule -- not sure if this is desired or not).
>
> Similarly, the question of "why should we flush on shutdown" is part of the contract, and multiple ways to design it seem possible.
>
> -Matthias
>
> On 3/11/19 8:30 AM, John Roesler wrote:
> > Hey, all, just chiming in to keep the discussion moving...
> >
> > Regarding whether to flush or not on shutdown, I'm curious why we *would* flush... The record cache does this, but that's because it's not durable. The suppression buffer is already backed by a changelog specifically so that it can provide exactly the timing you configure, and not have to emit early just because the commit interval is short or the task is migrated. So, regardless of the commit interval or application lifecycle, if I tell suppression to wait 5 minutes before emitting, it'll wait 5 minutes. It seems asymmetric for wall-clock suppression to behave differently.
> >
> > Regarding checking wall-clock time, yes, it can be expensive, but there are a number of ways we can cope with it without introducing a complicated algorithm:
> > * use nano time
> > * check the wall clock once per batch and set it on the processor context in org.apache.kafka.streams.processor.internals.StreamThread#runOnce (we already check system time here anyway)
> > * maybe just do the naive thing and measure the overhead. I.e., maybe we should benchmark the implementation anyway to look for this or other bottlenecks, and fix performance problems in the order they appear.
> >
> > Thoughts?
> >
> > Thanks,
> > -John
> >
> > On Mon, Feb 25, 2019 at 4:36 PM jonathangor...@newrelic.com <jonathangor...@newrelic.com> wrote:
> >
> >> On 2019/02/21 02:19:27, "Matthias J. Sax" <matth...@confluent.io> wrote:
> >>> Thanks for the KIP. Corner case question:
> >>>
> >>> What happens if an application is stopped and restarted?
> >>>
> >>> - Should suppress() flush all records (would be _before_ the time elapsed)?
> >>> - Or should it preserve buffered records and reload on restart? For this case, should the record be flushed on reload (elapsed time is unknown), or should we reset the timer to zero?
> >>
> >> My opinion is that we should aim for simplicity for the first implementation of this feature: flush all the records on shutdown. If there's demand in the future for strict adherence on shutdown, we can implement that as extra params to the Suppressed api.
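> >>
> >> To make that concrete, here's a rough sketch of the shape such an opt-in param could take. The names below (untilWallClockTimeLimit, onShutdown, ShutdownBehavior, and the counts table) are purely hypothetical illustrations, not something the KIP currently proposes; only Suppressed, BufferConfig, and Duration are existing types:
> >>
> >>     // flush-on-shutdown as the simple default, with a hypothetical
> >>     // opt-in to preserve the buffer across restarts instead
> >>     final KTable<Windowed<String>, Long> suppressed = counts.suppress(
> >>         Suppressed.untilWallClockTimeLimit(
> >>                 Duration.ofSeconds(10),           // suppression lower bound
> >>                 BufferConfig.unbounded())
> >>             .onShutdown(ShutdownBehavior.FLUSH)); // vs. PRESERVE_AND_RELOAD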
> >>
> >>> What is unclear to me atm is the use-case you anticipate. If you assume a live run of an application, event-time and processing-time should be fairly identical (at least with regard to data rates). Thus, suppress() on event-time should give you about the same behavior as wall-clock time? If you disagree, can you elaborate?
> >>
> >> Imagine a session window where you aggregate 10K events that usually occur within 2-3 seconds of each other (event time). However, they are ingested in batches of 1000 or so, spread out over 2-3 minutes (ingest time), and not necessarily in order. It's important for us to be able to publish this aggregate in real time as we get new data (every 10 seconds) to keep our time-to-glass low, but our data store is non-updateable, so we'd like to limit the number of aggregates we publish.
> >>
> >> If you imagine a case where all the event batches arrive in reverse order for one particular session window, then once the stream time advances past the suppression threshold, we could publish an aggregate update for each newly received event.
> >>
> >>> This leaves the case of data reprocessing, for which event-time advances much faster than wall-clock time. Is this the target use-case?
> >>
> >> No, see above.
> >>
> >>> About the implementation: checking wall-clock time is an expensive system call, so I am a little worried about run-time overhead. This seems not to be just an implementation detail, and thus it might be worth including in the discussion. The question is how strict the guarantee about when records should be flushed needs to be. Assume you set a timer of 1 second and you have a data rate of 1000 records per second, with each record arriving one ms after the other, each with a different key. To flush this data "correctly" we would need to check wall-clock time every millisecond... Thoughts?
> >>>
> >>> (We don't need to dive into all the details, but a high-level discussion about the desired algorithm and guarantees would be good to have IMHO.)
> >>
> >> I had never dug into the performance characteristics of currentTimeMillis() before:
> >>
> >> http://pzemtsov.github.io/2017/07/23/the-slow-currenttimemillis.html
> >>
> >> So if we assume the slow Linux average of 640 ns/call, at 1000 calls/sec that's 0.64 ms of overhead per second. Doesn't seem terrible to me, but I imagine for some use cases 1M calls/sec might be reasonable, and now we're up to 0.64 s per second just checking the system time. Perhaps we add some logic that calculates the rate of data input, and if it exceeds some threshold we only check the time every n records? The trick there, I suppose, is that for very bursty traffic you could exceed the threshold and then wait too long to trigger another check. Maybe we store a moving average? Or perhaps this is getting too complicated?
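> >>
> >> To sketch what the simple "every n records" version might look like (the threshold of 100 is arbitrary, and this ignores the bursty-traffic problem above):
> >>
> >>     private static final int CHECK_INTERVAL_RECORDS = 100; // arbitrary
> >>     private long cachedNow = System.currentTimeMillis();
> >>     private int recordsSinceCheck = 0;
> >>
> >>     // Returns a best-effort lower bound on the current wall-clock time,
> >>     // touching the real clock only once every CHECK_INTERVAL_RECORDS records.
> >>     long approximateNow() {
> >>         if (++recordsSinceCheck >= CHECK_INTERVAL_RECORDS) {
> >>             recordsSinceCheck = 0;
> >>             cachedNow = System.currentTimeMillis();
> >>         }
> >>         return cachedNow;
> >>     }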