lucasbru commented on code in PR #12773:
URL: https://github.com/apache/kafka/pull/12773#discussion_r1008163226
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -990,6 +991,10 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
         }
 
         removeChangelogsFromRestoreConsumer(revokedInitializedChangelogs);
+
+        if (changelogs.isEmpty()) {
+            state = ChangelogReaderState.ACTIVE_RESTORING;

Review Comment:
   Yeah, this change is maybe a bit confusing without further context, because you only see the fix and not where the bug is happening. It fixes a bug that is only triggered in the new code path, so there is no risk that somebody in the wild will run into it.

   Basically, the bug is triggered if you call `transitToUpdateStandby` before making sure that your `ChangelogReader` is in `ACTIVE_RESTORING` mode. The old code path calls `enforceRestoreActive` unconditionally, while the new code path does not, so the bug shows up if you happen to remove all tasks and then add a standby task.

   I also considered calling `enforceRestoreActive` in the new path, just in case, but I think it is cleaner to have a well-defined invariant for this class: `changelogs.isEmpty()` implies `state == ACTIVE_RESTORING`.
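
To make the invariant concrete, here is a rough sketch using a toy model. This is not the real `StoreChangelogReader`: `ToyChangelogReader`, its `State` enum, and the string changelog names are hypothetical, and the `IllegalStateException` precondition in `transitToUpdateStandby` is an assumption about how the failure shows up. It only illustrates the "remove all tasks, then add a standby task" sequence described above.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: NOT Kafka's StoreChangelogReader. Changelogs are plain strings here.
class ToyChangelogReader {
    enum State { ACTIVE_RESTORING, STANDBY_UPDATING }

    private final Set<String> changelogs = new HashSet<>();
    private State state = State.ACTIVE_RESTORING;

    void register(final String changelog) {
        changelogs.add(changelog);
    }

    // Assumed precondition: switching to standby mode is only legal from ACTIVE_RESTORING.
    void transitToUpdateStandby() {
        if (state != State.ACTIVE_RESTORING) {
            throw new IllegalStateException("not restoring active tasks, state is " + state);
        }
        state = State.STANDBY_UPDATING;
    }

    void unregister(final Collection<String> revoked) {
        changelogs.removeAll(revoked);
        // Invariant: changelogs.isEmpty() implies state == ACTIVE_RESTORING.
        if (changelogs.isEmpty()) {
            state = State.ACTIVE_RESTORING;
        }
    }

    State state() {
        return state;
    }

    public static void main(final String[] args) {
        final ToyChangelogReader reader = new ToyChangelogReader();
        reader.register("standby-0");
        reader.transitToUpdateStandby();
        reader.unregister(List.of("standby-0")); // reader is empty, so the state resets
        reader.register("standby-1");
        reader.transitToUpdateStandby();         // would throw without the reset above
        System.out.println(reader.state());
    }
}
```

In this toy version, deleting the `isEmpty()` check from `unregister` makes the second `transitToUpdateStandby()` call throw, which is the kind of failure the invariant is meant to rule out without the caller having to reset the state first.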