[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782178#comment-16782178
 ] 

John Roesler commented on KAFKA-7994:
-------------------------------------

I did some mental math when I made that last change to stream time. It's funny, 
I keep proving the math to myself, but I can't shake the feeling that there 
might be something wrong with it... Thanks for bringing this up, the more we go 
over it, the better.

 

It's always possible I've made an arithmetic error, but I don't think we'll 
ever emit earlier than necessary, and we shouldn't emit later than necessary 
either.

The stream-time variable is initialized to -1, which is smaller than any record 
timestamp (because we're not considering negative timestamps yet), so it's also 
smaller than any "real" stream time.

Records are only emitted from the suppression when a certain time passes 
*after* the record time. I.e., with ST as stream time, BR as a buffered record, 
and D as the suppression duration, BR is emitted when ST >= BR.time + D. 
Neither BR.time nor D can be negative, so that expression is never true for 
ST = -1 (the initial, "unknown time", value). Therefore, resetting stream time 
to -1 at any point is safe, in that it can never cause premature evictions.
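
As a minimal sketch of the emission condition above (the class and method 
names are hypothetical, chosen for illustration, and this is not the actual 
Kafka Streams code), the check and its behavior at ST = -1 could look like:

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class EmitCheck {
    // Initial "unknown time" value of stream time.
    static final long UNKNOWN = -1L;

    // Returns true when a buffered record with timestamp brTime is due,
    // i.e. when ST >= BR.time + D.
    static boolean shouldEmit(long streamTime, long brTime, long duration) {
        if (brTime < 0 || duration < 0) {
            // Assumption taken from the text: negative values are not considered.
            throw new IllegalArgumentException("BR.time and D must be non-negative");
        }
        return streamTime >= brTime + duration;
    }
}
```

With non-negative BR.time and D, the right-hand side is at least 0, so 
`shouldEmit(EmitCheck.UNKNOWN, ...)` cannot return true.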

 

It wouldn't be terrible if we sometimes held on to records a little longer than 
necessary, but actually, I think this won't happen either.

Stream time advances when we process new records through the Suppress 
processor, and the processing of a new record isn't complete until we emit all 
of the buffered records that should be emitted as a result of the stream time 
advancement that record causes. Therefore, we have an invariant that, if the 
processing of record R is complete, ST >= R.time and the buffer contains only 
buffered records BR such that BR.time > ST - D.
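
To make the invariant concrete, here is a small model of the buffer. It is 
only a sketch: `SuppressBufferSketch` and its methods are hypothetical names, 
and it assumes every incoming record is buffered before the eviction pass runs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the suppression buffer; names are illustrative only.
final class SuppressBufferSketch {
    private final long duration;                            // D
    private long streamTime = -1L;                          // ST, initially "unknown"
    private final List<Long> buffered = new ArrayList<>();  // BR.time values

    SuppressBufferSketch(long duration) {
        this.duration = duration;
    }

    // Processes one record and returns the timestamps emitted as a result.
    // Processing is complete only after all due records have been evicted.
    List<Long> process(long recordTime) {
        streamTime = Math.max(streamTime, recordTime);
        buffered.add(recordTime);
        List<Long> emitted = new ArrayList<>();
        for (Iterator<Long> it = buffered.iterator(); it.hasNext();) {
            long brTime = it.next();
            if (streamTime >= brTime + duration) {
                emitted.add(brTime);
                it.remove();
            }
        }
        return emitted;
    }

    // Checks that every remaining BR satisfies BR.time > ST - D.
    boolean invariantHolds() {
        for (long brTime : buffered) {
            if (brTime <= streamTime - duration) {
                return false;
            }
        }
        return true;
    }

    long streamTime() {
        return streamTime;
    }
}
```

In this model, calling `invariantHolds()` after any `process(...)` call should 
return true, because the eviction loop removes exactly the records that would 
violate it.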

Now, if we restart after processing R, we'll forget all about that prior stream 
time, and just take the next record's (NR) timestamp as the new stream time. 
NR.time can either be larger or smaller than the previous ST. If it's larger, 
it's equivalent to a normal increment of stream time, and all the normal logic 
applies. The unusual situation is when NR.time is smaller than the previous ST. 
This means the new stream time ST' = NR.time is smaller than the prior stream 
time. By the invariant above, every buffered record has BR.time > ST - D > 
ST' - D, so nothing that was already buffered is emitted. If we hadn't had the 
"reset", what would have happened? The new stream time would be max(ST, 
NR.time), and since NR.time < ST, it would simply stay at ST. The stream time 
doesn't advance at all, and we should not emit anything.

So whether we reset the stream time to -1 or preserve the prior ST, the exact 
same records will be emitted or not.
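
The comparison can also be tried out on a concrete input. The sketch below is 
a hypothetical simulation, not the Kafka Streams implementation: 
`ResetComparison.replay` is an invented name, and it assumes the buffer 
contents survive the restart while stream time is forgotten. It exercises one 
case only, so it illustrates the argument rather than proving it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation for illustration; not the Kafka Streams implementation.
final class ResetComparison {
    // Replays the timestamps and returns, per input record, the timestamps emitted.
    // Stream time is reset to -1 just before the record at resetBeforeIndex;
    // pass -1 to replay without any reset. The buffer is assumed to be kept.
    static List<List<Long>> replay(long[] times, long duration, int resetBeforeIndex) {
        long streamTime = -1L;
        List<Long> buffered = new ArrayList<>();
        List<List<Long>> emissions = new ArrayList<>();
        for (int i = 0; i < times.length; i++) {
            if (i == resetBeforeIndex) {
                streamTime = -1L; // simulated restart: stream time is forgotten
            }
            streamTime = Math.max(streamTime, times[i]);
            buffered.add(times[i]);
            List<Long> emitted = new ArrayList<>();
            for (Iterator<Long> it = buffered.iterator(); it.hasNext();) {
                long brTime = it.next();
                if (streamTime >= brTime + duration) {
                    emitted.add(brTime);
                    it.remove();
                }
            }
            emissions.add(emitted);
        }
        return emissions;
    }
}
```

For example, with timestamps 0, 3, 7, 4, 9 and D = 5, replaying with 
`resetBeforeIndex = 3` (the restart happens before the record with timestamp 
4, which is smaller than the prior stream time of 7) yields the same per-record 
emissions as replaying with `resetBeforeIndex = -1`.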

 

I wrote the above out long form to try and eliminate any hand-waving from my 
thinking about this. Is there any problem with the reasoning? If not, then we 
have a proof that resetting the stream time to -1 has no impact on suppress. 

> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
>                 Key: KAFKA-7994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> the current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing, but is not dropped after 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in 
> parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
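
The r1..r4 example from the issue description can be replayed with a short 
sketch. `LateRecordExample` and its methods are hypothetical names, and the 
lateness rule is the one quoted in the description (timestamp < stream-time - 
grace-period); this is not the actual Kafka Streams code.

```java
// Hypothetical replay of the example from the issue description.
final class LateRecordExample {
    // Returns the stream-time observed after each record. Stream-time is reset
    // to -1 just before the record at resetBeforeIndex; pass -1 for no reset.
    static long[] streamTimes(long[] timestamps, int resetBeforeIndex) {
        long[] result = new long[timestamps.length];
        long streamTime = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            if (i == resetBeforeIndex) {
                streamTime = -1L; // rebalance/restart: stream-time is UNKNOWN again
            }
            streamTime = Math.max(streamTime, timestamps[i]);
            result[i] = streamTime;
        }
        return result;
    }

    // Marks each record as dropped when timestamp < stream-time - grace-period.
    static boolean[] dropped(long[] timestamps, long gracePeriod, int resetBeforeIndex) {
        long[] st = streamTimes(timestamps, resetBeforeIndex);
        boolean[] result = new boolean[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            result[i] = timestamps[i] < st[i] - gracePeriod;
        }
        return result;
    }
}
```

For timestamps 0, 5, 11, 2 and a grace period of 5, the replay without a reset 
gives stream-times 0, 5, 11, 11 and drops only `r4`. With a reset before `r4` 
(`resetBeforeIndex = 3`) it gives 0, 5, 11, 2 and drops nothing.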



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
