[ 
https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676938#comment-17676938
 ] 

Balaji Rao commented on KAFKA-14624:
------------------------------------

{quote}
Thanks for reporting this issue – will need to think about the details more 
(and look into your project that reproduces it) 
{quote}

Thank you for taking a look. The project is very crude, sorry! I'll add a 
README tomorrow.

Could the entire cache be invalidated in one step after restore, akin to the 
flushing? And does that flush happen before a standby becomes active?

{quote}
If the task/partition moves from 1 to 2, the task on one should be closed and 
the cache should be gone? Or are you saying the task on instance 1 becomes a 
standby?
{quote}

Yes, exactly - the task becomes standby on instance 1 when the partition moves 
to instance 2.
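
To make the scenario concrete, here is a minimal sketch of how I understand 
the problem. It is a toy model, not Kafka Streams code: `ToyCachingStore`, 
`RestoreStrategy` and `readAfterMoveAndBack` are hypothetical names, and the 
two maps merely stand in for the cache and the underlying store. It assumes 
the standby restore writes to the underlying store without touching the 
cache, and compares that with restoring through the cache and with 
invalidating the whole cache after restore.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleCacheSketch {

    /** How changelog records are applied while the task is a standby (hypothetical). */
    enum RestoreStrategy { BYPASS_CACHE, THROUGH_CACHE, INVALIDATE_AFTER_RESTORE }

    /** Toy stand-in for a caching layer over a persistent store. */
    static final class ToyCachingStore {
        private final Map<String, String> underlying = new HashMap<>();
        private final Map<String, String> cache = new HashMap<>();

        // Write path while the task is active: update cache and underlying store.
        void put(String key, String value) {
            cache.put(key, value);
            underlying.put(key, value);
        }

        // Read path: the cache is consulted first, then the underlying store.
        String get(String key) {
            String cached = cache.get(key);
            return cached != null ? cached : underlying.get(key);
        }

        // Restore a changelog record according to the chosen strategy.
        void restore(String key, String value, RestoreStrategy strategy) {
            if (strategy == RestoreStrategy.THROUGH_CACHE) {
                put(key, value);            // cache and store both see the new value
            } else {
                underlying.put(key, value); // only the underlying store is updated
            }
        }

        // Drop every cached entry in one step.
        void invalidateCache() {
            cache.clear();
        }
    }

    /** Simulates: active on instance 1, moved to instance 2, moved back to instance 1. */
    static String readAfterMoveAndBack(RestoreStrategy strategy) {
        ToyCachingStore instance1 = new ToyCachingStore();
        instance1.put("k", "v1"); // instance 1 is active; "k" ends up cached

        // Partition moves to instance 2, which writes k=v2 to the changelog.
        // Instance 1 is now a standby and replays that record.
        instance1.restore("k", "v2", strategy);
        if (strategy == RestoreStrategy.INVALIDATE_AFTER_RESTORE) {
            instance1.invalidateCache();
        }

        // Partition moves back to instance 1, which serves the read.
        return instance1.get("k");
    }

    public static void main(String[] args) {
        for (RestoreStrategy s : RestoreStrategy.values()) {
            System.out.println(s + " -> " + readAfterMoveAndBack(s));
        }
    }
}
```

In this model only the `BYPASS_CACHE` run reads the old value `v1`; the other 
two read `v2`. Whether the real stores behave like this model is exactly what 
I would like to confirm.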



> State restoration is broken with standby tasks and cache-enabled stores in 
> processor API
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14624
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14624
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Balaji Rao
>            Priority: Major
>
> I found that cache-enabled state stores in PAPI with standby tasks sometimes 
> return stale data when a partition moves from one app instance to another 
> and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a 
> small project that I used to reproduce the issue.
> I dug around a bit and it seems like it's a bug in standby task state 
> restoration when caching is enabled. If a partition moves from instance 1 to 
> 2 and then back to instance 1, since the `CachingKeyValueStore` doesn't 
> register a restore callback, it can return potentially stale data for 
> non-dirty keys. 
> I could fix the issue by modifying the `CachingKeyValueStore` to register a 
> restore callback in which the restored keys are added to the cache. Is 
> this fix in the right direction?
> {code:java}
>         // register the store
>         context.register(
>                 root,
>                 (RecordBatchingStateRestoreCallback) records -> {
>                     for (final ConsumerRecord<byte[], byte[]> record : records) {
>                         put(Bytes.wrap(record.key()), record.value());
>                     }
>                 }
>         );
> {code}
>  
> I would like to contribute a fix, if I can get some help!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
