This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 07e80c63cd1 KAFKA-19946: Simplify skipping of empty ShareFetch 
requests (#21033)
07e80c63cd1 is described below

commit 07e80c63cd1bf88fede239cc54622b0c24a6beff
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Dec 3 10:05:43 2025 +0000

    KAFKA-19946: Simplify skipping of empty ShareFetch requests (#21033)
    
    ShareFetch requests can be used to fetch records for share consumers,
    but in some cases, no records are required and the request is just being
    used to update the share session or send acknowledgements. As a result,
    there are situations in which a ShareFetch request would be built, only
    for it to be entirely empty (no fetch, no asks, no share session
    update). This PR simplifies the logic for detecting when the request is
    empty and then to avoid building it entirely. It also adds some tests
    for this case.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      |  22 ++---
 .../consumer/internals/ShareSessionHandler.java    |  33 +++++--
 .../internals/ShareSessionHandlerTest.java         | 103 +++++++++++++++------
 .../java/kafka/server/share/SharePartition.java    |   2 +-
 4 files changed, 110 insertions(+), 50 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 183f15833aa..e57265716f5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -253,24 +253,20 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             Node target = entry.getKey();
             ShareSessionHandler handler = entry.getValue();
 
-            log.trace("Building ShareFetch request to send to node {}", 
target.id());
-            ShareFetchRequest.Builder requestBuilder = 
handler.newShareFetchBuilder(groupId, shareFetchConfig);
-
             // For record_limit mode, we only send a full ShareFetch to a 
single node at a time.
             // We prepare to build ShareFetch requests for all nodes with 
session handlers to permit
             // piggy-backing of acknowledgements, and also to adjust the 
topic-partitions
-            // in the share session.
-            if (isShareAcquireModeRecordLimit() && target.id() != 
fetchRecordsNodeId.get()) {
-                ShareFetchRequestData data = requestBuilder.data();
-                // If there's nothing to send, just skip building the record.
-                if (data.topics().isEmpty() && 
data.forgottenTopicsData().isEmpty()) {
-                    return null;
-                } else {
-                    // There is something to send, but we don't want to fetch 
any records.
-                    requestBuilder.data().setMaxRecords(0);
-                }
+            // in the share session, but if the request would contain neither 
of those, it can be skipped.
+            boolean canSkipIfRequestEmpty = isShareAcquireModeRecordLimit() && 
target.id() != fetchRecordsNodeId.get();
+
+            ShareFetchRequest.Builder requestBuilder = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, canSkipIfRequestEmpty);
+            if (requestBuilder == null) {
+                log.trace("Skipping ShareFetch request to send to node {}", 
target.id());
+                return null;
             }
 
+            log.trace("Building ShareFetch request to send to node {}", 
target.id());
+
             nodesWithPendingRequests.add(target.id());
 
             BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, error) -> {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 348855a341b..0b6cdf0a6db 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
  * <p>ShareSessionHandler tracks the partitions which are in the session. It 
also determines
  * which partitions need to be included in each ShareFetch/ShareAcknowledge 
request.
  */
+@SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
 public class ShareSessionHandler {
     private final Logger log;
     private final int node;
@@ -112,7 +113,7 @@ public class ShareSessionHandler {
         return nextMetadata.isNewSession();
     }
 
-    public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, 
ShareFetchConfig shareFetchConfig) {
+    public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, 
ShareFetchConfig shareFetchConfig, boolean canSkipIfRequestEmpty) {
         List<TopicIdPartition> added = new ArrayList<>();
         List<TopicIdPartition> removed = new ArrayList<>();
         List<TopicIdPartition> replaced = new ArrayList<>();
@@ -158,15 +159,6 @@ public class ShareSessionHandler {
             }
         }
 
-        if (log.isDebugEnabled()) {
-            log.debug("Build ShareFetch {} for node {}. Added {}, removed {}, 
replaced {} out of {}",
-                    nextMetadata, node,
-                    topicIdPartitionsToLogString(added),
-                    topicIdPartitionsToLogString(removed),
-                    topicIdPartitionsToLogString(replaced),
-                    topicIdPartitionsToLogString(sessionPartitions.values()));
-        }
-
         // The replaced topic-partitions need to be removed, and their 
replacements are already added
         removed.addAll(replaced);
 
@@ -187,6 +179,19 @@ public class ShareSessionHandler {
         nextPartitions = new LinkedHashMap<>();
         nextAcknowledgements = new LinkedHashMap<>();
 
+        if (canSkipIfRequestEmpty && added.isEmpty() && removed.isEmpty() && 
acknowledgementBatches.isEmpty()) {
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Build ShareFetch {} for node {}. Added {}, removed {}, 
replaced {} out of {}",
+                nextMetadata, node,
+                topicIdPartitionsToLogString(added),
+                topicIdPartitionsToLogString(removed),
+                topicIdPartitionsToLogString(replaced),
+                topicIdPartitionsToLogString(sessionPartitions.values()));
+        }
+
         if (hasRenewAcknowledgements) {
             // If the request has renew acknowledgements, the ShareFetch is 
only used to send the acknowledgements
             // and potentially update the share session. The parameters for 
wait time, number of bytes and number of
@@ -196,6 +201,14 @@ public class ShareSessionHandler {
                 0, 0, 0,
                 0, shareFetchConfig.shareAcquireMode.id, true,
                 added, removed, acknowledgementBatches);
+        } else if (canSkipIfRequestEmpty) {
+            // The request contains changes to the share session or 
acknowledgements only. The parameters for wait time,
+            // number of bytes and number of records are all zero.
+            return ShareFetchRequest.Builder.forConsumer(
+                groupId, nextMetadata, 0,
+                0, 0, 0,
+                0, shareFetchConfig.shareAcquireMode.id, false,
+                added, removed, acknowledgementBatches);
         } else {
             return ShareFetchRequest.Builder.forConsumer(
                 groupId, nextMetadata, shareFetchConfig.maxWaitMs,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 72d5d7c4e21..826978aab5a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -26,9 +26,11 @@ import org.apache.kafka.common.message.ShareFetchRequestData;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ShareFetchRequest;
 import org.apache.kafka.common.requests.ShareFetchResponse;
 import org.apache.kafka.common.utils.LogContext;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -45,6 +47,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.requests.ShareRequestMetadata.INITIAL_EPOCH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -180,7 +183,7 @@ public class ShareSessionHandlerTest {
         TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");
         handler.addPartitionToFetch(foo0, null);
         handler.addPartitionToFetch(foo1, null);
-        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, 
DEFAULT_SHARE_FETCH_CONFIG).build().data();
+        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, DEFAULT_SHARE_FETCH_CONFIG, 
false).build().data();
         ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
         expectedToSend1.add(new TopicIdPartition(fooId, 0, "foo"));
         expectedToSend1.add(new TopicIdPartition(fooId, 1, "foo"));
@@ -192,7 +195,7 @@ public class ShareSessionHandlerTest {
             buildResponseData(new RespEntry("foo", 0, fooId), new 
RespEntry("foo", 1, fooId)),
             List.of(),
             0);
-        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
 
         // Test a fetch request which adds one partition
         Uuid barId = addTopicId(topicNames, "bar");
@@ -200,7 +203,7 @@ public class ShareSessionHandlerTest {
         handler.addPartitionToFetch(foo0, null);
         handler.addPartitionToFetch(foo1, null);
         handler.addPartitionToFetch(bar0, null);
-        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, 
DEFAULT_SHARE_FETCH_CONFIG).build().data();
+        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, DEFAULT_SHARE_FETCH_CONFIG, 
false).build().data();
         assertMapsEqual(reqMap(new TopicIdPartition(fooId, 0, "foo"),
                         new TopicIdPartition(fooId, 1, "foo"),
                         new TopicIdPartition(barId, 0, "bar")),
@@ -214,13 +217,13 @@ public class ShareSessionHandlerTest {
             buildResponseData(new RespEntry("foo", 1, fooId)),
             List.of(),
             0);
-        handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion());
 
         // A top-level error code will reset the session epoch
         ShareFetchResponse resp3 = ShareFetchResponse.of(error, 0, new 
LinkedHashMap<>(), List.of(), 0);
-        handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion());
 
-        ShareFetchRequestData requestData4 = 
handler.newShareFetchBuilder(groupId, 
DEFAULT_SHARE_FETCH_CONFIG).build().data();
+        ShareFetchRequestData requestData4 = 
handler.newShareFetchBuilder(groupId, DEFAULT_SHARE_FETCH_CONFIG, 
false).build().data();
         assertEquals(requestData2.memberId(), requestData4.memberId());
         assertEquals(INITIAL_EPOCH, requestData4.shareSessionEpoch());
         assertMapsEqual(reqMap(new TopicIdPartition(fooId, 0, "foo"),
@@ -250,7 +253,7 @@ public class ShareSessionHandlerTest {
         handler.addPartitionToFetch(foo0, null);
         handler.addPartitionToFetch(foo1, null);
         handler.addPartitionToFetch(bar0, null);
-        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertMapsEqual(reqMap(
                         new TopicIdPartition(fooId, 0, "foo"),
                         new TopicIdPartition(fooId, 1, "foo"),
@@ -271,11 +274,11 @@ public class ShareSessionHandlerTest {
                 new RespEntry("bar", 0, barId)),
             List.of(),
             0);
-        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
 
         // Test a fetch request which removes two partitions
         handler.addPartitionToFetch(foo1, null);
-        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertEquals(memberId.toString(), requestData2.memberId());
         assertEquals(1, requestData2.shareSessionEpoch());
         assertMapsEqual(reqMap(new TopicIdPartition(fooId, 1, "foo")),
@@ -288,10 +291,10 @@ public class ShareSessionHandlerTest {
 
         // A top-level error code will reset the session epoch
         ShareFetchResponse resp2 = 
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new 
LinkedHashMap<>(), List.of(), 0);
-        handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion());
 
         handler.addPartitionToFetch(foo1, null);
-        ShareFetchRequestData requestData3 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData3 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertEquals(memberId.toString(), requestData3.memberId());
         assertEquals(INITIAL_EPOCH, requestData3.shareSessionEpoch());
         assertMapsEqual(reqMap(new TopicIdPartition(fooId, 1, "foo")),
@@ -312,7 +315,7 @@ public class ShareSessionHandlerTest {
         Uuid topicId1 = addTopicId(topicNames, "foo");
         TopicIdPartition tp = new TopicIdPartition(topicId1, 0, "foo");
         handler.addPartitionToFetch(tp, null);
-        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertMapsEqual(reqMap(new TopicIdPartition(topicId1, 0, "foo")),
                 handler.sessionPartitionMap());
         ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
@@ -324,14 +327,14 @@ public class ShareSessionHandlerTest {
             buildResponseData(new RespEntry("foo", 0, topicId1)),
             List.of(),
             0);
-        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
 
         // Try to add a new topic ID
         Uuid topicId2 = addTopicId(topicNames, "foo");
         TopicIdPartition tp2 = new TopicIdPartition(topicId2, 0, "foo");
         // Use the same data besides the topic ID
         handler.addPartitionToFetch(tp2, null);
-        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
 
         // If we started with an ID, only a new ID will count towards replaced.
         // The old topic ID partition should be forgotten, and the new one 
should be fetched.
@@ -357,7 +360,7 @@ public class ShareSessionHandlerTest {
         Uuid topicId = addTopicId(topicNames, "foo");
         TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo");
         handler.addPartitionToFetch(foo0, null);
-        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertMapsEqual(reqMap(foo0), handler.sessionPartitionMap());
         ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
         expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
@@ -368,10 +371,10 @@ public class ShareSessionHandlerTest {
             buildResponseData(new RespEntry("foo", 0, topicId)),
             List.of(),
             0);
-        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
 
         // Remove the topic from the session by setting acknowledgements only 
- this is not asking to fetch records
-        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         handler.addPartitionToAcknowledgeOnly(foo0, Acknowledgements.empty());
         assertEquals(Collections.singletonList(foo0), 
reqForgetList(requestData2, topicNames));
 
@@ -392,7 +395,7 @@ public class ShareSessionHandlerTest {
         Uuid topicId = addTopicId(topicNames, "foo");
         TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo");
         handler.addPartitionToFetch(foo0, null);
-        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertMapsEqual(reqMap(foo0), handler.sessionPartitionMap());
         ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
         expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
@@ -403,10 +406,10 @@ public class ShareSessionHandlerTest {
             buildResponseData(new RespEntry("foo", 0, topicId)),
             List.of(),
             0);
-        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
 
         // Remove the topic from the session
-        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertEquals(Collections.singletonList(foo0), 
reqForgetList(requestData2, topicNames));
 
         // Should have the same session ID, next epoch, and same ID usage
@@ -424,7 +427,7 @@ public class ShareSessionHandlerTest {
         Map<Uuid, String> topicNames = new HashMap<>();
         Uuid topicId = addTopicId(topicNames, "foo");
         handler.addPartitionToFetch(new TopicIdPartition(topicId, 0, "foo"), 
null);
-        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData1 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertMapsEqual(reqMap(new TopicIdPartition(topicId, 0, "foo")),
                 handler.sessionPartitionMap());
         ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
@@ -436,19 +439,19 @@ public class ShareSessionHandlerTest {
             buildResponseData(new RespEntry("foo", 0, topicId)),
             List.of(),
             0);
-        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
 
         // Remove the partition from the session
-        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         assertTrue(handler.sessionPartitionMap().isEmpty());
         assertTrue(requestData2.topics().isEmpty());
         ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(), List.of(), 0);
-        handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
+        handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion());
 
         // After the topic is removed, add a recreated topic with a new ID
         Uuid topicId2 = addTopicId(topicNames, "foo");
         handler.addPartitionToFetch(new TopicIdPartition(topicId2, 0, "foo"), 
null);
-        ShareFetchRequestData requestData3 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData3 = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
 
         // Should have the same session ID and epoch 2.
         assertEquals(memberId.toString(), requestData3.memberId(), "Did not 
use same session");
@@ -477,7 +480,7 @@ public class ShareSessionHandlerTest {
         // Attempt a new ShareFetch
         TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");
         handler.addPartitionToFetch(foo1, null);
-        ShareFetchRequestData requestData = 
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+        ShareFetchRequestData requestData = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
 
         // We should have cleared the unsent acknowledgements before this 
ShareFetch.
         assertEquals(0, 
requestData.topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().size());
@@ -488,6 +491,54 @@ public class ShareSessionHandlerTest {
         assertEquals(memberId.toString(), requestData.memberId());
     }
 
+    @Test
+    public void testCanSkipIfRequestEmpty() {
+        ShareFetchConfig shareFetchConfig = SHARE_FETCH_CONFIG_RECORD_LIMIT;
+
+        String groupId = "G1";
+        Uuid memberId = Uuid.randomUuid();
+        ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, 
memberId);
+
+        Map<Uuid, String> topicNames = new HashMap<>();
+        Uuid fooId = addTopicId(topicNames, "foo");
+        TopicIdPartition foo0 = new TopicIdPartition(fooId, 0, "foo");
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(0L, AcknowledgeType.ACCEPT);
+
+        // The request cannot be skipped when a topic-partition is added to 
the share session.
+        handler.addPartitionToFetch(foo0, null);
+        ShareFetchRequest.Builder builder = 
handler.newShareFetchBuilder(groupId, shareFetchConfig, true);
+        assertNotNull(builder);
+
+        ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(new RespEntry("foo", 0, fooId)),
+            List.of(),
+            0);
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
+
+        // The request can be skipped when the same topic-partition is already 
in the share session.
+        handler.addPartitionToFetch(foo0, null);
+        builder = handler.newShareFetchBuilder(groupId, shareFetchConfig, 
true);
+        assertNull(builder);
+
+        // The request cannot be skipped when there are acknowledgements.
+        handler.addPartitionToFetch(foo0, acknowledgements);
+        builder = handler.newShareFetchBuilder(groupId, shareFetchConfig, 
true);
+        assertNotNull(builder);
+        handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
+
+        // The request cannot be skipped when the topic-partition is removed 
from the share session.
+        builder = handler.newShareFetchBuilder(groupId, shareFetchConfig, 
true);
+        assertNotNull(builder);
+        handler.handleResponse(ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(), List.of(), 0), ApiKeys.SHARE_FETCH.latestVersion());
+
+        // The request can be skipped when the share session is empty.
+        builder = handler.newShareFetchBuilder(groupId, shareFetchConfig, 
true);
+        assertNull(builder);
+    }
+
     private Uuid addTopicId(Map<Uuid, String> topicNames, String name) {
         Uuid id = Uuid.randomUuid();
         topicNames.put(id, name);
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index c6eb76129a6..92e75069a79 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -782,7 +782,7 @@ public class SharePartition {
                 // check for the floor entry and adjust the base offset 
accordingly.
                 if (baseOffset < startOffset) {
                     log.info("Adjusting base offset for the fetch as it's 
prior to start offset: {}-{}"
-                            + "from {} to {}", groupId, topicIdPartition, 
baseOffset, startOffset);
+                            + " from {} to {}", groupId, topicIdPartition, 
baseOffset, startOffset);
                     baseOffset = startOffset;
                 }
             } else if (floorEntry.getValue().lastOffset() >= baseOffset) {

Reply via email to