[ https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16843223#comment-16843223 ]
Richard Yu edited comment on KAFKA-7994 at 5/18/19 7:28 PM:
------------------------------------------------------------
Oh, I just realized that there should be more changes. Sorry, I didn't see the
big picture (I needed to account for Guozhang's comment).
was (Author: yohan123):
Oh, I just realized that there should be more changes. Sorry, I didn't see the
big picture.
> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Richard Yu
> Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all
> records processed so far. Furthermore, we use partition-time to compute
> stream-time for each task as the maximum over all partition-times (for all
> corresponding task partitions). This stream-time is used to decide whether to
> process out-of-order records or to drop them if they are late (i.e.,
> timestamp < stream-time - grace-period).
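> A minimal Java sketch of this bookkeeping, assuming hypothetical names (`StreamTimeSketch`, `observe`, `streamTime`, `isLate`) that are not Kafka Streams' actual classes: partition-time is a running maximum per partition, stream-time is the maximum over all partition-times, and lateness uses the simplified rule from this description (timestamp < stream-time - grace-period).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams' StreamTask: tracks per-partition
// partition-time and derives the task's stream-time from it.
class StreamTimeSketch {
    // UNKNOWN time, matching the -1 used in the issue description
    static final long UNKNOWN = -1L;

    private final Map<Integer, Long> partitionTimes = new HashMap<>();
    private final long gracePeriodMs;

    StreamTimeSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // partition-time: maximum timestamp over all records processed so far on that partition
    void observe(int partition, long timestamp) {
        partitionTimes.merge(partition, timestamp, Long::max);
    }

    // stream-time: maximum over the partition-times of all task partitions (UNKNOWN if none)
    long streamTime() {
        long result = UNKNOWN;
        for (long t : partitionTimes.values()) {
            result = Math.max(result, t);
        }
        return result;
    }

    // simplified lateness rule from the description: timestamp < stream-time - grace-period
    boolean isLate(long timestamp) {
        return timestamp < streamTime() - gracePeriodMs;
    }

    public static void main(String[] args) {
        StreamTimeSketch task = new StreamTimeSketch(5);
        task.observe(0, 3);
        task.observe(1, 11);
        task.observe(0, 1); // out-of-order record: partition 0 keeps partition-time 3
        System.out.println(task.streamTime()); // 11
        System.out.println(task.isLate(5));    // true, since 5 < 11 - 5
    }
}
```

> Note that real windowed operators in Kafka Streams reason about window end plus grace period; the sketch deliberately follows the simpler rule stated above.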
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e.,
> -1) for tasks that are newly created (or migrated). In effect, we forget the
> current stream-time in this case, which may lead to non-deterministic behavior:
> if we stop processing right before a late record, that record would have been
> dropped had we continued processing, but it is not dropped after the
> rebalance/restart. Let's look at an example with a grace period of 5ms for a
> tumbling window of 5ms, and the following records (timestamps in parentheses):
>
> {code}
> r1(0) r2(5) r3(11) r4(2)
> {code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or
> rebalance after processing `r3` but before processing `r4`, we would
> reinitialize stream-time as -1, and thus would process `r4` on restart/after
> rebalance (2 < -1 - 5 = -6 does not hold). The problem is that stream-time
> then advances differently from a global point of view: 0, 5, 11, 2.
>
> Note that this is a corner case: if we stopped processing one record earlier,
> i.e., after processing `r2` but before processing `r3`, stream-time would
> still advance correctly from a global point of view, because `r3` would move
> it from -1 to 11 and `r4` would still be dropped.
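> The example and both restart points can be replayed with a small simulation. It is a hypothetical sketch: `RestartReplay.dropped` is an illustrative name, all records sit on one partition (so stream-time equals partition-time), and a "restart" is modeled simply as resetting stream-time to -1 before the record at index `restartAfter`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the example: records r1(0) r2(5) r3(11) r4(2) on one
// partition, grace period 5ms, simplified rule "late if ts < stream-time - grace".
class RestartReplay {
    static final long UNKNOWN = -1L;

    // Returns the names of records dropped as late. If restartAfter >= 0, stream-time
    // is reset to UNKNOWN after that many records were processed (simulated restart);
    // pass -1 for an uninterrupted run.
    static List<String> dropped(String[] names, long[] timestamps, long graceMs, int restartAfter) {
        List<String> result = new ArrayList<>();
        long streamTime = UNKNOWN;
        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartAfter) {
                streamTime = UNKNOWN; // newly created/migrated task forgets stream-time
            }
            if (timestamps[i] < streamTime - graceMs) {
                result.add(names[i]); // late: dropped, stream-time unchanged
            } else {
                streamTime = Math.max(streamTime, timestamps[i]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"r1", "r2", "r3", "r4"};
        long[] ts = {0, 5, 11, 2};
        System.out.println(dropped(names, ts, 5, -1)); // [r4]  no restart
        System.out.println(dropped(names, ts, 5, 3));  // []    restart after r3
        System.out.println(dropped(names, ts, 5, 2));  // [r4]  restart after r2
    }
}
```

> The three runs show the non-determinism: the same input drops `r4` without a restart or with a restart after `r2`, but keeps it when the restart happens after `r3`.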
> A potential fix would be to store the latest observed partition-time in the
> metadata of committed offsets. This way, on restart/rebalance we can
> re-initialize stream-time correctly.
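> One way the proposed fix could look, as a sketch under stated assumptions: the Java consumer's `OffsetAndMetadata(long offset, String metadata)` already lets a commit carry a metadata string, so partition-time could be serialized into it and restored on restart, after which stream-time is again the maximum over the restored partition-times. The `PartitionTimeMetadata` class and its encoding (one version byte followed by the 8-byte timestamp, Base64-encoded) are hypothetical; the code uses only the JDK and does not show the actual commit call.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical commit-metadata codec for partition-time. The layout (1 version
// byte + 8-byte big-endian long, Base64-encoded) is an illustrative assumption.
class PartitionTimeMetadata {
    static final byte VERSION = 1;
    static final long UNKNOWN = -1L;

    // Serialize partition-time into a String suitable for a commit's metadata field.
    static String encode(long partitionTime) {
        ByteBuffer buf = ByteBuffer.allocate(1 + Long.BYTES);
        buf.put(VERSION);
        buf.putLong(partitionTime);
        return Base64.getEncoder().encodeToString(buf.array());
    }

    // Restore partition-time; anything missing or unrecognized falls back to UNKNOWN,
    // i.e., the current behavior.
    static long decode(String metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return UNKNOWN;
        }
        byte[] bytes;
        try {
            bytes = Base64.getDecoder().decode(metadata);
        } catch (IllegalArgumentException e) {
            return UNKNOWN; // not Base64, e.g., metadata written by another client
        }
        if (bytes.length != 1 + Long.BYTES || bytes[0] != VERSION) {
            return UNKNOWN;
        }
        return ByteBuffer.wrap(bytes, 1, Long.BYTES).getLong();
    }

    public static void main(String[] args) {
        String metadata = encode(11L); // partition-time after processing r3
        System.out.println(decode(metadata)); // 11
        System.out.println(decode(""));       // -1
    }
}
```

> With partition-time 11 restored, stream-time after a restart between `r3` and `r4` would be 11 again, so `r4` would be dropped as in the uninterrupted run. Falling back to -1 for missing metadata keeps offsets committed without this information readable.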
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)