[ 
https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853815#comment-17853815
 ] 

Matthias J. Sax commented on KAFKA-16925:
-----------------------------------------

Thanks for filing this ticket; it's a known problem, not limited to the
stream-table join.

I don't think that initializing with `context.currentStreamTime()` is the right
fix, though. In general, I would prefer to fix this issue across the board, to
avoid building operator-specific island solutions.

> stream-table join does not immediately forward expired records on restart
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-16925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Ayoub Omari
>            Assignee: Ayoub Omari
>            Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced a grace period for KStreamKTableJoin. This allows joining a stream
> to a KTable backed by a versioned state store.
> Upon receipt, a record is put in a buffer until the grace period has elapsed.
> When the grace period elapses, the record is joined with its most recent
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and 
> will dequeue when the record timestamp is less than or equal to stream time 
> minus the grace period.  Late records, out of the grace period, will be 
> executed right as they come in. (KIP-923){code}
>  
> However, this is not the case today after a rebalance or restart. The reason
> is that observedStreamTime is taken from the underlying state store, which
> loses this information on rebalance/restart:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>  
> If the task restarts and then receives an expired record, the buffer treats
> that record's timestamp as the maximum stream time observed so far and puts
> it in the buffer instead of joining it immediately.
>  
> {*}Example{*}:
>  * Grace period = 60s
>  * KTable contains (key, rightValue)
>  
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T
> {code}
>  
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s
> {code}
>  
> The processor should instead use currentStreamTime() from the context, which
> is recovered on restart.
>  
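
A toy model may make the timing problem in the description concrete. The following is a minimal, hypothetical Java sketch, not Kafka Streams code: the class GraceBufferSketch, its Outcome enum, and the UNKNOWN_STREAM_TIME sentinel are invented for illustration, and the buffer tracks only record timestamps. It assumes the dequeue rule quoted from KIP-923 (a record is released once its timestamp is less than or equal to stream time minus the grace period) and models a restart as constructing a new buffer whose observed stream time starts out unknown.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical toy model of a KIP-923-style grace-period buffer; NOT Kafka Streams code.
// It tracks only record timestamps and an in-memory observed stream time.
class GraceBufferSketch {

    enum Outcome { BUFFERED, JOINED_IMMEDIATELY }

    // Hypothetical sentinel for "no stream time observed yet", e.g. right after a restart.
    static final long UNKNOWN_STREAM_TIME = -1L;

    private final long gracePeriodMs;
    private long observedStreamTime;
    private final List<Long> buffer = new ArrayList<>();

    GraceBufferSketch(long gracePeriodMs, long initialStreamTime) {
        this.gracePeriodMs = gracePeriodMs;
        this.observedStreamTime = initialStreamTime;
    }

    // Advances stream time, then either joins the record right away (it is already
    // outside the grace period) or keeps it in the buffer.
    Outcome process(long recordTimestamp) {
        observedStreamTime = Math.max(observedStreamTime, recordTimestamp);
        if (recordTimestamp <= observedStreamTime - gracePeriodMs) {
            return Outcome.JOINED_IMMEDIATELY;
        }
        buffer.add(recordTimestamp);
        return Outcome.BUFFERED;
    }

    // Removes and returns buffered timestamps whose grace period has elapsed,
    // i.e. timestamp <= observed stream time - grace period.
    List<Long> drainExpired() {
        long cutoff = observedStreamTime - gracePeriodMs;
        List<Long> ready = new ArrayList<>();
        for (Iterator<Long> it = buffer.iterator(); it.hasNext(); ) {
            long ts = it.next();
            if (ts <= cutoff) {
                ready.add(ts);
                it.remove();
            }
        }
        return ready;
    }
}
```

In this model, the rebalance scenario corresponds to a fresh buffer constructed with UNKNOWN_STREAM_TIME: the record at T - 60s becomes the maximum observed stream time and is buffered. Seeding the constructor with the pre-restart stream time T (the analogue of the ticket's currentStreamTime() proposal) makes the same record join immediately, as in the normal scenario. The model ignores whether buffered records survive a restart; it only tracks stream time, and it does not reflect the across-the-board fix preferred in the comment above.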



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
