adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031767657
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception
{
verifyShareGroupStateTopicRecordsProduced();
}
+ @ClusterTest
+ public void testReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 8);
+ // 5th and 10th message transaction was aborted, hence they won't
be included in the fetched records.
+ assertEquals(8, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ if (messageCounter % 5 == 0)
+ messageCounter++;
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testReadUncommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ // Even though 5th and 10th message transaction was aborted, they
will be included in the fetched records since IsolationLevel is
READ_UNCOMMITTED.
+ assertEquals(10, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
Review Comment:
I don't think that this will force the test to wait for 2.5 sec. This is the
maximum time to block. The method returns immediately if there are records
available. Otherwise, it will await the passed timeout.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]