mjsax commented on code in PR #12654: URL: https://github.com/apache/kafka/pull/12654#discussion_r1091058442
##########
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##########
@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep

Review Comment:
   nit: missing `<p>` between the paragraphs. -- It seems they are also missing in other places in this file -- can we fix all of them?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +988,18 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
             if (changelogMetadata != null) {
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // if the changelog is still in REGISTERED, it means it has not initialized and started

Review Comment:
   > if the changelog is still in REGISTERED

   We check `!changelogMetadata.state().equals(ChangelogState.REGISTERED)` above, so it seems we are _not_ in REGISTERED state here. Should the comment go somewhere else, or say `because...` instead of `if...`?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +988,18 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
             if (changelogMetadata != null) {
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // if the changelog is still in REGISTERED, it means it has not initialized and started
+                    // restoring yet, and hence the corresponding onRestoreStart was not called; in this case
+                    // we should not call onRestorePaused either

Review Comment:
   nit: `onRestorePaused` -> `onRestoreSuspended`

##########
streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java:
##########
@@ -37,6 +37,7 @@ public class MockStateRestoreListener implements StateRestoreListener {
     public static final String RESTORE_START = "restore_start";
     public static final String RESTORE_BATCH = "restore_batch";
     public static final String RESTORE_END = "restore_end";
+    public static final String RESTORE_PAUSED = "restore_paused";

Review Comment:
   nit: rename -> `RESTORE_SUSPENDED`

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
             () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
     }

+    @Test
+    public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   We should only call this if we are in `RESTORING` state, right? If we are `COMPLETED`, we should not call `suspend`, because we already called `end`. So the test name seems misleading?

##########
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##########
@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep

Review Comment:
   > standby tasks restoration process

   Is "restoration" the best wording?

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
             () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
     }

+    @Test
+    public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   Should we add a case for a "completed" partition, verifying that we don't call `suspended` during `unregister` in this case?
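
To make the three cases from the comments above concrete, here is a minimal, self-contained sketch of the guard being discussed. It is not the actual `StoreChangelogReader` code: `SuspendGuardSketch`, `State`, `RecordingListener`, `Reader`, and the string-based partition names are all hypothetical stand-ins, and the callbacks use simplified signatures. The assumption it encodes is that the suspended callback should fire only for a changelog that has started restoring (`RESTORING`) and has not yet completed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SuspendGuardSketch {

    // Hypothetical, simplified stand-in for the changelog states named in the review.
    enum State { REGISTERED, RESTORING, COMPLETED }

    // Hypothetical listener that only records which callbacks fired, in order.
    static class RecordingListener {
        final List<String> events = new ArrayList<>();

        void onRestoreStart(String partition) { events.add("start:" + partition); }
        void onRestoreEnd(String partition) { events.add("end:" + partition); }
        void onRestoreSuspended(String partition) { events.add("suspended:" + partition); }
    }

    // Hypothetical reader that tracks one state per changelog partition.
    static class Reader {
        private final Map<String, State> changelogs = new LinkedHashMap<>();
        private final RecordingListener listener;

        Reader(RecordingListener listener) { this.listener = listener; }

        void register(String partition) {
            changelogs.put(partition, State.REGISTERED);
        }

        void startRestoring(String partition) {
            changelogs.put(partition, State.RESTORING);
            listener.onRestoreStart(partition);
        }

        void complete(String partition) {
            changelogs.put(partition, State.COMPLETED);
            listener.onRestoreEnd(partition);
        }

        void unregister(String partition) {
            State state = changelogs.remove(partition);
            // REGISTERED: start was never called, so suspended must not be called either.
            // COMPLETED: end was already called, so suspended must not be called either.
            if (state == State.RESTORING) {
                listener.onRestoreSuspended(partition);
            }
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        Reader reader = new Reader(listener);

        reader.register("registered-0");
        reader.register("restoring-0");
        reader.register("completed-0");

        reader.startRestoring("restoring-0");
        reader.startRestoring("completed-0");
        reader.complete("completed-0");

        reader.unregister("registered-0");
        reader.unregister("restoring-0");
        reader.unregister("completed-0");

        // prints [start:restoring-0, start:completed-0, end:completed-0, suspended:restoring-0]
        System.out.println(listener.events);
    }
}
```

Under this assumption, a test for the "completed" case would assert that no suspended event is recorded after `unregister`, alongside the existing before-completion test.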