[
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16009365#comment-16009365
]
Michal Borowiecki commented on KAFKA-3514:
------------------------------------------
I've created KAFKA-5233 to track work related to KIP-138. As noted above, the
considerations on this ticket extend beyond the scope of KIP-138, which is
agnostic to how stream time gets advanced.
> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Eno Thereska
> Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for the punctuate function as
> well as for selecting which stream to process next (i.e. best-effort stream
> synchronization). It is defined as the smallest timestamp over all
> partitions in the task's partition group. This results in two unintuitive
> corner cases:
> 1) observing a late-arriving record keeps that stream's timestamp low for
> a period of time, so the stream keeps being processed until that late record
> is reached. For example, take two partitions within the same task, annotated
> by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" will cause stream A to be
> selected continuously in the thread loop, i.e. the messages with timestamps
> 5, 6, 7, 8, 9 are processed first, until the late record itself is dequeued
> and processed. Only then will stream B be selected, starting with timestamp 2.
> 2) an empty buffered partition will prevent its timestamp from advancing,
> and hence the task timestamp as well, since it is the smallest among all
> partitions. This may not be as severe a problem as 1) above, though.
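
To make corner case 1) concrete, here is a small Python model of the selection
rule as the description states it. It is only a sketch under assumptions, not
the Kafka Streams implementation: it assumes all records are already buffered,
takes a partition's timestamp to be the smallest timestamp among its buffered
records, and simply skips empty partitions (so it does not model case 2). The
name processing_order is made up for this illustration.

```python
from collections import deque


def processing_order(partitions):
    """Return (partition, timestamp) pairs in the order they get processed.

    Assumed rule: always pick the non-empty partition whose smallest
    buffered timestamp is lowest, then dequeue the record at its head.
    """
    queues = {name: deque(ts) for name, ts in partitions.items()}
    order = []
    while any(queues.values()):
        # Partition timestamp = min over the records still buffered in it.
        name = min(
            (n for n, q in queues.items() if q),
            key=lambda n: min(queues[n]),
        )
        # Records leave a partition in arrival order, not timestamp order.
        order.append((name, queues[name].popleft()))
    return order


order = processing_order({"A": [5, 6, 7, 8, 9, 1, 10], "B": [2, 3, 4, 5]})
for name, ts in order:
    print(name, ts)
```

Under these assumptions the model dequeues A's records 5, 6, 7, 8, 9 and 1
before touching B, then B's 2, 3, 4, 5, and finally A's 10, which matches the
behaviour described above.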
> *Update*
> There is one more thing to consider (full discussion found here:
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using custom timestamp extractor, but that timestamp is just a wall
> clock time
> Imagine the following events:
> 1., for 10 seconds I send in 5 messages / second
> 2., no messages are sent for 3 seconds
> 3., sending resumes at 5 messages / second
> I see that punctuate() is not called during the 3 seconds when I do not
> send any messages. This is ok according to the documentation, because
> there are no new messages to trigger the punctuate() call. When the
> first few messages arrive after sending resumes (point 3. above) I
> see the following sequence of method calls:
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
> What I would expect instead is that punctuate() is called first and then
> process() is called on the messages, because the first message's timestamp
> is already 3 seconds later than the last punctuate() call, so the
> first message belongs after the 3 punctuate() calls.
> {quote}
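
The two orderings in the quoted scenario can be compared with a short Python
model. This is a sketch under assumptions, not Kafka Streams code: it assumes
stream time advances only when a record is seen, that punctuations are due
every interval_ms, and that every overdue punctuation fires once stream time
passes it. The function call_sequence, the punctuate_first flag and the
example timestamps (in milliseconds) are all hypothetical.

```python
def call_sequence(timestamps_ms, first_due_ms, interval_ms=1000,
                  punctuate_first=False):
    """List the process()/punctuate() calls for a run of record timestamps."""
    calls = []
    next_due = first_due_ms
    for ts in timestamps_ms:
        if not punctuate_first:
            # Ordering reported in the quote: the record is processed first.
            calls.append(f"process({ts})")
        # Stream time is now ts; fire every punctuation that has become due.
        while next_due <= ts:
            calls.append(f"punctuate({next_due})")
            next_due += interval_ms
        if punctuate_first:
            # Ordering the reporter expected: catch up on punctuations first.
            calls.append(f"process({ts})")
    return calls


# Last record before the gap, then two records after a 3 second pause.
records = [9100, 12100, 12300]
observed = call_sequence(records, first_due_ms=10000)
expected = call_sequence(records, first_due_ms=10000, punctuate_first=True)
print(observed)
print(expected)
```

In this model the first record after the pause is processed before the three
overdue punctuate() calls in the observed variant, and after them in the
expected variant. The only difference between the two is whether the catch-up
loop runs before or after process().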