This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24ea7e0b009 KAFKA-19666: Remove old restoration codepath from PauseResumeIntegrationTest [2/N] (#20463)
24ea7e0b009 is described below

commit 24ea7e0b0093999a540620681b9dd199eb588cae
Author: Shashank <[email protected]>
AuthorDate: Mon Sep 8 00:18:12 2025 -0700

    KAFKA-19666: Remove old restoration codepath from PauseResumeIntegrationTest [2/N] (#20463)
    
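    Clean up `PauseResumeIntegrationTest`: drop the `stateUpdaterEnabled`
    test parameter and convert the parameterized tests to plain `@Test` methods.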
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../integration/PauseResumeIntegrationTest.java    | 63 ++++++++++------------
 1 file changed, 27 insertions(+), 36 deletions(-)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index 29f5276cfb1..e0f080f8855 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -43,9 +43,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -122,7 +121,7 @@ public class PauseResumeIntegrationTest {
         appId = safeUniqueTestName(testInfo);
     }
 
-    private Properties props(final boolean stateUpdaterEnabled) {
+    private Properties props() {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
@@ -134,7 +133,6 @@ public class PauseResumeIntegrationTest {
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
-        properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, 
stateUpdaterEnabled);
         return properties;
     }
 
@@ -151,10 +149,9 @@ public class PauseResumeIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, 
producerConfig, CLUSTER.time);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldPauseAndResumeKafkaStreams(final boolean 
stateUpdaterEnabled) throws Exception {
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+    @Test
+    public void shouldPauseAndResumeKafkaStreams() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
         kafkaStreams.start();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
 
@@ -176,10 +173,9 @@ public class PauseResumeIntegrationTest {
         assertTopicSize(OUTPUT_STREAM_1, 10);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldAllowForTopologiesToStartPaused(final boolean 
stateUpdaterEnabled) throws Exception {
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+    @Test
+    public void shouldAllowForTopologiesToStartPaused() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
         kafkaStreams.pause();
         kafkaStreams.start();
         waitForApplicationState(singletonList(kafkaStreams), 
State.REBALANCING, STARTUP_TIMEOUT);
@@ -197,11 +193,10 @@ public class PauseResumeIntegrationTest {
         assertTopicSize(OUTPUT_STREAM_1, 5);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @Test
     @SuppressWarnings("deprecation")
-    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final 
boolean stateUpdaterEnabled) throws Exception {
-        streamsNamedTopologyWrapper = new 
KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws 
Exception {
+        streamsNamedTopologyWrapper = new 
KafkaStreamsNamedTopologyWrapper(props());
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -233,11 +228,10 @@ public class PauseResumeIntegrationTest {
         awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @Test
     @SuppressWarnings("deprecation")
-    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final 
boolean stateUpdaterEnabled) throws Exception {
-        streamsNamedTopologyWrapper = new 
KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() 
throws Exception {
+        streamsNamedTopologyWrapper = new 
KafkaStreamsNamedTopologyWrapper(props());
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -270,11 +264,10 @@ public class PauseResumeIntegrationTest {
         assertTopicSize(OUTPUT_STREAM_2, 5);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @Test
     @SuppressWarnings("deprecation")
-    public void shouldAllowForNamedTopologiesToStartPaused(final boolean 
stateUpdaterEnabled) throws Exception {
-        streamsNamedTopologyWrapper = new 
KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+    public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+        streamsNamedTopologyWrapper = new 
KafkaStreamsNamedTopologyWrapper(props());
         final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
         final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
 
@@ -301,19 +294,18 @@ public class PauseResumeIntegrationTest {
         awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void pauseResumeShouldWorkAcrossInstances(final boolean 
stateUpdaterEnabled) throws Exception {
+    @Test
+    public void pauseResumeShouldWorkAcrossInstances() throws Exception {
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
-        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
         kafkaStreams.pause();
         kafkaStreams.start();
 
         waitForApplicationState(singletonList(kafkaStreams), 
State.REBALANCING, STARTUP_TIMEOUT);
         assertTrue(kafkaStreams.isPaused());
 
-        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, 
stateUpdaterEnabled);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
         kafkaStreams2.pause();
         kafkaStreams2.start();
         waitForApplicationState(singletonList(kafkaStreams2), 
State.REBALANCING, STARTUP_TIMEOUT);
@@ -331,12 +323,11 @@ public class PauseResumeIntegrationTest {
         awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void pausedTopologyShouldNotRestoreStateStores(final boolean 
stateUpdaterEnabled) throws Exception {
-        final Properties properties1 = props(stateUpdaterEnabled);
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        final Properties properties1 = props();
         properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
-        final Properties properties2 = props(stateUpdaterEnabled);
+        final Properties properties2 = props();
         properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
@@ -380,8 +371,8 @@ public class PauseResumeIntegrationTest {
         assertEquals(stateStoreLag1, stateStoreLag2);
     }
 
-    private KafkaStreams buildKafkaStreams(final String outputTopic, final 
boolean stateUpdaterEnabled) {
-        return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled));
+    private KafkaStreams buildKafkaStreams(final String outputTopic) {
+        return buildKafkaStreams(outputTopic, props());
     }
 
     private KafkaStreams buildKafkaStreams(final String outputTopic, final 
Properties properties) {

Reply via email to