mjsax commented on code in PR #20718:
URL: https://github.com/apache/kafka/pull/20718#discussion_r2496804395


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -383,60 +386,79 @@ public static VerificationResult verify(final String 
kafka,
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
NumberDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
-        final KafkaConsumer<String, Number> consumer = new 
KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, 
NUMERIC_VALUE_TOPICS);
-        consumer.assign(partitions);
-        consumer.seekToBeginning(partitions);
+        // Verify all transactions are finished before proceeding with data 
verification
+        if (eosEnabled) {
+            final Properties txnProps = new Properties();
+            txnProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+            txnProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+            txnProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+            txnProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+            txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString());
+
+            final VerificationResult txnResult;
+            try (final KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(txnProps)) {

Review Comment:
   Seem we are creating a consumer here, and also below. Could we re-use the 
same consumer? Ie use a top level `try (final KafkaConsumer<byte[], byte[]> 
consumer = new KafkaConsumer<>(...))` and nest `if (eosEnabled)` block inside 
this try-catch, as well as the other verification we do later?



##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -383,60 +386,79 @@ public static VerificationResult verify(final String 
kafka,
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
NumberDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
-        final KafkaConsumer<String, Number> consumer = new 
KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, 
NUMERIC_VALUE_TOPICS);
-        consumer.assign(partitions);
-        consumer.seekToBeginning(partitions);
+        // Verify all transactions are finished before proceeding with data 
verification
+        if (eosEnabled) {
+            final Properties txnProps = new Properties();

Review Comment:
   Seems this are the same config we setup above for `props`. Seems we could 
just reuse the ones from above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to