[ https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854577#comment-17854577 ]

Matthias J. Sax commented on KAFKA-16925:
-----------------------------------------

{quote}My understanding is that the processor here wants to know the maximum 
observed stream time so far (including the current record), and 
context.currentStreamTimeMs() is set from the timestamp of input records. To 
me, that's the information this processor is looking for?
{quote}
Assume there is an upstream processor with a state store, and the upstream 
processor puts input records into the state store but does not yet forward 
them (this could be more than one record). Thus, the task's stream-time would 
account for the records buffered in the state store. Assume that this advanced 
stream-time might close some downstream window in the join. Later, the upstream 
processor takes the records out of the store and finally forwards them into 
the join. If the join used the task-tracked stream-time (after a rebalance, 
re-initializing from it would make the join's stream-time jump forward), it 
might now incorrectly drop the records because the window was already closed, 
even though there is no actual reason to drop them. Thus, IMHO, the task's 
tracked stream-time should not be used to initialize the operator's stream-time.
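A minimal simulation of the scenario above may help. Everything in it is hypothetical and is not Kafka Streams code: the WindowedJoin class stands in for a windowed join downstream of a buffering processor, and the 60s grace and the timestamps are made up. The buffered records are replayed after a "rebalance" twice: once with the join's stream-time initialized from the task's stream-time, and once with an operator-local stream-time that starts out unknown.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class StreamTimeInitDemo {
    static final long GRACE_MS = 60_000L;
    // Hypothetical sentinel: "this operator has not observed any record yet".
    static final long UNKNOWN = -1L;
    // The upstream processor stored these records without forwarding them yet,
    // but the task consumed all of them, so task stream-time is already 100s.
    static final long[] BUFFERED_UPSTREAM = {0L, 10_000L, 100_000L};
    static final long TASK_STREAM_TIME = 100_000L;

    /** Hypothetical downstream operator: drops records older than (its stream-time - grace). */
    static final class WindowedJoin {
        private long observedStreamTime;
        final List<Long> accepted = new ArrayList<>();
        final List<Long> dropped = new ArrayList<>();

        WindowedJoin(long initialStreamTime) {
            this.observedStreamTime = initialStreamTime;
        }

        void process(long timestamp) {
            observedStreamTime = Math.max(observedStreamTime, timestamp);
            if (timestamp < observedStreamTime - GRACE_MS) {
                dropped.add(timestamp);   // window already considered closed
            } else {
                accepted.add(timestamp);
            }
        }
    }

    /** After the rebalance, the upstream processor finally forwards its buffered records. */
    static WindowedJoin replayAfterRebalance(long initialStreamTime) {
        WindowedJoin join = new WindowedJoin(initialStreamTime);
        for (long ts : BUFFERED_UPSTREAM) {
            join.process(ts);
        }
        return join;
    }

    public static void main(String[] args) {
        WindowedJoin fromTask = replayAfterRebalance(TASK_STREAM_TIME);
        WindowedJoin perOperator = replayAfterRebalance(UNKNOWN);
        System.out.println("init from task stream-time, dropped: " + fromTask.dropped);
        System.out.println("per-operator stream-time, dropped: " + perOperator.dropped);
    }
}
{code}

Initialized from the task's stream-time, the join drops the records at 0 and 10000 even though nothing about them is late from the join's own point of view; with the operator-local stream-time, all three records are accepted.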

Putting the information into the store is usually tricky: we need to find a 
key for the stream-time that is guaranteed to be unique and does not conflict 
with any data key from the actual records we process... It would become a very 
operator- (or store-) specific solution, which does not seem generic 
(especially for PAPI usage, it would be nice to have something generic inside 
the KS runtime that is independent of the processors and/or state stores used).
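To see why the key-uniqueness problem leads to a store-specific solution, consider one common workaround: reserve a namespace byte in front of every key so that a metadata entry can never collide with a data key. The sketch below is hypothetical (the NamespacedStoreSketch class, the 0x00/0x01 prefixes, and the plain HashMap backing are assumptions, not a real state store). Note that every data key has to be rewritten, which changes the store's key format and therefore has to be built into each store or operator.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class NamespacedStoreSketch {
    // Hypothetical namespace prefixes: user data vs. internal metadata.
    private static final byte DATA = 0x00;
    private static final byte META = 0x01;
    private static final ByteBuffer STREAM_TIME_KEY =
            key(META, "stream-time".getBytes(StandardCharsets.UTF_8));

    // byte[] uses identity equality, so keys are wrapped in ByteBuffer (content equality).
    private final Map<ByteBuffer, byte[]> inner = new HashMap<>();

    private static ByteBuffer key(byte namespace, byte[] raw) {
        byte[] out = new byte[raw.length + 1];
        out[0] = namespace;
        System.arraycopy(raw, 0, out, 1, raw.length);
        return ByteBuffer.wrap(out);
    }

    public void put(byte[] dataKey, byte[] value) {
        inner.put(key(DATA, dataKey), value);
    }

    public byte[] get(byte[] dataKey) {
        return inner.get(key(DATA, dataKey));
    }

    public void putStreamTime(long streamTimeMs) {
        inner.put(STREAM_TIME_KEY, ByteBuffer.allocate(Long.BYTES).putLong(streamTimeMs).array());
    }

    public long getStreamTime(long defaultMs) {
        byte[] raw = inner.get(STREAM_TIME_KEY);
        return raw == null ? defaultMs : ByteBuffer.wrap(raw).getLong();
    }

    public static void main(String[] args) {
        NamespacedStoreSketch store = new NamespacedStoreSketch();
        // A data record whose key happens to equal the metadata key name.
        byte[] clashingKey = "stream-time".getBytes(StandardCharsets.UTF_8);
        store.put(clashingKey, new byte[] {7});
        store.putStreamTime(42L);
        System.out.println(store.getStreamTime(-1L) + " " + store.get(clashingKey)[0]);
    }
}
{code}

Even a data record whose key is literally "stream-time" does not overwrite the stored stream-time, at the cost of one extra byte on every data key.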
{quote}I am not talking about the general case, where I agree that it is better 
to put the logic in the store itself or any other way that is reusable in 
processors.
{quote}
Not sure if we would need to put it into the store. IMHO, we could also track 
stream-time inside the KS runtime on a per-operator basis, but as it might 
result in some overhead, we might want to make it opt-in and disabled by 
default. The tracked time is stored via commit offset metadata, and this 
metadata can become an issue if it grows too large (thus we should try to keep 
it as small as possible). This actually raises a more general question anyway: 
this metadata has grown over time, and we have considered maybe even adding an 
in-memory metadata store (backed by a topic) to get rid of the commit metadata 
overhead...
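To illustrate why the metadata footprint matters, here is a sketch of one possible compact encoding of per-operator stream-times into the string that travels with the committed offset. The format (a version byte, an entry count, then an int operator index and a long timestamp per entry, Base64-encoded) and the OperatorStreamTimeMetadata class are hypothetical, not the format Kafka Streams uses today. Each operator adds 12 raw bytes (about 16 Base64 characters), all of which counts against the broker's offset.metadata.max.bytes limit (4096 by default).

{code:java}
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

public class OperatorStreamTimeMetadata {
    private static final byte VERSION = 1;

    /** Layout: [version:byte][count:int] then per entry [operatorIndex:int][streamTimeMs:long]. */
    public static String encode(Map<Integer, Long> streamTimes) {
        ByteBuffer buf = ByteBuffer.allocate(
                1 + Integer.BYTES + streamTimes.size() * (Integer.BYTES + Long.BYTES));
        buf.put(VERSION);
        buf.putInt(streamTimes.size());
        for (Map.Entry<Integer, Long> e : streamTimes.entrySet()) {
            buf.putInt(e.getKey());
            buf.putLong(e.getValue());
        }
        return Base64.getEncoder().encodeToString(buf.array());
    }

    public static Map<Integer, Long> decode(String metadata) {
        ByteBuffer buf = ByteBuffer.wrap(Base64.getDecoder().decode(metadata));
        byte version = buf.get();
        if (version != VERSION) {
            throw new IllegalArgumentException("Unknown metadata version " + version);
        }
        int count = buf.getInt();
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            int operatorIndex = buf.getInt();
            long streamTimeMs = buf.getLong();
            result.put(operatorIndex, streamTimeMs);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> times = new LinkedHashMap<>();
        times.put(0, 1_000L);
        times.put(3, 5_000L);
        String encoded = encode(times);
        System.out.println(encoded.length() + " chars, round-trips: " + decode(encoded).equals(times));
    }
}
{code}

Two entries take 29 raw bytes and therefore 40 Base64 characters; keeping the tracking opt-in, as suggested above, would keep tasks that don't need it at zero extra bytes.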

I would really like to get to a more holistic solution. We have bolted on too 
many island solutions over the years, which was totally OK for some time, but I 
believe we are reaching a state where it's worth building a good generic 
solution and to stop adding more island solutions...

> stream-table join does not immediately forward expired records on restart
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-16925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Ayoub Omari
>            Assignee: Ayoub Omari
>            Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced a grace period for KStreamKTableJoin. This allows joining a stream 
> to a KTable backed by a versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has 
> elapsed. When the grace period elapses, the record is joined with its most 
> recent match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and 
> will dequeue when the record timestamp is less than or equal to stream time 
> minus the grace period.  Late records, out of the grace period, will be 
> executed right as they come in. (KIP-923){code}
>  
> However, this is not the case today on rebalance or restart. The reason is 
> that observedStreamTime is taken from the underlying state store, which loses 
> this information on rebalance/restart:  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>  
> If the task restarts and receives an expired record, the buffer considers 
> that this record has the maximum stream time observed so far, and puts it in 
> the buffer instead of immediately joining it.
>  
> {*}Example{*}:
>  * Grace period = 60s
>  * KTable contains (key, rightValue)
>  
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T{code}
>  
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s{code}
>  
> The processor should instead use currentStreamTime from the context, which is 
> recovered on restart.
>  
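The normal and rebalance scenarios from the issue description can be replayed with a simplified grace-period buffer. GraceBufferSketch is a hypothetical stand-in for the real buffer (an in-memory list, no versioned store, no changelog; records buffered before the restart are not modeled). It follows the KIP-923 rule quoted above: a record is released once its timestamp is less than or equal to stream-time minus the grace period, and records already past that point are joined right away. The two rebalance runs differ only in the initial stream-time: lost (unknown) versus recovered, the latter standing in for reading it from the context as the issue proposes.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class GraceBufferSketch {
    static final long NO_STREAM_TIME = -1L;  // hypothetical sentinel: stream-time lost on restart
    static final long GRACE_MS = 60_000L;    // grace period = 60s, as in the example
    static final long T = 1_000_000L;        // arbitrary timestamp T

    private static final class Rec {
        final String value;
        final long timestamp;

        Rec(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private long observedStreamTime;
    private final List<Rec> buffer = new ArrayList<>();
    final List<String> joined = new ArrayList<>();  // records handed to the join, in order

    GraceBufferSketch(long initialStreamTime) {
        this.observedStreamTime = initialStreamTime;
    }

    void process(String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        long cutoff = observedStreamTime - GRACE_MS;
        if (timestamp <= cutoff) {
            joined.add(value);                      // out of grace: joined immediately
        } else {
            buffer.add(new Rec(value, timestamp));  // wait for the grace period
        }
        // Release buffered records whose grace period has now elapsed.
        Iterator<Rec> it = buffer.iterator();
        while (it.hasNext()) {
            Rec r = it.next();
            if (r.timestamp <= cutoff) {
                joined.add(r.value);
                it.remove();
            }
        }
    }

    int bufferedCount() {
        return buffer.size();
    }

    /** Normal scenario: one buffer instance sees both records. */
    static GraceBufferSketch normal() {
        GraceBufferSketch b = new GraceBufferSketch(NO_STREAM_TIME);
        b.process("value1", T);
        b.process("value2", T - GRACE_MS);
        return b;
    }

    /** Rebalance scenario: a fresh instance receives value2 with the given initial stream-time. */
    static GraceBufferSketch afterRebalance(long initialStreamTime) {
        GraceBufferSketch b = new GraceBufferSketch(initialStreamTime);
        b.process("value2", T - GRACE_MS);
        return b;
    }

    public static void main(String[] args) {
        System.out.println("normal: " + normal().joined);
        System.out.println("rebalance, stream-time lost: " + afterRebalance(NO_STREAM_TIME).joined);
        System.out.println("rebalance, stream-time recovered: " + afterRebalance(T).joined);
    }
}
{code}

Running main prints [value2] for the normal and the recovered runs, and [] for the lost run, where value2 sits in the buffer because the buffer believes stream-time is only T - 60s.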



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
