cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897789473


##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception {
         kafkaStreams.resume();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
 
-        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+    }
+
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
+
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.close();
+        kafkaStreams2.close();
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.cleanUp();
+        kafkaStreams2.cleanUp();
+
+        kafkaStreams.pause();
+        kafkaStreams2.pause();
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
+
+        assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
+        assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());

Review Comment:
   Why are you verifying emptiness? I would expect entries for the state stores with a lag greater than 0.
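
   To illustrate the shape of the assertion the comment is asking for, here is a minimal, self-contained sketch. It models the result of `KafkaStreams#allLocalStorePartitionLags()` (store name -> partition -> lag) with plain JDK types, using a `Long` stand-in for Kafka's `LagInfo` and a hypothetical store name, so the map contents are assumptions, not output from the test:

   ```java
   import java.util.Map;

   public class LagAssertionSketch {
       public static void main(String[] args) {
           // Hypothetical lag snapshot shaped like
           // KafkaStreams#allLocalStorePartitionLags():
           // store name -> partition -> offset lag
           // (a plain Long stands in for Kafka's LagInfo here).
           Map<String, Map<Integer, Long>> lags =
               Map.of("count-store", Map.of(0, 5L));

           // Instead of asserting the map is empty, assert that lag
           // entries exist and that at least one store partition
           // still reports a positive lag, i.e. restoration has not run.
           boolean hasPositiveLag = lags.values().stream()
               .flatMap(m -> m.values().stream())
               .anyMatch(lag -> lag > 0);

           if (lags.isEmpty() || !hasPositiveLag) {
               throw new AssertionError("expected state stores with lag > 0");
           }
           System.out.println("positive-lag assertion holds");
       }
   }
   ```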



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) {
         }
     }
 
+    private void updateStandbyPartitions(final Map<TaskId, Task> tasks,

Review Comment:
   Do not forget to rename this method to something more meaningful. 
   Proposal: `pauseResumePartitions()`



##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception {
         kafkaStreams.resume();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
 
-        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+    }
+
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);

Review Comment:
   If you do not use standby tasks, there is no reason to use two Kafka Streams clients. I would propose using a single standby replica for this test. For that, set `num.standby.replicas` to 1, which has the effect that one client is assigned the active store and the other the standby store.
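
   As a reference, the configuration change being suggested might look like the fragment below. `StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG` is the real Kafka Streams constant for `num.standby.replicas`; how the `Properties` object is wired into the test's `buildKafkaStreams` helper is assumed, not shown in the diff:

   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;

   // With one standby replica configured, the assignor gives the active
   // store to one client and the standby copy to the other, so two
   // clients suffice to exercise paused standby restoration.
   Properties props = new Properties();
   props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
   ```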



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
