[ https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16843540#comment-16843540 ]
Richard Yu commented on KAFKA-7994: ----------------------------------- Ok, after digging around, I figured this much out: # Headers are also stored in ProducerRecords when they are sent over. The headers that are sent in the SinkNode are RecordContext's headers. # However, that does not mean the headers before they are serialized and put into a ProducerRecord, that they cannot be modified. What we can do is add an extra header to the headers we already have (i.e. add a {{new RecordHeader("partition time", Long.toByteArray(long timestamp))}}). # When StreamThread polls for ConsumerRecords to process, from the ConsumerRecords received, we would simply request for the header with the {{"partition time"}} string key. The value stored along side the key is our needed partition timestamp. In effect, this does not require to modify RecordContext, and also allows a quick, cheap way of delivering partition times across tasks. > Improve Stream-Time for rebalances and restarts > ----------------------------------------------- > > Key: KAFKA-7994 > URL: https://issues.apache.org/jira/browse/KAFKA-7994 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Matthias J. Sax > Assignee: Richard Yu > Priority: Major > Attachments: possible-patch.diff > > > We compute a per-partition partition-time as the maximum timestamp over all > records processed so far. Furthermore, we use partition-time to compute > stream-time for each task as maximum over all partition-times (for all > corresponding task partitions). This stream-time is used to make decisions > about processing out-of-order records or drop them if they are late (ie, > timestamp < stream-time - grace-period). > During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, > -1) for tasks that are newly created (or migrated). In net effect, we forget > current stream-time for this case what may lead to non-deterministic behavior > if we stop processing right before a late record, that would be dropped if we > continue processing, but is not dropped after rebalance/restart. Let's look > at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and > the following records (timestamps in parenthesis): > > {code:java} > r1(0) r2(5) r3(11) r4(2){code} > In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is > dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or > rebalance after processing `r3` but before processing `r4`, we would > reinitialize stream-time as -1, and thus would process `r4` on restart/after > rebalance. The problem is, that stream-time does advance differently from a > global point of view: 0, 5, 11, 2. > Note, this is a corner case, because if we would stop processing one record > earlier, ie, after processing `r2` but before processing `r3`, stream-time > would be advance correctly from a global point of view. > A potential fix would be, to store latest observed partition-time in the > metadata of committed offsets. Thus way, on restart/rebalance we can > re-initialize time correctly. > Notice that this particular issue applies for all Stream Tasks in the > topology. The further down the DAG records flow, the more likely it is that > the StreamTask will have an incorrect stream time. For instance, if r3 was > filtered out, the tasks receiving the processed records will compute the > stream time as 5 instead of the correct timestamp being 11. This entails us > to also propagate the latest observed partition time as well downstream. > That means the sources located at the head of the topology must forward the > partition time to its subtopologies whenever records are sent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)