cadonna commented on code in PR #15044:
URL: https://github.com/apache/kafka/pull/15044#discussion_r1432581521


##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1006,12 +1177,40 @@ private List<KeyValue<Long, Long>> readResult(final 
String topic,
 
         // read uncommitted
         return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), 
LongDeserializer.class, LongDeserializer.class),
+            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), 
keyDeserializer, valueDeserializer),
             topic,
             numberOfRecords
         );
     }
 
+    private <K, V> void ensureCommittedRecordsInTopicPartition(final String 
topic,

Review Comment:
   I added this method to specifically verify if the partition to verify 
contains committed records because I saw flaky test failures where the 
changelog topic of the partition to verify was empty. If the changelog topic is 
empty, the latch never counts down, the Streams client never closes and the 
test runs into the test timeout.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to