AndrewJSchofield commented on code in PR #18964:
URL: https://github.com/apache/kafka/pull/18964#discussion_r1961700426
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -1018,7 +1018,7 @@ private void
acknowledgeBatchIfImplicitAcknowledgement(boolean calledOnPoll) {
if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
// The first call to poll(Duration) moves into PENDING
acknowledgementMode = AcknowledgementMode.PENDING;
- } else if (acknowledgementMode == AcknowledgementMode.PENDING) {
+ } else if (acknowledgementMode == AcknowledgementMode.PENDING &&
!currentFetch.isEmpty()) {
// The second call to poll(Duration) if PENDING moves into
IMPLICIT
Review Comment:
I suggest a better comment such as `// If there are records to acknowledge
and PENDING, moves into IMPLICIT`. It might not be the second call any more.
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -688,6 +688,41 @@ public void testExplicitAcknowledgementCommitAsync()
throws InterruptedException
}
}
+ @ClusterTest
+ public void testExplicitModeSwitchOnEmptyPoll() throws
InterruptedException {
Review Comment:
How about `testImplicitModeNotTriggerByPollWhenNoAcksToSend`.
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -688,6 +688,41 @@ public void testExplicitAcknowledgementCommitAsync()
throws InterruptedException
}
}
+ @ClusterTest
+ public void testExplicitModeSwitchOnEmptyPoll() throws
InterruptedException {
+ setup();
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+ Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new
HashMap<>();
+ Map<TopicPartition, Exception> partitionExceptionMap1 = new
HashMap<>();
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap1,
partitionExceptionMap1));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ assertEquals(0, records.count());
+ shareConsumer.commitAsync();
+
+ ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record1);
+ producer.flush();
+
+ records = shareConsumer.poll(Duration.ofMillis(5000));
+ assertEquals(1, records.count());
+
+ shareConsumer.acknowledge(records.iterator().next());
Review Comment:
And `// And now the acknowledgement mode becomes EXPLICIT`
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -688,6 +688,41 @@ public void testExplicitAcknowledgementCommitAsync()
throws InterruptedException
}
}
+ @ClusterTest
+ public void testExplicitModeSwitchOnEmptyPoll() throws
InterruptedException {
+ setup();
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+ Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new
HashMap<>();
+ Map<TopicPartition, Exception> partitionExceptionMap1 = new
HashMap<>();
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap1,
partitionExceptionMap1));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ assertEquals(0, records.count());
+ shareConsumer.commitAsync();
+
+ ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record1);
+ producer.flush();
+
+ records = shareConsumer.poll(Duration.ofMillis(5000));
Review Comment:
I would add `// The acknowledgement mode remains PENDING because no records
were return`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]