mjsax commented on code in PR #12654:
URL: https://github.com/apache/kafka/pull/12654#discussion_r1091058442


##########
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##########
@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep

Review Comment:
   nit: missing `<p>` between the paragraphs. They also seem to be missing in
   other places in this file. Can we fix all of them?
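
   For illustration, a minimal sketch of the convention: Javadoc is rendered as HTML, so a blank ` *` line alone does not produce a paragraph break, and each subsequent paragraph opens with `<p>`. The interface name and the wording below are hypothetical placeholders, not the actual `StateRestoreListener` text.

```java
/**
 * First paragraph: a one-sentence summary of the listener.
 * <p>
 * Second paragraph: opened by an explicit {@code <p>} tag so that the
 * rendered HTML shows a paragraph break.
 * <p>
 * Third paragraph: same convention again.
 */
interface ExampleRestoreListener { // hypothetical name, for illustration only
    void onRestoreStart();
}
```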



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +988,18 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
             if (changelogMetadata != null) {
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // if the changelog is still in REGISTERED, it means it has not initialized and started

Review Comment:
   > if the changelog is still in REGISTERED
   
   We check `!changelogMetadata.state().equals(ChangelogState.REGISTERED)`
   above, so it seems we _are_ not in the REGISTERED state here. Should the
   comment go somewhere else, or say `because...` instead of `if...`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +988,18 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
             if (changelogMetadata != null) {
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // if the changelog is still in REGISTERED, it means it has not initialized and started
+                    // restoring yet, and hence the corresponding onRestoreStart was not called; in this case
+                    // we should not call onRestorePaused either

Review Comment:
   nit: `onRestorePaused` -> `onRestoreSuspended`



##########
streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java:
##########
@@ -37,6 +37,7 @@ public class MockStateRestoreListener implements StateRestoreListener {
     public static final String RESTORE_START = "restore_start";
     public static final String RESTORE_BATCH = "restore_batch";
     public static final String RESTORE_END = "restore_end";
+    public static final String RESTORE_PAUSED = "restore_paused";

Review Comment:
   nit: rename -> `RESTORE_SUSPENDED`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
             () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
     }
 
+    @Test
+    public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   We should only call this if we are in the `RESTORING` state, right? If we are
   `COMPLETED`, we should not call `suspend` because we already called `end`. So
   the test name seems misleading?
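
   To make the expected behavior concrete, a minimal sketch follows. It is not the actual `StoreChangelogReader` code: `SuspendOnUnregisterSketch` and `callbacksOnUnregister` are hypothetical names, the enum is a simplified stand-in for the changelog states, and the callback is represented by its name only.

```java
import java.util.ArrayList;
import java.util.List;

final class SuspendOnUnregisterSketch {

    // Simplified stand-in for the changelog states discussed in this review.
    enum ChangelogState { REGISTERED, RESTORING, COMPLETED }

    // Hypothetical helper: names of the listener callbacks that unregistering
    // a changelog in the given state should trigger.
    static List<String> callbacksOnUnregister(final ChangelogState state) {
        final List<String> callbacks = new ArrayList<>();
        // REGISTERED: the start callback never fired, so there is nothing to suspend.
        // COMPLETED: the end callback already fired, so suspending would be wrong.
        // RESTORING: restoration started but did not finish, so it is suspended.
        if (state == ChangelogState.RESTORING) {
            callbacks.add("onRestoreSuspended");
        }
        return callbacks;
    }

    public static void main(final String[] args) {
        for (final ChangelogState state : ChangelogState.values()) {
            System.out.println(state + " -> " + callbacksOnUnregister(state));
        }
    }
}
```

   Under these assumptions, only the `RESTORING` case yields a suspend callback; a test would assert an empty result for both `REGISTERED` and `COMPLETED`.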



##########
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##########
@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep

Review Comment:
   > standby tasks restoration process
   
   Is "restoration" the best wording?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
             () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
     }
 
+    @Test
+    public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   Should we add a case for a "completed" partition, verifying that we don't
   call `suspended` during `unregister` in this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
