This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5ae7b41281 KAFKA-18794: Add check to send one ShareFetchEvent per 
poll in ShareConsumer. (#20794)
c5ae7b41281 is described below

commit c5ae7b4128148c97845843db7ea818c8e798eef9
Author: Shivsundar R <[email protected]>
AuthorDate: Mon Nov 3 10:11:38 2025 -0500

    KAFKA-18794: Add check to send one ShareFetchEvent per poll in 
ShareConsumer. (#20794)
    
    *What*
    
    - Currently in `ShareConsumerImpl`, there is a chance that we send
    multiple `ShareFetchEvent`s for a single `poll()`.
    - This can happen when the `ShareFetchBuffer` is still empty after the
    first wait and the `pollTimer` has not yet expired; in that case the
    poll loop runs again and sends another event.
    
    - Usually the `ShareFetchBuffer` waits until the poll timeout, so we
    do not send multiple events. However, the buffer wait time is also
    controlled by `applicationEventHandler.maximumTimeToWait()`, which can
    return lower values (even 0) in some cases (especially during startup
    of the heartbeat request manager).
    
    - If this happens, multiple events are sent, and this can even lead to
    multiple fetches from the broker (in effect a fetch and a pre-fetch)
    if the response for the previous `ShareFetchEvent` arrives before the
    next `ShareFetchEvent` is processed.
    - To avoid this, there is now a check which ensures that only one
    `ShareFetchEvent` is sent per poll.
    
    - This was the reason a couple of tests in `KafkaShareConsumerTest`
    were flaky (https://issues.apache.org/jira/browse/KAFKA-18794). This
    PR fixes the flakiness; the tests now pass reliably locally.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      |  6 +-
 .../consumer/internals/ShareConsumerImpl.java      | 40 ++++++++-----
 .../events/ApplicationEventProcessor.java          |  2 +-
 .../consumer/internals/events/ShareFetchEvent.java | 10 +---
 .../clients/consumer/KafkaShareConsumerTest.java   |  7 ---
 .../internals/ShareConsumeRequestManagerTest.java  | 66 +++++++---------------
 .../consumer/internals/ShareConsumerImplTest.java  | 42 ++++++++++----
 7 files changed, 82 insertions(+), 91 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index e7bfdaefe49..703f7530a5c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -283,16 +283,14 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         }
     }
 
-    public void fetch(Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap,
-                      Map<TopicIdPartition, NodeAcknowledgements> 
controlRecordAcknowledgements) {
+    public void fetch(Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap) {
         if (!fetchMoreRecords) {
             log.debug("Fetch more data");
             fetchMoreRecords = true;
         }
 
-        // Process both acknowledgement maps and sends them in the next 
ShareFetch.
+        // Store the acknowledgements and send them in the next ShareFetch.
         processAcknowledgementsMap(acknowledgementsMap);
-        processAcknowledgementsMap(controlRecordAcknowledgements);
     }
 
     private void processAcknowledgementsMap(Map<TopicIdPartition, 
NodeAcknowledgements> acknowledgementsMap) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 0530155e538..501821a2310 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -203,6 +203,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     // and is used to prevent multithreaded access
     private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
     private final AtomicInteger refCount = new AtomicInteger(0);
+    private boolean shouldSendShareFetchEvent = false;
 
     ShareConsumerImpl(final ConsumerConfig config,
                       final Deserializer<K> keyDeserializer,
@@ -581,6 +582,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 throw new IllegalStateException("Consumer is not subscribed to 
any topics.");
             }
 
+            shouldSendShareFetchEvent = true;
+
             do {
                 // Make sure the network thread can tell the application is 
actively polling
                 applicationEventHandler.add(new 
PollEvent(timer.currentTimeMs()));
@@ -654,28 +657,29 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         if (currentFetch.isEmpty()) {
             final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
             if (fetch.isEmpty()) {
-                // Check for any acknowledgements which could have come from 
control records (GAP) and include them.
-                applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap, fetch.takeAcknowledgedRecords()));
+                Map<TopicIdPartition, NodeAcknowledgements> 
controlRecordAcknowledgements = fetch.takeAcknowledgedRecords();
 
-                // Notify the network thread to wake up and start the next 
round of fetching
-                applicationEventHandler.wakeupNetworkThread();
+                if (!controlRecordAcknowledgements.isEmpty()) {
+                    // Asynchronously commit any waiting acknowledgements from 
control records.
+                    
sendShareAcknowledgeAsyncEvent(controlRecordAcknowledgements);
+                }
+                // We only send one ShareFetchEvent per poll call.
+                if (shouldSendShareFetchEvent) {
+                    // Check for any acknowledgements which could have come 
from control records (GAP) and include them.
+                    applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+                    shouldSendShareFetchEvent = false;
+                    // Notify the network thread to wake up and start the next 
round of fetching
+                    applicationEventHandler.wakeupNetworkThread();
+                }
             } else if (!acknowledgementsMap.isEmpty()) {
                 // Asynchronously commit any waiting acknowledgements
-                Timer timer = time.timer(defaultApiTimeoutMs);
-                applicationEventHandler.add(new 
ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
-
-                // Notify the network thread to wake up and start the next 
round of fetching
-                applicationEventHandler.wakeupNetworkThread();
+                sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
             }
             return fetch;
         } else {
             if (!acknowledgementsMap.isEmpty()) {
                 // Asynchronously commit any waiting acknowledgements
-                Timer timer = time.timer(defaultApiTimeoutMs);
-                applicationEventHandler.add(new 
ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
-
-                // Notify the network thread to wake up and start the next 
round of fetching
-                applicationEventHandler.wakeupNetworkThread();
+                sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
             }
             if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
                 // We cannot leave unacknowledged records in EXPLICIT 
acknowledgement mode, so we throw an exception to the application.
@@ -685,6 +689,14 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         }
     }
 
+    private void sendShareAcknowledgeAsyncEvent(Map<TopicIdPartition, 
NodeAcknowledgements> acknowledgementsMap) {
+        Timer timer = time.timer(defaultApiTimeoutMs);
+        applicationEventHandler.add(new 
ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
+
+        // Notify the network thread to wake up and start the next round of 
fetching
+        applicationEventHandler.wakeupNetworkThread();
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 31eef662ce8..2ac1be587a6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -514,7 +514,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * Process event that tells the share consume request manager to fetch 
more records.
      */
     private void process(final ShareFetchEvent event) {
-        requestManagers.shareConsumeRequestManager.ifPresent(scrm -> 
scrm.fetch(event.acknowledgementsMap(), event.controlRecordAcknowledgements()));
+        requestManagers.shareConsumeRequestManager.ifPresent(scrm -> 
scrm.fetch(event.acknowledgementsMap()));
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
index 2bc38b07b8b..1f2f0cc9a17 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
@@ -25,23 +25,15 @@ public class ShareFetchEvent extends ApplicationEvent {
 
     private final Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap;
 
-    private final Map<TopicIdPartition, NodeAcknowledgements> 
controlRecordAcknowledgements;
-
-    public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap,
-                           Map<TopicIdPartition, NodeAcknowledgements> 
controlRecordAcknowledgements) {
+    public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap) {
         super(Type.SHARE_FETCH);
         this.acknowledgementsMap = acknowledgementsMap;
-        this.controlRecordAcknowledgements = controlRecordAcknowledgements;
     }
 
     public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
         return acknowledgementsMap;
     }
 
-    public Map<TopicIdPartition, NodeAcknowledgements> 
controlRecordAcknowledgements() {
-        return controlRecordAcknowledgements;
-    }
-
     @Override
     protected String toStringBase() {
         return super.toStringBase() + ", acknowledgementsMap=" + 
acknowledgementsMap;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
index 1346ea7fd09..e1a4aef6498 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
@@ -49,7 +49,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -141,9 +140,6 @@ public class KafkaShareConsumerTest {
         }
     }
 
-    // This test is proving sufficiently flaky that it has been disabled 
pending investigation
-    @Disabled
-    // @Flaky("KAFKA-18488")
     @Test
     public void testVerifyFetchAndCommitSyncImplicit() {
         ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, 
Long.MAX_VALUE, false,
@@ -218,9 +214,6 @@ public class KafkaShareConsumerTest {
         }
     }
 
-    // This test is proving sufficiently flaky that it has been disabled 
pending investigation
-    @Disabled
-    //@Flaky("KAFKA-18794")
     @Test
     public void testVerifyFetchAndCloseImplicit() {
         ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, 
Long.MAX_VALUE, false,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 6c47aea4171..d2caebb1c4c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -252,7 +252,7 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements = Acknowledgements.empty();
         acknowledgements.add(1L, AcknowledgeType.ACCEPT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         sendFetchAndVerifyResponse(records, 
ShareCompletedFetchTest.acquiredRecords(2L, 1), Errors.NONE);
         assertEquals(1.0,
@@ -265,7 +265,7 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements2 = Acknowledgements.empty();
         acknowledgements2.add(2L, AcknowledgeType.REJECT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements2)));
 
         // Preparing a response with an acknowledgement error.
         sendFetchAndVerifyResponse(records, Collections.emptyList(), 
Errors.NONE, Errors.INVALID_RECORD_STATE);
@@ -391,7 +391,7 @@ public class ShareConsumeRequestManagerTest {
         acknowledgements.add(1L, AcknowledgeType.ACCEPT);
 
         // Piggyback acknowledgements
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         // Remaining acknowledgements sent with close().
         Acknowledgements acknowledgements2 = getAcknowledgements(2, 
AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -797,7 +797,7 @@ public class ShareConsumeRequestManagerTest {
         fetchRecords();
 
         // Piggyback acknowledgements
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -807,7 +807,7 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements2 = Acknowledgements.empty();
         acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements2)));
 
         client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
         networkClientDelegate.poll(time.timer(0));
@@ -935,7 +935,7 @@ public class ShareConsumeRequestManagerTest {
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
 
         // Send acknowledgements via ShareFetch
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
         fetchRecords();
         // Subscription changes.
         subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
@@ -974,7 +974,7 @@ public class ShareConsumeRequestManagerTest {
         Acknowledgements acknowledgements = getAcknowledgements(0, 
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
 
         // Send acknowledgements via ShareFetch
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
         fetchRecords();
         // Subscription changes.
         subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
@@ -1362,7 +1362,7 @@ public class ShareConsumeRequestManagerTest {
         Acknowledgements acknowledgements1 = getAcknowledgements(2,
                 AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements1)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements1)));
 
         assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -1406,7 +1406,7 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         NetworkClientDelegate.PollResult pollResult = 
shareConsumeRequestManager.sendFetchesReturnPollResult();
         assertEquals(1, pollResult.unsentRequests.size());
@@ -1442,7 +1442,7 @@ public class ShareConsumeRequestManagerTest {
 
         // The acknowledgements for the initial fetch from tip0 are processed 
now and sent to the background thread.
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         assertEquals(0, completedAcknowledgements.size());
 
@@ -1467,7 +1467,7 @@ public class ShareConsumeRequestManagerTest {
 
         // The acknowledgements for the initial fetch from tip0 are processed 
now and sent to the background thread.
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         // We attempt to send the acknowledgements piggybacking on the fetch.
         assertEquals(1, sendFetches());
@@ -1813,7 +1813,7 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements = Acknowledgements.empty();
         acknowledgements.add(1L, AcknowledgeType.ACCEPT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         assertEquals(startingClusterMetadata, metadata.fetch());
 
@@ -1918,7 +1918,7 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements = Acknowledgements.empty();
         acknowledgements.add(1L, AcknowledgeType.ACCEPT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         // The metadata snapshot will have been updated with the new leader 
information
         assertNotEquals(startingClusterMetadata, metadata.fetch());
@@ -2013,7 +2013,7 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements = Acknowledgements.empty();
         acknowledgements.add(1L, AcknowledgeType.ACCEPT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         assertEquals(startingClusterMetadata, metadata.fetch());
 
@@ -2301,7 +2301,7 @@ public class ShareConsumeRequestManagerTest {
         Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
                 AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
 
-        shareConsumeRequestManager.fetch(Map.of(tip1, new 
NodeAcknowledgements(1, acknowledgementsTp1)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip1, new 
NodeAcknowledgements(1, acknowledgementsTp1)));
 
         // Move the leadership of tp0 onto node 1
         HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = 
new HashMap<>();
@@ -2384,13 +2384,13 @@ public class ShareConsumeRequestManagerTest {
 
         Acknowledgements acknowledgements = Acknowledgements.empty();
         acknowledgements.add(1, AcknowledgeType.ACCEPT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         assertEquals(startingClusterMetadata, metadata.fetch());
 
         acknowledgements = Acknowledgements.empty();
         acknowledgements.add(1, AcknowledgeType.ACCEPT);
-        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
 
         assertEquals(2, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -2431,7 +2431,7 @@ public class ShareConsumeRequestManagerTest {
 
         assertNotEquals(startingClusterMetadata, metadata.fetch());
 
-        shareConsumeRequestManager.fetch(Map.of(tip1, new 
NodeAcknowledgements(1, acknowledgements)), Collections.emptyMap());
+        shareConsumeRequestManager.fetch(Map.of(tip1, new 
NodeAcknowledgements(1, acknowledgements)));
 
         assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -2714,14 +2714,14 @@ public class ShareConsumeRequestManagerTest {
         }
 
         private int sendFetches() {
-            fetch(new HashMap<>(), new HashMap<>());
+            fetch(new HashMap<>());
             NetworkClientDelegate.PollResult pollResult = 
poll(time.milliseconds());
             networkClientDelegate.addAll(pollResult.unsentRequests);
             return pollResult.unsentRequests.size();
         }
 
         private NetworkClientDelegate.PollResult sendFetchesReturnPollResult() 
{
-            fetch(new HashMap<>(), new HashMap<>());
+            fetch(new HashMap<>());
             NetworkClientDelegate.PollResult pollResult = 
poll(time.milliseconds());
             networkClientDelegate.addAll(pollResult.unsentRequests);
             return pollResult;
@@ -2861,32 +2861,6 @@ public class ShareConsumeRequestManagerTest {
         }
     }
 
-    @Test
-    void testFetchWithControlRecords() {
-        buildRequestManager();
-        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
-
-        Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = 
new HashMap<>();
-
-        Acknowledgements acknowledgements = Acknowledgements.empty();
-        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
-        nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0, 
acknowledgements));
-
-        Map<TopicIdPartition, NodeAcknowledgements> 
nodeAcknowledgementsControlRecordMap = new HashMap<>();
-
-        Acknowledgements controlAcknowledgements = Acknowledgements.empty();
-        controlAcknowledgements.addGap(2L);
-        nodeAcknowledgementsControlRecordMap.put(tip0, new 
NodeAcknowledgements(0, controlAcknowledgements));
-
-        shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, 
nodeAcknowledgementsControlRecordMap);
-
-        Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = 
shareConsumeRequestManager.getFetchAcknowledgementsToSend(0);
-        assertEquals(1, fetchAcksToSend.size());
-        assertEquals(AcknowledgeType.ACCEPT, 
fetchAcksToSend.get(tip0).get(1L));
-        assertEquals(2, fetchAcksToSend.get(tip0).size());
-        assertNull(fetchAcksToSend.get(tip0).get(3L));
-    }
-
     private void sendFetchAndVerifyResponse(MemoryRecords records,
                                     
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords,
                                     Errors... error) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index edb244b06aa..1765563adcd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
@@ -269,20 +270,15 @@ public class ShareConsumerImplTest {
 
         consumer.poll(Duration.ZERO);
 
-        // Verify that next ShareFetchEvent was sent with the acknowledgement 
GAP for offset 1
+        // Verify that a ShareAcknowledeAsyncEvent was sent with the 
acknowledgement GAP for offset 1
         verify(applicationEventHandler).add(argThat(event -> {
-            if (!(event instanceof ShareFetchEvent)) {
+            if (!(event instanceof ShareAcknowledgeAsyncEvent)) {
                 return false;
             }
-            ShareFetchEvent fetchEvent = (ShareFetchEvent) event;
+            ShareAcknowledgeAsyncEvent shareAcknowledgeAsyncEvent = 
(ShareAcknowledgeAsyncEvent) event;
             
-            // Regular acknowledgements map should be empty
-            if (!fetchEvent.acknowledgementsMap().isEmpty()) {
-                return false;
-            }
-            
-            // Control record acknowledgements map should contain the GAP for 
offset 1
-            Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcks = 
fetchEvent.controlRecordAcknowledgements();
+            // Acknowledgements map should contain the GAP for offset 1
+            Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcks = 
shareAcknowledgeAsyncEvent.acknowledgementsMap();
             return controlRecordAcks.containsKey(tip) &&
                    controlRecordAcks.get(tip).acknowledgements().get(1L) == 
null; // Null indicates GAP
         }));
@@ -346,6 +342,32 @@ public class ShareConsumerImplTest {
         assertEquals("This consumer has already been closed.", 
res.getMessage());
     }
 
+    @Test
+    public void testShouldSendOneShareFetchEventPerPoll() {
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(subscriptions);
+
+        // Setup test data
+        String topic = "test-topic";
+        // Setup an empty fetch.
+        ShareFetch<String, String> firstFetch = ShareFetch.empty();
+
+        doReturn(firstFetch)
+                .doReturn(ShareFetch.empty())
+                .when(fetchCollector)
+                .collect(any(ShareFetchBuffer.class));
+
+        // Setup subscription
+        List<String> topics = Collections.singletonList(topic);
+        
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
+        consumer.subscribe(topics);
+
+        doReturn(0L).when(applicationEventHandler).maximumTimeToWait();
+        // Check that only 1 ShareFetchEvent is sent per poll
+        consumer.poll(Duration.ofMillis(100));
+        verify(applicationEventHandler, times(1)).add(argThat(event -> event 
instanceof ShareFetchEvent));
+    }
+
     @Test
     public void testUnsubscribeWithTopicAuthorizationException() {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);

Reply via email to