[ https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853815#comment-17853815 ]
Matthias J. Sax commented on KAFKA-16925:
-----------------------------------------

Thanks for filing this ticket – it's a known problem, not limited to stream-table join. I don't think that initializing with `context.currentStreamTime()` is the right fix, though, and in general I would prefer to fix this issue across the board, to avoid building operator-specific island solutions.

> stream-table join does not immediately forward expired records on restart
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-16925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Ayoub Omari
>            Assignee: Ayoub Omari
>            Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
> introduced a grace period for KStreamKTableJoin. This allows joining a stream
> to a KTable backed by a versioned state store.
> Each received record is put in a buffer until the grace period has elapsed.
> When the grace period elapses, the record is joined with its most recent
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are joined immediately.
>
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and
> will dequeue when the record timestamp is less than or equal to stream time
> minus the grace period. Late records, out of the grace period, will be
> executed right as they come in. (KIP-923){code}
>
> However, this is not the case today on rebalance or restart.
> The reason is that observedStreamTime is taken from the underlying state store, which loses
> this information on rebalance/restart:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>
> If the task restarts and receives an expired record, the buffer treats that
> record's timestamp as the maximum stream time observed so far, and puts the
> record in the buffer instead of joining it immediately.
>
> {*}Example{*}:
> * Grace period = 60s
> * KTable contains (key, rightValue)
>
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T{code}
>
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s{code}
>
> The processor should instead use currentStreamTime from the context, which is
> recovered on restart.
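To make the two scenarios concrete, here is a minimal, self-contained Java model of the arrival-time decision described in the KIP-923 quote (buffer the record, or join it immediately because it is already outside the grace period). It is a hypothetical sketch, not Kafka Streams code: the names `GracePeriodBufferSketch`, `Buffer`, `Decision`, and `UNKNOWN_STREAM_TIME` are made up, it does not model dequeuing, and a restart is modeled simply as a fresh buffer that has no observed stream time. The `seeded` variant corresponds to the `currentStreamTime` approach proposed in the description, which the comment above does not consider the right fix.

```java
/**
 * Hypothetical, simplified model of the KIP-923 grace-period buffer used by a
 * stream-table join. This is NOT Kafka Streams code; all names are made up.
 */
public class GracePeriodBufferSketch {

    /** Stand-in for "no stream time observed yet", e.g. right after a restart. */
    static final long UNKNOWN_STREAM_TIME = -1L;

    /** What happens to an incoming stream record when it arrives. */
    enum Decision { BUFFERED, JOINED_IMMEDIATELY }

    static final class Buffer {
        private final long gracePeriodMs;
        // Highest timestamp this buffer has seen. A restart is modeled by building a
        // new Buffer, so whatever was observed before the restart is gone.
        private long observedStreamTime;

        Buffer(long gracePeriodMs, long initialStreamTime) {
            this.gracePeriodMs = gracePeriodMs;
            this.observedStreamTime = initialStreamTime;
        }

        Decision onRecord(long recordTimestamp) {
            // Stream time only moves forward.
            observedStreamTime = Math.max(observedStreamTime, recordTimestamp);
            // KIP-923 rule: a record whose timestamp is <= streamTime - grace is
            // outside the grace period, so it is joined right away instead of buffered.
            if (recordTimestamp <= observedStreamTime - gracePeriodMs) {
                return Decision.JOINED_IMMEDIATELY;
            }
            return Decision.BUFFERED;
        }
    }

    public static void main(String[] args) {
        long grace = 60_000L;   // 60s grace period, as in the example
        long t = 1_000_000L;    // arbitrary value for T, in milliseconds

        // Normal scenario: the same buffer sees both records.
        Buffer normal = new Buffer(grace, UNKNOWN_STREAM_TIME);
        System.out.println(normal.onRecord(t));            // BUFFERED
        System.out.println(normal.onRecord(t - grace));    // JOINED_IMMEDIATELY

        // Scenario with rebalance: a fresh buffer has forgotten that stream time reached T.
        Buffer restarted = new Buffer(grace, UNKNOWN_STREAM_TIME);
        System.out.println(restarted.onRecord(t - grace)); // BUFFERED

        // Variant proposed in the description: seed the buffer with the recovered stream time.
        Buffer seeded = new Buffer(grace, t);
        System.out.println(seeded.onRecord(t - grace));    // JOINED_IMMEDIATELY
    }
}
```

The record at `T - 60s` is joined immediately only when the buffer still knows that stream time reached `T`; once that knowledge is lost, the same record is buffered. Seeding the buffer from a recovered stream time fixes the symptom for this one operator, which is exactly the kind of operator-specific solution the comment would rather avoid in favor of a fix across the board.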