This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24ea7e0b009 KAFKA-19666: Remove old restoration codepath from PauseResumeIntegrationTest [2/N] (#20463)
24ea7e0b009 is described below
commit 24ea7e0b0093999a540620681b9dd199eb588cae
Author: Shashank <[email protected]>
AuthorDate: Mon Sep 8 00:18:12 2025 -0700
KAFKA-19666: Remove old restoration codepath from PauseResumeIntegrationTest [2/N] (#20463)
Clean up `PauseResumeIntegrationTest`
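With the old restoration codepath removed, the state-updater flag can no longer be toggled, so each boolean-parameterized test collapses into a single plain JUnit 5 test and the `props()`/`buildKafkaStreams()` helpers drop their `stateUpdaterEnabled` parameter. A minimal sketch of the pattern this applies (the class name and body below are illustrative only; the real test bodies are unchanged):

import org.junit.jupiter.api.Test;

class PauseResumeExampleTest {

    // Before: @ParameterizedTest with @ValueSource(booleans = {true, false})
    // ran the test twice, once per stateUpdaterEnabled value.
    //
    // After: the flag is gone, so a single unparameterized run is enough.
    @Test
    public void shouldPauseAndResumeKafkaStreams() throws Exception {
        // body unchanged apart from dropping the stateUpdaterEnabled argument
    }
}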
Reviewers: Lucas Brutschy <[email protected]>
---
.../integration/PauseResumeIntegrationTest.java | 63 ++++++++++------------
1 file changed, 27 insertions(+), 36 deletions(-)
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index 29f5276cfb1..e0f080f8855 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -43,9 +43,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.ArrayList;
@@ -122,7 +121,7 @@ public class PauseResumeIntegrationTest {
appId = safeUniqueTestName(testInfo);
}
- private Properties props(final boolean stateUpdaterEnabled) {
+ private Properties props() {
final Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -134,7 +133,6 @@ public class PauseResumeIntegrationTest {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
- properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
return properties;
}
@@ -151,10 +149,9 @@ public class PauseResumeIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled) throws Exception {
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+ @Test
+ public void shouldPauseAndResumeKafkaStreams() throws Exception {
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
@@ -176,10 +173,9 @@ public class PauseResumeIntegrationTest {
assertTopicSize(OUTPUT_STREAM_1, 10);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+ @Test
+ public void shouldAllowForTopologiesToStartPaused() throws Exception {
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.pause();
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
@@ -197,11 +193,10 @@ public class PauseResumeIntegrationTest {
assertTopicSize(OUTPUT_STREAM_1, 5);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @Test
@SuppressWarnings("deprecation")
- public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+ public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -233,11 +228,10 @@ public class PauseResumeIntegrationTest {
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @Test
@SuppressWarnings("deprecation")
- public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+ public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -270,11 +264,10 @@ public class PauseResumeIntegrationTest {
assertTopicSize(OUTPUT_STREAM_2, 5);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @Test
@SuppressWarnings("deprecation")
- public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
- streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
+ public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+ streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
@@ -301,19 +294,18 @@ public class PauseResumeIntegrationTest {
awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabled) throws Exception {
+ @Test
+ public void pauseResumeShouldWorkAcrossInstances() throws Exception {
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
- kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
+ kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.pause();
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
assertTrue(kafkaStreams.isPaused());
- kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled);
+ kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
kafkaStreams2.pause();
kafkaStreams2.start();
waitForApplicationState(singletonList(kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
@@ -331,12 +323,11 @@ public class PauseResumeIntegrationTest {
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void pausedTopologyShouldNotRestoreStateStores(final boolean stateUpdaterEnabled) throws Exception {
- final Properties properties1 = props(stateUpdaterEnabled);
+ @Test
+ public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+ final Properties properties1 = props();
properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
- final Properties properties2 = props(stateUpdaterEnabled);
+ final Properties properties2 = props();
properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
@@ -380,8 +371,8 @@ public class PauseResumeIntegrationTest {
assertEquals(stateStoreLag1, stateStoreLag2);
}
- private KafkaStreams buildKafkaStreams(final String outputTopic, final boolean stateUpdaterEnabled) {
- return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled));
+ private KafkaStreams buildKafkaStreams(final String outputTopic) {
+ return buildKafkaStreams(outputTopic, props());
}
private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {
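Taken together, the helpers after this change look roughly as follows (reconstructed from the hunks above; `appId` and `CLUSTER` are existing fields of the test class, and configs not visible in the hunks are elided):

private Properties props() {
    final Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    // ... configs between the two hunks elided ...
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
    // The StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED entry is removed,
    // so no stateUpdaterEnabled flag is threaded through the callers anymore.
    return properties;
}

private KafkaStreams buildKafkaStreams(final String outputTopic) {
    return buildKafkaStreams(outputTopic, props());
}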