[ https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782178#comment-16782178 ]
John Roesler commented on KAFKA-7994:
-------------------------------------

I did some mental math when I made that last change to stream time. It's funny: I keep proving the math to myself, but I can't shake the feeling that there might be something wrong with it... Thanks for bringing this up; the more we go over it, the better.

It's always possible I've made an arithmetic error, but I don't think we'll ever emit earlier than necessary, and we shouldn't emit later than necessary either.

The stream-time variable is initialized to -1, which is smaller than any record timestamp (because we're not considering negative timestamps yet), so it's also smaller than any "real" stream time.

Records are only emitted from the suppression buffer once a certain amount of time has passed *after* the record time. I.e., with ST the stream time, BR a buffered record, and D the suppression duration, BR is emitted when ST >= BR.time + D. Neither BR.time nor D can be negative, so BR.time + D >= 0 and the expression is never true for ST = -1 (the initial, "unknown time" value). Therefore, resetting stream time to -1 at any point is safe, in that it can never cause premature evictions.

It wouldn't be terrible if we sometimes held on to records a little longer than necessary, but I think this won't happen either. Stream time advances when we process new records through the Suppress processor, and the processing of a new record isn't complete until we have emitted all of the buffered records that should be emitted as a result of the stream-time advancement that record causes. Therefore, we have an invariant: once the processing of record R is complete, ST >= R.time, and the buffer contains only buffered records BR such that BR.time > ST - D.

Now, if we restart after processing R, we'll forget all about that prior stream time and just take the next record's (NR) timestamp as the new stream time. NR.time can be either larger or smaller than the previous ST.
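A minimal Java sketch can make the eviction rule and the invariant concrete. It is a hypothetical, simplified model and not the actual Kafka Streams suppression buffer. The class and method names (`SuppressSketch`, `process`, `evictDue`, `resetStreamTime`) are made up for illustration. Records are reduced to their timestamps, and negative timestamps are rejected, as in the argument above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of the suppression buffer discussed above.
// NOT the Kafka Streams implementation; all names are illustrative only.
final class SuppressSketch {
    static final long UNKNOWN = -1L; // initial "unknown" stream time

    private final long duration;                         // D, the suppression duration
    private final List<Long> buffer = new ArrayList<>(); // timestamps of buffered records BR
    private long streamTime = UNKNOWN;                   // ST

    SuppressSketch(long duration) {
        if (duration < 0) throw new IllegalArgumentException("D must be non-negative");
        this.duration = duration;
    }

    long streamTime() {
        return streamTime;
    }

    // Simulates a restart/rebalance: stream time is forgotten, the buffer survives.
    void resetStreamTime() {
        streamTime = UNKNOWN;
    }

    // Emits (removes and returns) every buffered record with ST >= BR.time + D.
    // With ST = -1 nothing can qualify, because BR.time >= 0 and D >= 0.
    List<Long> evictDue() {
        List<Long> emitted = new ArrayList<>();
        Iterator<Long> it = buffer.iterator();
        while (it.hasNext()) {
            long t = it.next();
            if (streamTime >= t + duration) {
                emitted.add(t);
                it.remove();
            }
        }
        return emitted;
    }

    // Processing a record is only complete once all newly due records are emitted.
    List<Long> process(long recordTime) {
        if (recordTime < 0) throw new IllegalArgumentException("negative timestamps not modeled");
        streamTime = Math.max(streamTime, recordTime);
        buffer.add(recordTime);
        return evictDue();
    }

    // Invariant after each process() call: every BR satisfies BR.time > ST - D.
    boolean invariantHolds() {
        for (long t : buffer) {
            if (t <= streamTime - duration) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        SuppressSketch reset = new SuppressSketch(10);
        SuppressSketch kept = new SuppressSketch(10);
        for (long ts : new long[] {0, 5, 12, 20}) {
            reset.process(ts);
            kept.process(ts);
        }
        reset.resetStreamTime();                                          // ST = -1 on one instance only
        System.out.println(reset.evictDue());                             // [] since ST = -1 never evicts
        System.out.println(reset.process(15) + " " + kept.process(15));   // [] [] for NR.time = 15 < 20
        System.out.println(reset.streamTime() + " " + kept.streamTime()); // 15 20
    }
}
```

In `main`, both instances process timestamps 0, 5, 12, 20 with D = 10, so the buffer ends up holding 12 and 20. Then one instance has its stream time reset. The next record has NR.time = 15, which is smaller than the previous ST of 20. Neither instance emits anything, even though their stream times now differ (15 vs. 20).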
If it's larger, it's equivalent to a normal increment of stream time, and all the normal logic applies.

The unusual situation is when NR.time is smaller than the previous ST. With the reset, the new stream time is ST' = NR.time, which is smaller than the prior stream time; by the invariant, every record already in the buffer satisfies BR.time + D > ST > ST', so nothing is emitted. If we hadn't had the "reset", what would have happened? Since ST' = max(ST, NR.time) and NR.time < ST, we get ST' = ST. The stream time doesn't advance at all, and again we should not emit anything. So whether we reset the stream time to -1 or preserve the prior ST, the exact same records will be emitted or not.

I wrote the above out long-form to try to eliminate any hand-waving from my thinking about this. Is there any problem with the reasoning? If not, then we have a proof that resetting the stream time to -1 has no impact on suppress.

> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all
> records processed so far. Furthermore, we use partition-time to compute
> stream-time for each task as the maximum over all partition-times (for all
> corresponding task partitions). This stream-time is used to decide whether
> to process out-of-order records or drop them if they are late (i.e.,
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e.,
> -1) for tasks that are newly created (or migrated). In effect, we forget the
> current stream-time in this case, which may lead to non-deterministic behavior
> if we stop processing right before a late record that would be dropped if we
> continued processing, but is not dropped after the rebalance/restart.
> Let's look at an example with a grace period of 5ms for a tumbling window of
> 5ms, and the following records (timestamps in parentheses):
>
> {code:java}
> r1(0) r2(5) r3(11) r4(2)
> {code}
> In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or
> rebalance after processing `r3` but before processing `r4`, we would
> reinitialize stream-time as -1, and thus would process `r4` on restart/after
> rebalance. The problem is that stream-time then advances differently from a
> global point of view: 0, 5, 11, 2.
>
> Note that this is a corner case: if we stopped processing one record
> earlier, i.e., after processing `r2` but before processing `r3`, stream-time
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the
> metadata of committed offsets. This way, on restart/rebalance we can
> re-initialize time correctly.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
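The scenario from the issue description can be replayed with a small Java sketch. It is hypothetical and makes several assumptions:

* It models a single partition, so partition-time and stream-time coincide.
* It applies the issue's simplified per-record check, timestamp < stream-time - grace, rather than per-window logic.
* It treats the "committed offset metadata" as a plain variable, not a Kafka API.
* It assumes a commit after every record.

The names `StreamTimeRestartSketch`, `run`, `restartBefore`, and `restoreFromCommit` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the KAFKA-7994 scenario (single partition, so
// partition-time == stream-time). Not Kafka Streams code.
final class StreamTimeRestartSketch {
    static final long UNKNOWN = -1L;

    // Replays the timestamps, simulating a restart/rebalance just before index
    // restartBefore (-1 = no restart). If restoreFromCommit is true, the restart
    // re-initializes stream-time from the last committed partition-time (the
    // proposed fix); otherwise it falls back to UNKNOWN (the current behavior).
    // Returns the timestamps of the records that were dropped as late.
    static List<Long> run(long[] timestamps, long grace, int restartBefore, boolean restoreFromCommit) {
        long streamTime = UNKNOWN;
        long committedPartitionTime = UNKNOWN; // stands in for offset-commit metadata
        List<Long> dropped = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartBefore) {
                streamTime = restoreFromCommit ? committedPartitionTime : UNKNOWN;
            }
            long ts = timestamps[i];
            streamTime = Math.max(streamTime, ts);
            if (ts < streamTime - grace) {     // simplified lateness check from the issue
                dropped.add(ts);
            }
            committedPartitionTime = streamTime; // assumption: commit after every record
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] records = {0, 5, 11, 2}; // r1, r2, r3, r4
        long grace = 5;
        System.out.println(run(records, grace, -1, false)); // [2]: no restart, r4 dropped
        System.out.println(run(records, grace, 3, false));  // []: restart before r4, r4 processed
        System.out.println(run(records, grace, 2, false));  // [2]: restart before r3, still dropped
        System.out.println(run(records, grace, 3, true));   // [2]: restored partition-time, dropped
    }
}
```

The four runs mirror the cases in the description. Without a restart, `r4` is dropped. A reset just before `r4` lets it through. A reset just before `r3` is harmless, because `r3` re-establishes stream-time 11. Restoring the committed partition-time makes the restart before `r4` behave like the uninterrupted run.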