chia7712 commented on code in PR #16360:
URL: https://github.com/apache/kafka/pull/16360#discussion_r1642020103


##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java:
##########
@@ -61,33 +59,27 @@
 /**
  * Tests all available joins of Kafka Streams DSL.
  */
-@Category({IntegrationTest.class})
-@RunWith(value = Parameterized.class)
+@Tag("integration")
 public abstract class AbstractJoinIntegrationTest {
-    @Rule
-    public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
-
-    @Parameterized.Parameters(name = "caching enabled = {0}")
-    public static Collection<Object[]> data() {
-        final List<Object[]> values = new ArrayList<>();
-        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
-            values.add(new Object[]{cacheEnabled});
-        }
-        return values;
+    private File testFolder = null;

Review Comment:
   `private final File testFolder = TestUtils.tempDirectory();`



##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java:
##########
@@ -80,8 +77,8 @@ public abstract class AbstractResetIntegrationTest {
 
     abstract Map<String, Object> getClientSslConfig();
 
-    @Rule
-    public final TestName testName = new TestName();
+    protected TestInfo testInfo;

Review Comment:
   we can inject `TestInfo` in testing.



##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java:
##########
@@ -66,11 +63,11 @@
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Category({IntegrationTest.class})
+@Tag("integration")

Review Comment:
   we don't need to declare annotation for abstract class.



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -232,28 +229,33 @@ public void 
shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except
         }
     }
 
-    @Test
-    public void shouldBeAbleToRestartAfterClose() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -232,28 +229,33 @@ public void 
shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except
         }
     }
 
