AndrewJSchofield commented on code in PR #18964:
URL: https://github.com/apache/kafka/pull/18964#discussion_r1961700426


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -1018,7 +1018,7 @@ private void acknowledgeBatchIfImplicitAcknowledgement(boolean calledOnPoll) {
             if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
                 // The first call to poll(Duration) moves into PENDING
                 acknowledgementMode = AcknowledgementMode.PENDING;
-            } else if (acknowledgementMode == AcknowledgementMode.PENDING) {
+            } else if (acknowledgementMode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) {
                 // The second call to poll(Duration) if PENDING moves into IMPLICIT

Review Comment:
   I suggest a better comment such as
   `// If there are records to acknowledge and PENDING, moves into IMPLICIT`.
   It might not be the second call any more.
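
   For reference, the transitions under discussion can be sketched as a small
   standalone state machine. This is only an illustration, not the actual
   `ShareConsumerImpl` code: the class `AckModeSketch`, its `poll(List)` and
   `acknowledge(String)` signatures, and the use of strings as stand-in records
   are all hypothetical. It assumes that an explicit acknowledgement while
   PENDING (with records in the current fetch) moves to EXPLICIT, and that
   acknowledging while IMPLICIT is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the acknowledgement-mode transitions.
public class AckModeSketch {
    enum AcknowledgementMode { UNKNOWN, PENDING, IMPLICIT, EXPLICIT }

    private AcknowledgementMode mode = AcknowledgementMode.UNKNOWN;
    // Records returned by the previous poll; strings stand in for real records.
    private List<String> currentFetch = new ArrayList<>();

    // Applies the mode transition based on the previous fetch, then stores the new one.
    List<String> poll(List<String> fetched) {
        if (mode == AcknowledgementMode.UNKNOWN) {
            // The first call to poll moves into PENDING
            mode = AcknowledgementMode.PENDING;
        } else if (mode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) {
            // If there are records to acknowledge and PENDING, moves into IMPLICIT
            mode = AcknowledgementMode.IMPLICIT;
        }
        currentFetch = new ArrayList<>(fetched);
        return currentFetch;
    }

    // Assumed behaviour: an explicit acknowledgement while PENDING selects EXPLICIT.
    void acknowledge(String record) {
        if (mode == AcknowledgementMode.IMPLICIT) {
            throw new IllegalStateException("Implicit acknowledgement is in use");
        }
        if (mode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) {
            mode = AcknowledgementMode.EXPLICIT;
        }
    }

    AcknowledgementMode mode() {
        return mode;
    }

    public static void main(String[] args) {
        AckModeSketch consumer = new AckModeSketch();
        consumer.poll(List.of());          // empty first poll
        System.out.println(consumer.mode());
        consumer.poll(List.of("record1")); // previous fetch was empty, so no switch
        System.out.println(consumer.mode());
        consumer.acknowledge("record1");   // explicit acknowledgement
        System.out.println(consumer.mode());
    }
}
```

   Running `main` follows the same sequence as the new test (empty poll, poll
   returning one record, then `acknowledge`) and prints PENDING, PENDING,
   EXPLICIT. Without the `!currentFetch.isEmpty()` check, the second poll would
   already have moved the sketch into IMPLICIT.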



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -688,6 +688,41 @@ public void testExplicitAcknowledgementCommitAsync() throws InterruptedException
         }
     }
 
+    @ClusterTest
+    public void testExplicitModeSwitchOnEmptyPoll() throws InterruptedException {

Review Comment:
   How about `testImplicitModeNotTriggerByPollWhenNoAcksToSend`?



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -688,6 +688,41 @@ public void testExplicitAcknowledgementCommitAsync() throws InterruptedException
         }
     }
 
+    @ClusterTest
+    public void testExplicitModeSwitchOnEmptyPoll() throws InterruptedException {
+        setup();
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+
+            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+            Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
+            Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
+            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(0, records.count());
+            shareConsumer.commitAsync();
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.flush();
+
+            records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+
+            shareConsumer.acknowledge(records.iterator().next());

Review Comment:
   And `// And now the acknowledgement mode becomes EXPLICIT`



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -688,6 +688,41 @@ public void testExplicitAcknowledgementCommitAsync() throws InterruptedException
         }
     }
 
+    @ClusterTest
+    public void testExplicitModeSwitchOnEmptyPoll() throws InterruptedException {
+        setup();
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+
+            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+            Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
+            Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
+            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(0, records.count());
+            shareConsumer.commitAsync();
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.flush();
+
+            records = shareConsumer.poll(Duration.ofMillis(5000));

Review Comment:
   I would add
   `// The acknowledgement mode remains PENDING because no records were returned`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
