This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b4fa3496e1 KAFKA-10199: Adapt restoration integration tests to state updater (#12650) b4fa3496e1 is described below commit b4fa3496e19471aa083251337fa32ec811d10079 Author: Bruno Cadonna <cado...@apache.org> AuthorDate: Mon Sep 19 19:27:17 2022 +0200 KAFKA-10199: Adapt restoration integration tests to state updater (#12650) Transforms the integration test that verifies restoration into a parametrized test. The parametrized test runs once with state updater enabled and once with state updater disabled. Reviewer: Guozhang Wang <wangg...@gmail.com> --- .../integration/RestoreIntegrationTest.java | 51 ++++++++++++++-------- .../integration/utils/IntegrationTestUtils.java | 9 ++-- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index d95468c080..4048868486 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -60,9 +61,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.io.File; import java.io.IOException; @@ -74,6 +76,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -117,7 +120,7 @@ public class RestoreIntegrationTest { CLUSTER.createTopic(inputStream, 2, 1); } - private Properties props() { + private Properties props(final boolean stateUpdaterEnabled) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -127,6 +130,7 @@ public class RestoreIntegrationTest { streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); return streamsConfiguration; } @@ -137,12 +141,19 @@ public class RestoreIntegrationTest { } } - @Test - public void shouldRestoreStateFromSourceTopic() throws Exception { + private static Stream<Boolean> parameters() { + return Stream.of( + Boolean.TRUE, + Boolean.FALSE); + } + + @ParameterizedTest + @MethodSource("parameters") + public void shouldRestoreStateFromSourceTopic(final boolean stateUpdaterEnabled) throws Exception { final AtomicInteger numReceived = new AtomicInteger(0); final StreamsBuilder builder = new StreamsBuilder(); - final Properties props = props(); + final Properties props = props(stateUpdaterEnabled); props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); 
// restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions @@ -202,15 +213,16 @@ public class RestoreIntegrationTest { assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2)); } - @Test - public void shouldRestoreStateFromChangelogTopic() throws Exception { + @ParameterizedTest + @MethodSource("parameters") + public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabled) throws Exception { final String changelog = appId + "-store-changelog"; CLUSTER.createTopic(changelog, 2, 1); final AtomicInteger numReceived = new AtomicInteger(0); final StreamsBuilder builder = new StreamsBuilder(); - final Properties props = props(); + final Properties props = props(stateUpdaterEnabled); // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions final int offsetCheckpointed = 1000; @@ -268,8 +280,9 @@ public class RestoreIntegrationTest { assertThat(numReceived.get(), equalTo(numberOfKeys)); } - @Test - public void shouldSuccessfullyStartWhenLoggingDisabled() throws InterruptedException { + @ParameterizedTest + @MethodSource("parameters") + public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException { final StreamsBuilder builder = new StreamsBuilder(); final KStream<Integer, Integer> stream = builder.stream(inputStream); @@ -279,7 +292,7 @@ public class RestoreIntegrationTest { Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled()); final CountDownLatch startupLatch = new CountDownLatch(1); - kafkaStreams = new KafkaStreams(builder.build(), props()); + kafkaStreams = new KafkaStreams(builder.build(), props(stateUpdaterEnabled)); kafkaStreams.setStateListener((newState, oldState) -> { if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) { startupLatch.countDown(); @@ -291,8 +304,9 @@ public class RestoreIntegrationTest { 
assertTrue(startupLatch.await(30, TimeUnit.SECONDS)); } - @Test - public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException { + @ParameterizedTest + @MethodSource("parameters") + public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException { IntegrationTestUtils.produceKeyValuesSynchronously(inputStream, asList(KeyValue.pair(1, 1), KeyValue.pair(2, 2), @@ -320,7 +334,7 @@ public class RestoreIntegrationTest { final Topology topology = streamsBuilder.build(); - kafkaStreams = new KafkaStreams(topology, props()); + kafkaStreams = new KafkaStreams(topology, props(stateUpdaterEnabled)); final CountDownLatch latch = new CountDownLatch(1); kafkaStreams.setStateListener((newState, oldState) -> { @@ -335,8 +349,9 @@ public class RestoreIntegrationTest { assertTrue(processorLatch.await(30, TimeUnit.SECONDS)); } - @Test - public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore() throws Exception { + @ParameterizedTest + @MethodSource("parameters") + public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean stateUpdaterEnabled) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); builder.table( inputStream, @@ -344,13 +359,13 @@ public class RestoreIntegrationTest { ); createStateForRestoration(inputStream, 0); - final Properties props1 = props(); + final Properties props1 = props(stateUpdaterEnabled); props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-1").getPath()); purgeLocalStreamsState(props1); final KafkaStreams client1 = new KafkaStreams(builder.build(), props1); - final Properties props2 = props(); + final Properties props2 = props(stateUpdaterEnabled); props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-2").getPath()); 
purgeLocalStreamsState(props2); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 689b1c0beb..c14988cdae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -239,11 +239,14 @@ public class IntegrationTestUtils { * Used by tests migrated to JUnit 5. */ public static String safeUniqueTestName(final Class<?> testClass, final TestInfo testInfo) { - return safeUniqueTestName(testClass, testInfo.getTestMethod().map(Method::getName).orElse("")); + final String displayName = testInfo.getDisplayName(); + final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName"); + final String testName = displayName.contains(methodName) ? methodName : methodName + displayName; + return safeUniqueTestName(testClass, testName); } - private static String safeUniqueTestName(final Class<?> testClass, final String methodName) { - return (testClass.getSimpleName() + methodName) + private static String safeUniqueTestName(final Class<?> testClass, final String testName) { + return (testClass.getSimpleName() + testName) .replace(':', '_') .replace('.', '_') .replace('[', '_')