lucasbru commented on code in PR #12773:
URL: https://github.com/apache/kafka/pull/12773#discussion_r1008163226


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -990,6 +991,10 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
         }
 
         removeChangelogsFromRestoreConsumer(revokedInitializedChangelogs);
+
+        if (changelogs.isEmpty()) {
+            state = ChangelogReaderState.ACTIVE_RESTORING;

Review Comment:
   Yeah, this change is a bit confusing without further context, because you 
only see the fix, not where the bug actually happens. 
   
   The bug is only triggered in the new code path, so there is no risk that 
anybody in the wild will run into it. It is triggered if you call 
`transitToUpdateStandby` without first making sure that your 
`ChangelogReader` is in `ACTIVE_RESTORING` mode. The old code path calls 
`enforceRestoreActive` unconditionally, while the new code path does not, 
which matters if you happen to remove all tasks and then add a standby task. 
I also considered calling `enforceRestoreActive` in the new path, just in 
case, but I think it is cleaner to have a well-defined invariant for this 
class: `changelogs.isEmpty()` implies `state == ACTIVE_RESTORING`. 
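
   To make the invariant concrete, here is a minimal sketch of the idea. It is 
not the real `StoreChangelogReader`: the class `ChangelogReaderSketch`, its 
`State` enum, the `String` partition names and the `register`/`state` helpers 
are all hypothetical stand-ins, and the state check inside 
`transitToUpdateStandby` is an assumption about how the transition is guarded. 

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for StoreChangelogReader (not the Kafka class).
class ChangelogReaderSketch {
    enum State { ACTIVE_RESTORING, STANDBY_UPDATING }

    // Partitions are plain strings here instead of TopicPartition.
    private final Set<String> changelogs = new HashSet<>();
    private State state = State.ACTIVE_RESTORING;

    void register(final String partition) {
        changelogs.add(partition);
    }

    // Assumed guard: the transition is only legal from ACTIVE_RESTORING.
    void transitToUpdateStandby() {
        if (state != State.ACTIVE_RESTORING) {
            throw new IllegalStateException("Reader is not restoring active tasks: " + state);
        }
        state = State.STANDBY_UPDATING;
    }

    void unregister(final Collection<String> revokedChangelogs) {
        changelogs.removeAll(revokedChangelogs);
        // Re-establish the invariant: no changelogs implies ACTIVE_RESTORING.
        if (changelogs.isEmpty()) {
            state = State.ACTIVE_RESTORING;
        }
    }

    State state() {
        return state;
    }
}
```

   With the reset in `unregister`, the sequence "register a standby, 
`transitToUpdateStandby`, unregister everything, register a new standby, 
`transitToUpdateStandby`" succeeds. Without the reset, the second transition 
in this sketch would throw, because the reader would still be in 
`STANDBY_UPDATING` with no changelogs left. 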



