Hey Jonathan,

What's the status of this KIP? I was just in a discussion about suppress, and it sparked my memory of this idea.
Thanks,
-John

On Mon, Mar 11, 2019 at 4:39 PM John Roesler <j...@confluent.io> wrote:
>
> Thanks for the response, Matthias, I agree on both of these points.
>
> I didn't mean to question whether we should discuss it; we should, since as
> you point out both points affect the behavior of the API.
>
> Regarding checking system time:
> After reviewing the motivation of the KIP, it seems like a lower bound on how
> long to suppress updates should be sufficient. This is reinforced by the fact
> that the proposed behavior is to emit only when processing new data. Since
> this is the proposed behavior, it should be fine to use the system time we
> already checked at the start of the processing loop. Practically speaking, a
> "best effort lower bound"-type guarantee might be a good starting point. It
> gives us the flexibility to implement it efficiently, and we can always
> tighten the bound later if there are requests to do so.
>
> Regarding flushing on shutdown, can you elaborate on the motivation for
> doing so?
>
> Thanks,
> -John
>
> On Mon, Mar 11, 2019 at 3:13 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> I agree that there are multiple ways to avoid calling
>> `System.currentTimeMillis()`. However, the KIP needs to define the
>> public contract to explain to users what behavior they can expect (the
>> simplest thing might be to say it's based on the `punctuation()` schedule
>> -- not sure if this is desired or not).
>>
>> Similarly, the question about "why should we flush on shutdown" is part
>> of the contract, and multiple ways to design it seem possible.
>>
>> -Matthias
>>
>> On 3/11/19 8:30 AM, John Roesler wrote:
>> > Hey, all, just chiming in to keep the discussion moving...
>> >
>> > Regarding whether to flush or not on shutdown, I'm curious why we *would*
>> > flush...
>> > The record cache does this, but that's because it's not durable.
>> > The suppression buffer is already backed by a changelog specifically so
>> > that it can provide exactly the timing you configure, and not have to emit
>> > early just because the commit interval is short or the task is migrated.
>> > So, regardless of the commit interval or application lifecycle, if I tell
>> > suppression to wait 5 minutes before emitting, it'll wait 5 minutes. It
>> > seems asymmetric for wall-clock suppression to behave differently.
>> >
>> > Regarding checking wall-clock time, yes, it can be expensive, but there
>> > are a number of ways we can cope with it without introducing a complicated
>> > algorithm:
>> > * use nano time
>> > * check the wall-clock time once per batch and set it on the processor
>> > context in org.apache.kafka.streams.processor.internals.StreamThread#runOnce
>> > (we already check system time here anyway)
>> > * maybe just do the naive thing and measure the overhead. I.e., maybe we
>> > should benchmark the implementation anyway to look for this or other
>> > bottlenecks, and fix performance problems in the order they appear.
>> >
>> > Thoughts?
>> >
>> > Thanks,
>> > -John
>> >
>> > On Mon, Feb 25, 2019 at 4:36 PM jonathangor...@newrelic.com <
>> > jonathangor...@newrelic.com> wrote:
>> >
>> >> On 2019/02/21 02:19:27, "Matthias J. Sax" <matth...@confluent.io> wrote:
>> >>> Thanks for the KIP. Corner case question:
>> >>>
>> >>> What happens if an application is stopped and restarted?
>> >>>
>> >>> - Should suppress() flush all records (which would be _before_ the time
>> >>> elapsed)?
>> >>> - Or should it preserve buffered records and reload them on restart? For
>> >>> this case, should the records be flushed on reload (elapsed time is
>> >>> unknown), or should we reset the timer to zero?
>> >>
>> >> My opinion is that we should aim for simplicity for the first
>> >> implementation of this feature: flush all the records on shutdown.
>> >> If there's demand in the future for strict adherence on shutdown, we can
>> >> implement it as extra params to the Suppressed API.
>> >>
>> >>> What is unclear to me atm is the use-case you anticipate. If you assume
>> >>> a live run of an application, event-time and processing-time should be
>> >>> fairly identical (at least with regard to data rates). Thus, suppress()
>> >>> on event-time should give you about the same behavior as wall-clock
>> >>> time? If you disagree, can you elaborate?
>> >>
>> >> Imagine a session window where you aggregate 10K events that usually
>> >> occur within 2-3 seconds of each other (event time). However, they are
>> >> ingested in batches of 1000 or so, spread out over 2-3 minutes (ingest
>> >> time), and not necessarily in order. It's important for us to be able to
>> >> publish this aggregate in real time as we get new data (every 10 seconds)
>> >> to keep our time to glass low, but our data store is non-updateable, so
>> >> we'd like to limit the number of aggregates we publish.
>> >>
>> >> If you imagine a case where all the event batches arrive in reverse order
>> >> for one particular session window, then once the stream time advances
>> >> past the suppression threshold, we could publish an aggregate update for
>> >> each newly received event.
>> >>
>> >>> This leaves the case for data reprocessing, for which event-time
>> >>> advances much faster than wall-clock time. Is this the target use-case?
>> >>
>> >> No, see above.
>> >>
>> >>> About the implementation: checking wall-clock time is an expensive
>> >>> system call, so I am a little worried about run-time overhead. This
>> >>> seems not to be an implementation detail and thus, it might be worth
>> >>> including it in the discussion. The question is how strict the guarantee
>> >>> about when records should be flushed needs to be.
>> >>> Assume you set a timer of 1 second, and you have a data rate of 1000
>> >>> records per second, with each record arriving one ms after the other,
>> >>> each with a different key. To flush this data "correctly" we would need
>> >>> to check wall-clock time every millisecond... Thoughts?
>> >>>
>> >>> (We don't need to dive into all details, but a high-level discussion
>> >>> about the desired algorithm and guarantees would be good to have IMHO.)
>> >>
>> >> I had never dug into the performance characteristics of
>> >> currentTimeMillis() before:
>> >>
>> >> http://pzemtsov.github.io/2017/07/23/the-slow-currenttimemillis.html
>> >>
>> >> So if we assume the slow Linux average of 640 ns/call, at 1000 calls/sec
>> >> that's 0.64 ms. Doesn't seem terrible to me, but I imagine for some use
>> >> cases 1M calls/sec might be reasonable, and now we're up to 0.64 s just
>> >> in system-time checking. Perhaps we add some logic that calculates the
>> >> rate of data input, and if it exceeds some threshold we only check the
>> >> time every n records? The trick there, I suppose, is that for very bursty
>> >> traffic you could exceed the threshold and then wait too long to trigger
>> >> another check. Maybe we store a moving average? Or perhaps this is
>> >> getting too complicated?
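[Editor's note: the overhead arithmetic in the thread can be checked directly. The 640 ns/call figure is the assumption taken from the linked post, not a measured number:]

```java
public class ClockOverhead {
    public static void main(String[] args) {
        // Assumed cost of one System.currentTimeMillis() call (from the linked post).
        double nsPerCall = 640.0;

        // Clock-checking time accumulated per second of processing, at two input rates.
        double msPerSecAt1K = 1_000 * nsPerCall / 1_000_000.0;      // 0.64 ms
        double msPerSecAt1M = 1_000_000 * nsPerCall / 1_000_000.0;  // 640 ms, i.e. 0.64 s

        System.out.println(msPerSecAt1K + " ms, " + msPerSecAt1M + " ms");
    }
}
```

This confirms the numbers in the message: negligible at 1K records/sec, but a substantial fraction of each second at 1M records/sec.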
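[Editor's note: the "only check the time every n records" idea discussed above could be sketched roughly as follows. This is a hypothetical illustration, not part of the KIP or the Kafka Streams API; the class name and the `checkInterval` parameter are invented for the example:]

```java
/**
 * Hypothetical sketch (not proposed API): consult the real clock only on
 * every N-th read and return a cached value in between. This bounds the
 * number of expensive system calls, at the cost of loosening the suppression
 * guarantee to a "best effort lower bound".
 */
public class RateLimitedClock {
    private final int checkInterval;  // refresh the cached time every N reads
    private long cachedMillis;
    private int readsSinceCheck;

    public RateLimitedClock(int checkInterval) {
        this.checkInterval = checkInterval;
        this.cachedMillis = System.currentTimeMillis();
        this.readsSinceCheck = 0;
    }

    /** Returns a timestamp refreshed at most once per checkInterval reads. */
    public long nowMillis() {
        if (++readsSinceCheck >= checkInterval) {
            cachedMillis = System.currentTimeMillis();
            readsSinceCheck = 0;
        }
        return cachedMillis;
    }
}
```

With `checkInterval = 100` at 1M records/sec, this cuts the clock calls to 10K/sec; the price is that a record can be held slightly longer than configured, which fits the "best effort lower bound" framing earlier in the thread. The bursty-traffic concern raised above still applies: a fixed interval can go stale if input suddenly slows down.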