chia7712 commented on code in PR #16360: URL: https://github.com/apache/kafka/pull/16360#discussion_r1642020103
########## streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java: ########## @@ -61,33 +59,27 @@ /** * Tests all available joins of Kafka Streams DSL. */ -@Category({IntegrationTest.class}) -@RunWith(value = Parameterized.class) +@Tag("integration") public abstract class AbstractJoinIntegrationTest { - @Rule - public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); - - @Parameterized.Parameters(name = "caching enabled = {0}") - public static Collection<Object[]> data() { - final List<Object[]> values = new ArrayList<>(); - for (final boolean cacheEnabled : Arrays.asList(true, false)) { - values.add(new Object[]{cacheEnabled}); - } - return values; + private File testFolder = null; Review Comment: `private final File testFolder = TestUtils.tempDirectory();` ########## streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java: ########## @@ -80,8 +77,8 @@ public abstract class AbstractResetIntegrationTest { abstract Map<String, Object> getClientSslConfig(); - @Rule - public final TestName testName = new TestName(); + protected TestInfo testInfo; Review Comment: We can inject `TestInfo` into the test methods. ########## streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java: ########## @@ -66,11 +63,11 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; -@Category({IntegrationTest.class}) +@Tag("integration") Review Comment: We don't need to declare the annotation on an abstract class. 
########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -232,28 +229,33 @@ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except } } - @Test - public void shouldBeAbleToRestartAfterClose() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -232,28 +229,33 @@ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except } } - @Test - public void shouldBeAbleToRestartAfterClose() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws Exception { runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToCommitToMultiplePartitions() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToCommitToMultiplePartitions(final String eosConfig) throws Exception { runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToCommitMultiplePartitionOffsets(final String eosConfig) throws Exception { runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToRunWithTwoSubtopologies(final String eosConfig) throws Exception { runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, 
SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -612,8 +617,9 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { } } - @Test - public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: ```java @CsvSource({ StreamsConfig.AT_LEAST_ONCE + ",true", StreamsConfig.AT_LEAST_ONCE + ",false", StreamsConfig.EXACTLY_ONCE + ",true", StreamsConfig.EXACTLY_ONCE + ",false", StreamsConfig.EXACTLY_ONCE_V2 + ",true", StreamsConfig.EXACTLY_ONCE_V2 + ",false" }) ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java: ########## @@ -432,13 +424,14 @@ public void onRestoreEnd(final TopicPartition topicPartition, ); } - @Test - public void shouldNotRestoreAbortedMessages() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java: ########## @@ -80,8 +77,8 @@ public abstract class AbstractResetIntegrationTest { abstract Map<String, Object> getClientSslConfig(); - @Rule - public final TestName testName = new TestName(); + protected TestInfo testInfo; + private File testFolder; Review Comment: Could you initialize it in construction? 
########## streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java: ########## @@ -178,7 +174,7 @@ void prepareTest() throws Exception { void cleanupTest() throws Exception { Utils.closeQuietly(streams, "kafka streams"); - IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); Review Comment: We should keep the original behavior in this migration. ########## streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java: ########## @@ -59,59 +56,52 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest; -import static org.junit.Assert.assertTrue; - +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test the unclean shutdown behavior around state store cleanup. */ -@RunWith(Parameterized.class) -@Category(IntegrationTest.class) +@Tag("integration") +@Timeout(600) public class EOSUncleanShutdownIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); @SuppressWarnings("deprecation") - @Parameterized.Parameters(name = "{0}") - public static Collection<String[]> data() { - return Arrays.asList(new String[][] { - {StreamsConfig.EXACTLY_ONCE}, - {StreamsConfig.EXACTLY_ONCE_V2} - }); + public static Stream<Arguments> data() { + return Stream.of( + Arguments.of(StreamsConfig.EXACTLY_ONCE), + Arguments.of(StreamsConfig.EXACTLY_ONCE_V2)); } - @Parameterized.Parameter - public String eosConfig; - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); + private static File testFolder; - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { + testFolder = TestUtils.tempDirectory(); CLUSTER.start(); STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); - STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath()); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath()); } - @AfterClass - public static void closeCluster() { + @AfterAll + public static void closeCluster() throws IOException { CLUSTER.stop(); + Utils.delete(testFolder); } - @ClassRule - public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory()); - private static final Properties STREAMS_CONFIG = new Properties(); private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); private static final Long COMMIT_INTERVAL = 100L; private static final int RECORD_TOTAL = 3; - @Test - public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException { + @ParameterizedTest + @MethodSource("data") Review Comment: Could you please use `@ValueSource(strings = {StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`? 
########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -200,13 +195,15 @@ public void createTopics() throws Exception { CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1); } - @Test - public void shouldBeAbleToRunWithEosEnabled() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java: ########## @@ -152,34 +144,36 @@ public static Collection<Object[]> data() { final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2; - final boolean cacheEnabled; - - AbstractJoinIntegrationTest(final boolean cacheEnabled) { - this.cacheEnabled = cacheEnabled; - } - - @BeforeClass - public static void setupConfigsAndUtils() { - STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); Review Comment: nice refactor ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -200,13 +195,15 @@ public void createTopics() throws Exception { CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1); } - @Test - public void shouldBeAbleToRunWithEosEnabled() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToRunWithEosEnabled(final String eosConfig) throws Exception { runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## 
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java: ########## @@ -59,59 +56,52 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest; -import static org.junit.Assert.assertTrue; - +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test the unclean shutdown behavior around state store cleanup. */ -@RunWith(Parameterized.class) -@Category(IntegrationTest.class) +@Tag("integration") +@Timeout(600) public class EOSUncleanShutdownIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); @SuppressWarnings("deprecation") - @Parameterized.Parameters(name = "{0}") - public static Collection<String[]> data() { - return Arrays.asList(new String[][] { - {StreamsConfig.EXACTLY_ONCE}, - {StreamsConfig.EXACTLY_ONCE_V2} - }); + public static Stream<Arguments> data() { + return Stream.of( + Arguments.of(StreamsConfig.EXACTLY_ONCE), + Arguments.of(StreamsConfig.EXACTLY_ONCE_V2)); } - @Parameterized.Parameter - public String eosConfig; - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); + private static File testFolder; Review Comment: `private static final File TEST_FOLDER = TestUtils.tempDirectory();` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -232,28 +229,33 @@ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except } } - @Test - public void shouldBeAbleToRestartAfterClose() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws Exception { runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void 
shouldBeAbleToCommitToMultiplePartitions() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToCommitToMultiplePartitions(final String eosConfig) throws Exception { runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -232,28 +229,33 @@ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except } } - @Test - public void shouldBeAbleToRestartAfterClose() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws Exception { runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToCommitToMultiplePartitions() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -232,28 +229,33 @@ public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except } } - @Test - public void shouldBeAbleToRestartAfterClose() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws Exception { runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToCommitToMultiplePartitions() throws 
Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToCommitToMultiplePartitions(final String eosConfig) throws Exception { runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") + public void shouldBeAbleToCommitMultiplePartitionOffsets(final String eosConfig) throws Exception { runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig); } - @Test - public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -807,20 +814,27 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception { verifyOffsetsAreInCheckpoint(1); } - @Test - public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception { - shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true); + @ParameterizedTest + @MethodSource("data") + public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled( + final String eosConfig, final boolean processingThreadsEnabled) throws Exception { + shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(eosConfig, processingThreadsEnabled, true); } - @Test - public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: ```java @CsvSource({ StreamsConfig.AT_LEAST_ONCE + ",true", StreamsConfig.AT_LEAST_ONCE + ",false", StreamsConfig.EXACTLY_ONCE + ",true", 
StreamsConfig.EXACTLY_ONCE + ",false", StreamsConfig.EXACTLY_ONCE_V2 + ",true", StreamsConfig.EXACTLY_ONCE_V2 + ",false" }) ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -807,20 +814,27 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception { verifyOffsetsAreInCheckpoint(1); } - @Test - public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception { - shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true); + @ParameterizedTest + @MethodSource("data") Review Comment: ```java @CsvSource({ StreamsConfig.AT_LEAST_ONCE + ",true", StreamsConfig.AT_LEAST_ONCE + ",false", StreamsConfig.EXACTLY_ONCE + ",true", StreamsConfig.EXACTLY_ONCE + ",false", StreamsConfig.EXACTLY_ONCE_V2 + ",true", StreamsConfig.EXACTLY_ONCE_V2 + ",false" }) ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -391,8 +394,9 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception { } } - @Test - public void shouldNotViolateEosIfOneTaskFails() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: ```java @CsvSource({ StreamsConfig.AT_LEAST_ONCE + ",true", StreamsConfig.AT_LEAST_ONCE + ",false", StreamsConfig.EXACTLY_ONCE + ",true", StreamsConfig.EXACTLY_ONCE + ",false", StreamsConfig.EXACTLY_ONCE_V2 + ",true", StreamsConfig.EXACTLY_ONCE_V2 + ",false" }) ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -774,12 +780,13 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th } } - @Test - public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: ```java @CsvSource({ StreamsConfig.AT_LEAST_ONCE + ",true", StreamsConfig.AT_LEAST_ONCE + ",false", StreamsConfig.EXACTLY_ONCE + 
",true", StreamsConfig.EXACTLY_ONCE + ",false", StreamsConfig.EXACTLY_ONCE_V2 + ",true", StreamsConfig.EXACTLY_ONCE_V2 + ",false" }) ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -494,8 +498,9 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception { } } - @Test - public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: ```java @CsvSource({ StreamsConfig.AT_LEAST_ONCE + ",true", StreamsConfig.AT_LEAST_ONCE + ",false", StreamsConfig.EXACTLY_ONCE + ",true", StreamsConfig.EXACTLY_ONCE + ",false", StreamsConfig.EXACTLY_ONCE_V2 + ",true", StreamsConfig.EXACTLY_ONCE_V2 + ",false" }) ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ########## @@ -344,8 +346,9 @@ private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key, final List return recordsPerKey; } - @Test - public void shouldBeAbleToPerformMultipleTransactions() throws Exception { + @ParameterizedTest + @MethodSource("eosConfig") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java: ########## @@ -334,21 +325,22 @@ public void shouldRestoreTransactionalMessages() throws Exception { ); } - @Test - public void shouldSkipOverTxMarkersOnRestore() throws Exception { - shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false); + @ParameterizedTest + @MethodSource("data") + public void shouldSkipOverTxMarkersOnRestore(final String eosConfig) throws Exception { + shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false, eosConfig); } - @Test - public void shouldSkipOverAbortedMessagesOnRestore() throws Exception { - shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true); + @ParameterizedTest + @MethodSource("data") Review 
Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java: ########## @@ -155,20 +143,21 @@ public void before() throws Exception { foreachAction = results::put; } - @After + @AfterEach public void after() throws Exception { if (kafkaStreams != null) { kafkaStreams.close(); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } - @Test - public void shouldKStreamGlobalKTableLeftJoin() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java: ########## @@ -300,11 +290,12 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { ); } - @Test - public void shouldRestoreTransactionalMessages() throws Exception { + @ParameterizedTest + @MethodSource("data") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` ########## streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java: ########## @@ -334,21 +325,22 @@ public void shouldRestoreTransactionalMessages() throws Exception { ); } - @Test - public void shouldSkipOverTxMarkersOnRestore() throws Exception { - shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false); + @ParameterizedTest + @MethodSource("data") Review Comment: `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org