[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689059#comment-16689059 ]

ASF GitHub Bot commented on KAFKA-7192:
---------------------------------------

guozhangwang closed pull request #5915: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5915
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34350c17eb0..c03de2d4a2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -59,9 +59,14 @@ public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
+        if (existingRestorer == null) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+            needsInitializing.put(restorer.partition(), restorer);
+        } else {
+            needsInitializing.put(restorer.partition(), existingRestorer);
+        }
     }
 
     /**
@@ -188,7 +193,6 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
                 restorer.setCheckpointOffset(consumer.position(restoringPartition));
 
                 task.reinitializeStateStoresForPartitions(restoringPartition);
-                stateRestorers.get(restoringPartition).restoreStarted();
             } else {
                 log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);
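
For readers skimming the first hunk, the following is a minimal, self-contained
sketch of the registration logic it introduces. It is not the Kafka code:
Restorer and Reader are hypothetical stand-ins for StateRestorer and
StoreChangelogReader, partitions are plain strings, and the listener is reduced
to a boolean flag. It only illustrates that a second register() call for the
same partition keeps the first restorer and queues it for initialization again.

```java
import java.util.HashMap;
import java.util.Map;

public class RegisterSketch {
    /** Hypothetical stand-in for StateRestorer; tracks only what the sketch needs. */
    static final class Restorer {
        final String partition;
        boolean listenerSet = false;

        Restorer(final String partition) {
            this.partition = partition;
        }
    }

    /** Hypothetical stand-in for the reader's two bookkeeping maps. */
    static final class Reader {
        final Map<String, Restorer> stateRestorers = new HashMap<>();
        final Map<String, Restorer> needsInitializing = new HashMap<>();

        void register(final Restorer restorer) {
            final Restorer existing = stateRestorers.get(restorer.partition);
            if (existing == null) {
                // First registration: wire the listener and track the new restorer.
                restorer.listenerSet = true;
                stateRestorers.put(restorer.partition, restorer);
                needsInitializing.put(restorer.partition, restorer);
            } else {
                // Repeat registration: reuse the restorer already being tracked.
                needsInitializing.put(restorer.partition, existing);
            }
        }
    }

    public static void main(final String[] args) {
        final Reader reader = new Reader();
        final Restorer first = new Restorer("changelog-0");
        final Restorer second = new Restorer("changelog-0");

        reader.register(first);
        reader.needsInitializing.clear(); // pretend initialization has completed
        reader.register(second);

        // Both maps still point at the first restorer.
        System.out.println(reader.stateRestorers.get("changelog-0") == first);
        System.out.println(reader.needsInitializing.get("changelog-0") == first);
    }
}
```

In this sketch the second restorer is discarded, so any state accumulated on
the first one is preserved across a re-registration.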
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> --------------------------------------------
>
>                 Key: KAFKA-7192
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7192
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: Jon Bates
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bugs
>             Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual behaviour:*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
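
The scenario in the issue description can be modelled with a toy sketch. It
makes several simplifying assumptions that are not taken from the Kafka code
base: the state store is an in-memory count map that survives the crash, the
changelog contains only committed updates, and a checkpoint file would exist
only after a clean shutdown. Task, process, restart and run are hypothetical
names. The wipeWhenNoCheckpoint flag mimics the idea in the PR title (wipe the
store when EOS is on and no checkpoint file exists, then rebuild it from the
changelog).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DesyncSketch {
    /** Toy task: a local count store, a changelog of committed updates, a checkpoint flag. */
    static final class Task {
        final Map<String, Integer> store = new HashMap<>(); // local state, survives the crash
        final List<String> changelog = new ArrayList<>();   // committed updates only
        boolean checkpointExists = false;                   // assumed: written on clean shutdown only

        void process(final String key, final boolean crashBeforeCommit) {
            store.merge(key, 1, Integer::sum);
            if (crashBeforeCommit) {
                return; // crash: neither the changelog write nor the offset is committed
            }
            changelog.add(key);
        }

        void restart(final boolean wipeWhenNoCheckpoint) {
            if (!checkpointExists && wipeWhenNoCheckpoint) {
                // Discard possibly dirty local state and replay the changelog.
                store.clear();
                for (final String key : changelog) {
                    store.merge(key, 1, Integer::sum);
                }
            }
        }
    }

    /** Runs the scenario and returns the final count for key "a". */
    static int run(final boolean wipeWhenNoCheckpoint) {
        final Task task = new Task();
        task.process("a", false); // record N-1, committed
        task.process("a", true);  // record N, crash after the store update
        task.restart(wipeWhenNoCheckpoint);
        task.process("a", false); // record N is reprocessed
        return task.store.get("a");
    }

    public static void main(final String[] args) {
        System.out.println(run(false)); // prints 3: record N was counted twice
        System.out.println(run(true));  // prints 2: store was rebuilt before reprocessing
    }
}
```

Without the wipe, the store keeps the update from the failed attempt and the
reprocessed record is applied on top of it, which is the desynchronisation the
reporter describes.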



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