-    @Test
-    public void shouldBeAbleToRestartAfterClose() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws 
Exception {
         runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, 
SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToCommitToMultiplePartitions(final String 
eosConfig) throws Exception {
         runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, 
MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws 
Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToCommitMultiplePartitionOffsets(final String 
eosConfig) throws Exception {
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, 
SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToRunWithTwoSubtopologies(final String eosConfig) 
throws Exception {
         runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, 
SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, 
eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() 
throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -612,8 +617,9 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() 
throws Exception {
         }
     }
 
-    @Test
-    public void 
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws 
Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   ```java
       @CsvSource({
           StreamsConfig.AT_LEAST_ONCE + ",true",
           StreamsConfig.AT_LEAST_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE + ",true",
           StreamsConfig.EXACTLY_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE_V2 + ",true",
           StreamsConfig.EXACTLY_ONCE_V2 + ",false"
       })
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java:
##########
@@ -432,13 +424,14 @@ public void onRestoreEnd(final TopicPartition 
topicPartition,
         );
     }
 
-    @Test
-    public void shouldNotRestoreAbortedMessages() throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java:
##########
@@ -80,8 +77,8 @@ public abstract class AbstractResetIntegrationTest {
 
     abstract Map<String, Object> getClientSslConfig();
 
-    @Rule
-    public final TestName testName = new TestName();
+    protected TestInfo testInfo;
+    private File testFolder;

Review Comment:
   Could you initialize it in construction?



##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java:
##########
@@ -178,7 +174,7 @@ void prepareTest() throws Exception {
 
     void cleanupTest() throws Exception {
         Utils.closeQuietly(streams, "kafka streams");
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);

Review Comment:
   we should keep origin behavior in migration.



##########
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java:
##########
@@ -59,59 +56,52 @@
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
-import static org.junit.Assert.assertTrue;
-
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test the unclean shutdown behavior around state store cleanup.
  */
-@RunWith(Parameterized.class)
-@Category(IntegrationTest.class)
+@Tag("integration")
+@Timeout(600)
 public class EOSUncleanShutdownIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
 
     @SuppressWarnings("deprecation")
-    @Parameterized.Parameters(name = "{0}")
-    public static Collection<String[]> data() {
-        return Arrays.asList(new String[][] {
-            {StreamsConfig.EXACTLY_ONCE},
-            {StreamsConfig.EXACTLY_ONCE_V2}
-        });
+    public static Stream<Arguments> data() {
+        return Stream.of(
+                Arguments.of(StreamsConfig.EXACTLY_ONCE),
+                Arguments.of(StreamsConfig.EXACTLY_ONCE_V2));
     }
 
-    @Parameterized.Parameter
-    public String eosConfig;
-
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(3);
+    private static File testFolder;
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
+        testFolder = TestUtils.tempDirectory();
         CLUSTER.start();
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
-        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, 
TEST_FOLDER.getRoot().getPath());
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getPath());
     }
 
-    @AfterClass
-    public static void closeCluster() {
+    @AfterAll
+    public static void closeCluster() throws IOException {
         CLUSTER.stop();
+        Utils.delete(testFolder);
     }
 
-    @ClassRule
-    public static final TemporaryFolder TEST_FOLDER = new 
TemporaryFolder(TestUtils.tempDirectory());
-
     private static final Properties STREAMS_CONFIG = new Properties();
     private static final StringSerializer STRING_SERIALIZER = new 
StringSerializer();
     private static final Long COMMIT_INTERVAL = 100L;
 
     private static final int RECORD_TOTAL = 3;
 
-    @Test
-    public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws 
InterruptedException {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   Could you please use `@ValueSource(strings = {StreamsConfig.EXACTLY_ONCE, 
StreamsConfig.EXACTLY_ONCE_V2})`?



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -200,13 +195,15 @@ public void createTopics() throws Exception {
         CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, 
NUM_TOPIC_PARTITIONS, 1);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithEosEnabled() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java:
##########
@@ -152,34 +144,36 @@ public static Collection<Object[]> data() {
 
     final ValueJoiner<String, String, String> valueJoiner = (value1, value2) 
-> value1 + "-" + value2;
 
-    final boolean cacheEnabled;
-
-    AbstractJoinIntegrationTest(final boolean cacheEnabled) {
-        this.cacheEnabled = cacheEnabled;
-    }
-
-    @BeforeClass
-    public static void setupConfigsAndUtils() {
-        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");

Review Comment:
   nice refactor



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -200,13 +195,15 @@ public void createTopics() throws Exception {
         CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, 
NUM_TOPIC_PARTITIONS, 1);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithEosEnabled() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToRunWithEosEnabled(final String eosConfig) throws 
Exception {
         runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, 
SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws 
Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java:
##########
@@ -59,59 +56,52 @@
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
-import static org.junit.Assert.assertTrue;
-
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test the unclean shutdown behavior around state store cleanup.
  */
-@RunWith(Parameterized.class)
-@Category(IntegrationTest.class)
+@Tag("integration")
+@Timeout(600)
 public class EOSUncleanShutdownIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
 
     @SuppressWarnings("deprecation")
-    @Parameterized.Parameters(name = "{0}")
-    public static Collection<String[]> data() {
-        return Arrays.asList(new String[][] {
-            {StreamsConfig.EXACTLY_ONCE},
-            {StreamsConfig.EXACTLY_ONCE_V2}
-        });
+    public static Stream<Arguments> data() {
+        return Stream.of(
+                Arguments.of(StreamsConfig.EXACTLY_ONCE),
+                Arguments.of(StreamsConfig.EXACTLY_ONCE_V2));
     }
 
-    @Parameterized.Parameter
-    public String eosConfig;
-
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(3);
+    private static File testFolder;

Review Comment:
   `private static final File TEST_FOLDER = TestUtils.tempDirectory();`



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -232,28 +229,33 @@ public void 
shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except
         }
     }
 
-    @Test
-    public void shouldBeAbleToRestartAfterClose() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws 
Exception {
         runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, 
SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToCommitToMultiplePartitions(final String 
eosConfig) throws Exception {
         runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, 
MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws 
Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -232,28 +229,33 @@ public void 
shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except
         }
     }
 
-    @Test
-    public void shouldBeAbleToRestartAfterClose() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws 
Exception {
         runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, 
SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -232,28 +229,33 @@ public void 
shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Except
         }
     }
 
-    @Test
-    public void shouldBeAbleToRestartAfterClose() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToRestartAfterClose(final String eosConfig) throws 
Exception {
         runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, 
SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToCommitToMultiplePartitions(final String 
eosConfig) throws Exception {
         runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, 
MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws 
Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")
+    public void shouldBeAbleToCommitMultiplePartitionOffsets(final String 
eosConfig) throws Exception {
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, 
SINGLE_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
-    @Test
-    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -807,20 +814,27 @@ public void 
shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
-    @Test
-    public void 
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled()
 throws Exception {
-        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    @ParameterizedTest
+    @MethodSource("data")
+    public void 
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled(
+            final String eosConfig, final boolean processingThreadsEnabled) 
throws Exception {
+        
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(eosConfig, 
processingThreadsEnabled, true);
     }
 
-    @Test
-    public void 
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled()
 throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   ```java
       @CsvSource({
           StreamsConfig.AT_LEAST_ONCE + ",true",
           StreamsConfig.AT_LEAST_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE + ",true",
           StreamsConfig.EXACTLY_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE_V2 + ",true",
           StreamsConfig.EXACTLY_ONCE_V2 + ",false"
       })
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -807,20 +814,27 @@ public void 
shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
-    @Test
-    public void 
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled()
 throws Exception {
-        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   ```java
       @CsvSource({
           StreamsConfig.AT_LEAST_ONCE + ",true",
           StreamsConfig.AT_LEAST_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE + ",true",
           StreamsConfig.EXACTLY_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE_V2 + ",true",
           StreamsConfig.EXACTLY_ONCE_V2 + ",false"
       })
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -391,8 +394,9 @@ public void shouldBeAbleToPerformMultipleTransactions() 
throws Exception {
         }
     }
 
-    @Test
-    public void shouldNotViolateEosIfOneTaskFails() throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   ```java
       @CsvSource({
           StreamsConfig.AT_LEAST_ONCE + ",true",
           StreamsConfig.AT_LEAST_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE + ",true",
           StreamsConfig.EXACTLY_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE_V2 + ",true",
           StreamsConfig.EXACTLY_ONCE_V2 + ",false"
       })
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -774,12 +780,13 @@ public void 
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th
         }
     }
 
-    @Test
-    public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws 
Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   ```java
       @CsvSource({
           StreamsConfig.AT_LEAST_ONCE + ",true",
           StreamsConfig.AT_LEAST_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE + ",true",
           StreamsConfig.EXACTLY_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE_V2 + ",true",
           StreamsConfig.EXACTLY_ONCE_V2 + ",false"
       })
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -494,8 +498,9 @@ public void shouldNotViolateEosIfOneTaskFails() throws 
Exception {
         }
     }
 
-    @Test
-    public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   ```java
       @CsvSource({
           StreamsConfig.AT_LEAST_ONCE + ",true",
           StreamsConfig.AT_LEAST_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE + ",true",
           StreamsConfig.EXACTLY_ONCE + ",false",
           StreamsConfig.EXACTLY_ONCE_V2 + ",true",
           StreamsConfig.EXACTLY_ONCE_V2 + ",false"
       })
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -344,8 +346,9 @@ private List<KeyValue<Long, Long>> getAllRecordPerKey(final 
Long key, final List
         return recordsPerKey;
     }
 
-    @Test
-    public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
+    @ParameterizedTest
+    @MethodSource("eosConfig")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java:
##########
@@ -334,21 +325,22 @@ public void shouldRestoreTransactionalMessages() throws 
Exception {
         );
     }
 
-    @Test
-    public void shouldSkipOverTxMarkersOnRestore() throws Exception {
-        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
+    @ParameterizedTest
+    @MethodSource("data")
+    public void shouldSkipOverTxMarkersOnRestore(final String eosConfig) 
throws Exception {
+        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false, eosConfig);
     }
 
-    @Test
-    public void shouldSkipOverAbortedMessagesOnRestore() throws Exception {
-        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true);
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java:
##########
@@ -155,20 +143,21 @@ public void before() throws Exception {
         foreachAction = results::put;
     }
 
-    @After
+    @AfterEach
     public void after() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close();
         }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-    @Test
-    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java:
##########
@@ -300,11 +290,12 @@ public void shouldKStreamGlobalKTableJoin() throws 
Exception {
         );
     }
 
-    @Test
-    public void shouldRestoreTransactionalMessages() throws Exception {
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



##########
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java:
##########
@@ -334,21 +325,22 @@ public void shouldRestoreTransactionalMessages() throws 
Exception {
         );
     }
 
-    @Test
-    public void shouldSkipOverTxMarkersOnRestore() throws Exception {
-        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
+    @ParameterizedTest
+    @MethodSource("data")

Review Comment:
   `@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, 
StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to