[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16843517#comment-16843517
 ] 

Matthias J. Sax commented on KAFKA-7994:
----------------------------------------

> So this might handle how partition time is transmitted.

Not sure what you mean by this. Tasks can exchange data only via repartition 
topics. To me, it's still unclear how an upstream task would forward its 
partition time to a downstream task.

> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
>                 Key: KAFKA-7994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Richard Yu
>            Priority: Major
>         Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to decide whether 
> to process out-of-order records or to drop them if they are late (i.e., 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic 
> behavior: if we stop processing right before a late record, that record 
> would be dropped if we continued processing, but it is not dropped after a 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in 
> parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note that this is a corner case: if we stopped processing one record 
> earlier, i.e., after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies to all StreamTasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> a StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct value of 11. This requires us to 
> also propagate the latest observed partition time downstream. 
> That means the sources located at the head of the topology must forward the 
> partition time to their subtopologies whenever records are sent.
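
For illustration, the example from the description can be replayed with a small 
standalone sketch. This is not Kafka Streams code: the class StreamTimeSketch, 
the method droppedRecords, and its parameters restartAt and restoredTime are 
hypothetical names. It assumes a single input partition, so partition-time and 
stream-time coincide, and it models a restart by simply re-initializing 
stream-time before a given record.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names are hypothetical and not part of Kafka Streams.
class StreamTimeSketch {
    static final long UNKNOWN = -1L;

    /**
     * Replays the given record timestamps and returns those dropped as late.
     *
     * @param restartAt    index of the record before which a restart/rebalance
     *                     happens (negative means no restart)
     * @param restoredTime value stream-time is re-initialized to on restart
     */
    static List<Long> droppedRecords(long[] timestamps, long gracePeriod,
                                     int restartAt, long restoredTime) {
        List<Long> dropped = new ArrayList<>();
        long streamTime = UNKNOWN;
        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartAt) {
                // Simulated restart: stream-time is re-initialized.
                streamTime = restoredTime;
            }
            long ts = timestamps[i];
            // Late check from the description: timestamp < stream-time - grace-period.
            if (streamTime != UNKNOWN && ts < streamTime - gracePeriod) {
                dropped.add(ts);
                continue;
            }
            // Stream-time is the maximum timestamp observed so far.
            streamTime = Math.max(streamTime, ts);
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] records = {0L, 5L, 11L, 2L}; // r1(0) r2(5) r3(11) r4(2)
        // No restart: r4 is dropped because 2 < 11 - 5.
        System.out.println(droppedRecords(records, 5L, -1, UNKNOWN)); // prints [2]
        // Restart before r4 with stream-time reset to UNKNOWN: r4 is processed.
        System.out.println(droppedRecords(records, 5L, 3, UNKNOWN));  // prints []
        // Restart before r4 with stream-time restored to 11: r4 is dropped again.
        System.out.println(droppedRecords(records, 5L, 3, 11L));      // prints [2]
    }
}
```

The third call corresponds to the proposed fix, under the assumption that the 
partition-time stored with the committed offset (11 here) is read back on 
restart. It does not cover the downstream case where `r3` is filtered out.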



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
