This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dd824a2e748 KAFKA-19666: Remove old restoration codepath from
RestoreIntegrationTest [4/N] (#20498)
dd824a2e748 is described below
commit dd824a2e748d7a61f02d3647a6606b8395b9220d
Author: Shashank <[email protected]>
AuthorDate: Thu Sep 11 07:06:25 2025 -0700
KAFKA-19666: Remove old restoration codepath from RestoreIntegrationTest
[4/N] (#20498)
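Clean up `RestoreIntegrationTest.java`: remove the `stateUpdaterEnabled`
test parameter and the `InternalConfig.STATE_UPDATER_ENABLED` override from
`props()`, so the affected tests are parameterized only on `useNewProtocol`.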
Reviewers: Lucas Brutschy <[email protected]>
---
.../integration/RestoreIntegrationTest.java | 74 ++++++----------------
1 file changed, 21 insertions(+), 53 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 7370d488757..e85ac344157 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -43,7 +43,6 @@ import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -77,7 +76,6 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,8 +159,8 @@ public class RestoreIntegrationTest {
CLUSTER.createTopic(inputStream, 2, 1);
}
- private Properties props(final boolean stateUpdaterEnabled) {
- return
props(mkObjectProperties(mkMap(mkEntry(InternalConfig.STATE_UPDATER_ENABLED,
stateUpdaterEnabled))));
+ private Properties props() {
+ return props(mkObjectProperties(mkMap()));
}
private Properties props(final Properties extraProperties) {
@@ -267,17 +265,12 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @CsvSource({
- "true,true",
- "true,false",
- "false,true",
- "false,false"
- })
- public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final
boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+ @ValueSource(booleans = {true, false})
+ public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final
boolean useNewProtocol) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final Topology topology = new Topology();
- final Properties props = props(stateUpdaterEnabled);
+ final Properties props = props();
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
@@ -338,17 +331,12 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @CsvSource({
- "true,true",
- "true,false",
- "false,true",
- "false,false"
- })
- public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean
stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+ @ValueSource(booleans = {true, false})
+ public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean
useNewProtocol) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
- final Properties props = props(stateUpdaterEnabled);
+ final Properties props = props();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
@@ -413,20 +401,15 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @CsvSource({
- "true,true",
- "true,false",
- "false,true",
- "false,false"
- })
- public void shouldRestoreStateFromChangelogTopic(final boolean
stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+ @ValueSource(booleans = {true, false})
+ public void shouldRestoreStateFromChangelogTopic(final boolean
useNewProtocol) throws Exception {
final String changelog = appId + "-store-changelog";
CLUSTER.createTopic(changelog, 2, 1);
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
- final Properties props = props(stateUpdaterEnabled);
+ final Properties props = props();
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
@@ -474,13 +457,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @CsvSource({
- "true,true",
- "true,false",
- "false,true",
- "false,false"
- })
- public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean
stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
+ @ValueSource(booleans = {true, false})
+ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean
useNewProtocol) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, Integer> stream = builder.stream(inputStream);
@@ -490,7 +468,7 @@ public class RestoreIntegrationTest {
Integer::sum,
Materialized.<Integer, Integer, KeyValueStore<Bytes,
byte[]>>as("reduce-store").withLoggingDisabled()
);
- final Properties props = props(stateUpdaterEnabled);
+ final Properties props = props();
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
}
@@ -503,13 +481,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @CsvSource({
- "true,true",
- "true,false",
- "false,true",
- "false,false"
- })
- public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean
stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
+ @ValueSource(booleans = {true, false})
+ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean
useNewProtocol) throws InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
asList(KeyValue.pair(1, 1),
KeyValue.pair(2, 2),
@@ -537,7 +510,7 @@ public class RestoreIntegrationTest {
final Topology topology = streamsBuilder.build();
- final Properties props = props(stateUpdaterEnabled);
+ final Properties props = props();
if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name());
@@ -558,13 +531,8 @@ public class RestoreIntegrationTest {
}
@ParameterizedTest
- @CsvSource({
- "true,true",
- "true,false",
- "false,true",
- "false,false"
- })
- public void
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final
boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
+ @ValueSource(booleans = {true, false})
+ public void
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final
boolean useNewProtocol) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
builder.table(
inputStream,
@@ -576,7 +544,7 @@ public class RestoreIntegrationTest {
CLUSTER.setGroupStandbyReplicas(appId, 1);
}
- final Properties props1 = props(stateUpdaterEnabled);
+ final Properties props1 = props();
props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props1.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(appId + "-1").getPath());
if (useNewProtocol) {
@@ -585,7 +553,7 @@ public class RestoreIntegrationTest {
purgeLocalStreamsState(props1);
final KafkaStreams streams1 = new KafkaStreams(builder.build(),
props1);
- final Properties props2 = props(stateUpdaterEnabled);
+ final Properties props2 = props();
props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props2.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(appId + "-2").getPath());
if (useNewProtocol) {