[ https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676938#comment-17676938 ]
Balaji Rao commented on KAFKA-14624:
------------------------------------

{quote}
Thanks for reporting this issue – will need to think about the details more (and look into your project that reproduces it)
{quote}

Thank you for taking a look. The project is very crude, sorry! I'll add a README tomorrow.

Could the entire cache be invalidated in one step after restore, akin to flushing? (Does flushing happen before a standby becomes active?)

{quote}
If the task/partition moves from 1 to 2, the task on instance 1 should be closed and the cache should be gone? Or are you saying the task on instance 1 becomes a standby?
{quote}

Yes, exactly: the task becomes a standby on instance 1 when the partition moves to instance 2.

> State restoration is broken with standby tasks and cache-enabled stores in
> processor API
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-14624
> URL: https://issues.apache.org/jira/browse/KAFKA-14624
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.3.1
> Reporter: Balaji Rao
> Priority: Major
>
> I found that cache-enabled state stores in PAPI with standby tasks sometimes
> return stale data when a partition moves from one app instance to another
> and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a
> small project that I used to reproduce the issue.
> I dug around a bit, and it seems to be a bug in standby task state
> restoration when caching is enabled. If a partition moves from instance 1 to
> 2 and then back to instance 1, the `CachingKeyValueStore` can return stale
> data for non-dirty keys, because it doesn't register a restore callback.
> I could fix the issue by modifying the `CachingKeyValueStore` to register a
> restore callback that adds the restored keys to the cache. Is
> this fix in the right direction?
> {code:java}
> // register the store
> context.register(
>     root,
>     (RecordBatchingStateRestoreCallback) records -> {
>         // add each restored changelog record to the cache via put(), so that
>         // reads after restoration do not return stale cached values
>         for (final ConsumerRecord<byte[], byte[]> record : records) {
>             put(Bytes.wrap(record.key()), record.value());
>         }
>     }
> );
> {code}
>
> I would like to contribute a fix, if I can get some help!
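To make the reported failure mode concrete, here is a minimal Java sketch under heavily simplified assumptions. `StaleCacheDemo`, `CachedStore`, `RestoreMode`, `runScenario`, and `onRestoreEnd` are hypothetical names, not Kafka Streams APIs, and the real `CachingKeyValueStore` is considerably more involved. The model assumes, as described in this issue, that the cache survives while instance 1 holds the task as a standby, and that restored records are written to the inner store without touching the cache. It compares that behavior with the two options discussed above: adding restored records to the cache, and invalidating the whole cache once restoration ends.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a cached key-value store; not Kafka Streams code.
class StaleCacheDemo {

    enum RestoreMode { BYPASS_CACHE, UPDATE_CACHE, INVALIDATE_CACHE }

    static final class CachedStore {
        final Map<String, String> inner = new HashMap<>(); // stands in for the underlying store
        final Map<String, String> cache = new HashMap<>(); // stands in for the record cache
        final Set<String> dirty = new HashSet<>();         // cached keys not yet flushed to inner
        final RestoreMode mode;

        CachedStore(RestoreMode mode) {
            this.mode = mode;
        }

        // Writes go to the cache first and are marked dirty.
        void put(String key, String value) {
            cache.put(key, value);
            dirty.add(key);
        }

        // Flushing writes dirty entries through but keeps them cached as clean entries.
        void flush() {
            for (String key : dirty) {
                inner.put(key, cache.get(key));
            }
            dirty.clear();
        }

        // Reads prefer the cache, falling back to the inner store.
        String get(String key) {
            String cached = cache.get(key);
            return cached != null ? cached : inner.get(key);
        }

        // Applies one changelog record while the task is a standby.
        void restore(String key, String value) {
            inner.put(key, value);
            if (mode == RestoreMode.UPDATE_CACHE) {
                cache.put(key, value); // refresh the cache with the restored value
            }
        }

        // Called once restoration ends, before the task becomes active again.
        void onRestoreEnd() {
            if (mode == RestoreMode.INVALIDATE_CACHE) {
                // Safe only because, in this model, a standby has no dirty entries.
                cache.clear();
            }
        }
    }

    static String runScenario(RestoreMode mode) {
        CachedStore instance1 = new CachedStore(mode);
        // Instance 1 is active: it writes a value and flushes (e.g. on commit).
        instance1.put("k", "v1");
        instance1.flush();
        // The partition moves to instance 2; instance 1 keeps the store as a standby
        // and replays instance 2's newer write from the changelog.
        instance1.restore("k", "v2");
        instance1.onRestoreEnd();
        // The partition moves back to instance 1, which reads the key.
        return instance1.get("k");
    }

    public static void main(String[] args) {
        for (RestoreMode mode : RestoreMode.values()) {
            System.out.println(mode + " -> " + runScenario(mode));
        }
    }
}
```

Running `main` prints `BYPASS_CACHE -> v1`, `UPDATE_CACHE -> v2`, and `INVALIDATE_CACHE -> v2`: in this model only the cache-bypassing restore returns the stale `v1`. Note that `UPDATE_CACHE` here adds restored values as clean entries, which may differ from calling the store's own `put()` as in the proposed patch. Invalidating is simpler but discards all warm cache entries, while updating keeps them at the cost of extra work per restored record.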