This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b649b1ed5db KAFKA-18935: Ensure brokers do not return null records in 
FetchResponse (#19167)
b649b1ed5db is described below

commit b649b1ed5db565ebc34734247411997c2bd4f23c
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Apr 10 22:21:00 2025 +0800

    KAFKA-18935: Ensure brokers do not return null records in FetchResponse 
(#19167)
    
    JIRA: KAFKA-18935  This patch ensures the broker will not return null
    records in FetchResponse.   For more details, please refer to the
    ticket.
    
    Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai
     <[email protected]>, Jun Rao <[email protected]>
---
 .../apache/kafka/common/requests/FetchRequest.java |   2 +-
 .../kafka/common/requests/FetchResponse.java       |  31 +++++-
 .../kafka/common/requests/ShareFetchRequest.java   |   6 +-
 .../kafka/common/requests/ShareFetchResponse.java  |  26 ++++-
 .../clients/consumer/KafkaShareConsumerTest.java   |   9 +-
 .../internals/ShareSessionHandlerTest.java         | 110 +++++++++------------
 .../kafka/common/requests/RequestResponseTest.java |  66 +++++++++++--
 .../main/scala/kafka/server/ControllerApis.scala   |   2 +-
 .../scala/kafka/tools/TestRaftRequestHandler.scala |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   4 +-
 .../kafka/jmh/common/FetchResponseBenchmark.java   |   2 +-
 .../apache/kafka/raft/KafkaNetworkChannelTest.java |   2 +-
 .../kafka/server/share/context/FinalContext.java   |   3 +-
 .../server/share/context/ShareFetchContext.java    |   5 +-
 .../server/share/context/ShareSessionContext.java  |  18 ++--
 15 files changed, 176 insertions(+), 112 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 9d4ba685139..0b478b759a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -360,7 +360,7 @@ public class FetchRequest extends AbstractRequest {
                         .setPartitions(partitionResponses));
             });
         }
-        return new FetchResponse(new FetchResponseData()
+        return FetchResponse.of(new FetchResponseData()
                 .setThrottleTimeMs(throttleTimeMs)
                 .setErrorCode(error.code())
                 .setSessionId(data.sessionId())
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0a0ec7949cc..d4684e07652 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -87,7 +87,7 @@ public class FetchResponse extends AbstractResponse {
      * We may also return INCONSISTENT_TOPIC_ID error as a partition-level 
error when a partition in the session has a topic ID
      * inconsistent with the log.
      */
-    public FetchResponse(FetchResponseData fetchResponseData) {
+    private FetchResponse(FetchResponseData fetchResponseData) {
         super(ApiKeys.FETCH);
         this.data = fetchResponseData;
     }
@@ -138,6 +138,13 @@ public class FetchResponse extends AbstractResponse {
         return errorCounts;
     }
 
+    /**
+     * Creates a {@link org.apache.kafka.common.requests.FetchResponse} from 
the given byte buffer.
+     * Unlike {@link 
org.apache.kafka.common.requests.FetchResponse#of(FetchResponseData)}, this 
method doesn't convert
+     * null records to {@link 
org.apache.kafka.common.record.MemoryRecords#EMPTY}.
+     *
+     * <p><strong>This method should only be used in client-side.</strong></p>
+     */
     public static FetchResponse parse(ByteBuffer buffer, short version) {
         return new FetchResponse(new FetchResponseData(new 
ByteBufferAccessor(buffer), version));
     }
@@ -220,6 +227,23 @@ public class FetchResponse extends AbstractResponse {
         return partition.records() == null ? 0 : 
partition.records().sizeInBytes();
     }
 
+    /**
+     * Creates a {@link org.apache.kafka.common.requests.FetchResponse} from 
the given data.
+     * This method converts null records to {@link 
org.apache.kafka.common.record.MemoryRecords#EMPTY}
+     * to ensure consistent record representation in the response.
+     *
+     * <p><strong>This method should only be used in server-side.</strong></p>
+     */
+    public static FetchResponse of(FetchResponseData data) {
+        for (FetchResponseData.FetchableTopicResponse response : 
data.responses()) {
+            for (FetchResponseData.PartitionData partition : 
response.partitions()) {
+                if (partition.records() == null)
+                    partition.setRecords(MemoryRecords.EMPTY);
+            }
+        }
+        return new FetchResponse(data);
+    }
+
     // TODO: remove as a part of KAFKA-12410
     public static FetchResponse of(Errors error,
                                    int throttleTimeMs,
@@ -258,6 +282,11 @@ public class FetchResponse extends AbstractResponse {
             FetchResponseData.PartitionData partitionData = entry.getValue();
             // Since PartitionData alone doesn't know the partition ID, we set 
it here
             
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+            // To protect the clients from failing due to null records,
+            // we always convert null records to MemoryRecords.EMPTY
+            // We will propose a KIP to change the schema definitions in the 
future
+            if (partitionData.records() == null)
+                partitionData.setRecords(MemoryRecords.EMPTY);
             // We have to keep the order of input topic-partition. Hence, we 
batch the partitions only if the last
             // batch is in the same topic group.
             FetchResponseData.FetchableTopicResponse previousTopic = 
topicResponseList.isEmpty() ? null
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 215fb6e90c7..ea8d93f2a91 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -20,13 +20,13 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ShareFetchRequestData;
-import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -161,9 +161,7 @@ public class ShareFetchRequest extends AbstractRequest {
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
-        return new ShareFetchResponse(new ShareFetchResponseData()
-                .setThrottleTimeMs(throttleTimeMs)
-                .setErrorCode(error.code()));
+        return ShareFetchResponse.of(error, throttleTimeMs, new 
LinkedHashMap<>(), List.of(), 0);
     }
 
     public static ShareFetchRequest parse(Readable readable, short version) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 028edb61cba..2bab79ead9b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -55,7 +55,7 @@ public class ShareFetchResponse extends AbstractResponse {
 
     private final ShareFetchResponseData data;
 
-    public ShareFetchResponse(ShareFetchResponseData data) {
+    private ShareFetchResponse(ShareFetchResponseData data) {
         super(ApiKeys.SHARE_FETCH);
         this.data = data;
     }
@@ -103,6 +103,13 @@ public class ShareFetchResponse extends AbstractResponse {
         data.setThrottleTimeMs(throttleTimeMs);
     }
 
+    /**
+     * Creates a {@link org.apache.kafka.common.requests.ShareFetchResponse} 
from the given byte buffer.
+     * Unlike {@link 
org.apache.kafka.common.requests.ShareFetchResponse#of(Errors, int, 
LinkedHashMap, List, int)},
+     * this method doesn't convert null records to {@link 
org.apache.kafka.common.record.MemoryRecords#EMPTY}.
+     *
+     * <p><strong>This method should only be used in client-side.</strong></p>
+     */
     public static ShareFetchResponse parse(ByteBuffer buffer, short version) {
         return new ShareFetchResponse(
                 new ShareFetchResponseData(new ByteBufferAccessor(buffer), 
version)
@@ -145,6 +152,13 @@ public class ShareFetchResponse extends AbstractResponse {
         return partition.records() == null ? 0 : 
partition.records().sizeInBytes();
     }
 
+    /**
+     * Creates a {@link org.apache.kafka.common.requests.ShareFetchResponse} 
from the given data.
+     * This method converts null records to {@link 
org.apache.kafka.common.record.MemoryRecords#EMPTY}
+     * to ensure consistent record representation in the response.
+     *
+     * <p><strong>This method should only be used in server-side.</strong></p>
+     */
     public static ShareFetchResponse of(Errors error,
                                         int throttleTimeMs,
                                         LinkedHashMap<TopicIdPartition, 
ShareFetchResponseData.PartitionData> responseData,
@@ -152,7 +166,7 @@ public class ShareFetchResponse extends AbstractResponse {
         return new ShareFetchResponse(toMessage(error, throttleTimeMs, 
responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
     }
 
-    public static ShareFetchResponseData toMessage(Errors error, int 
throttleTimeMs,
+    private static ShareFetchResponseData toMessage(Errors error, int 
throttleTimeMs,
                                                    
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> 
partIterator,
                                                    List<Node> nodeEndpoints, 
int acquisitionLockTimeout) {
         Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> 
topicResponseList = new LinkedHashMap<>();
@@ -161,6 +175,11 @@ public class ShareFetchResponse extends AbstractResponse {
             ShareFetchResponseData.PartitionData partitionData = 
entry.getValue();
             // Since PartitionData alone doesn't know the partition ID, we set 
it here
             
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+            // To protect the clients from failing due to null records,
+            // we always convert null records to MemoryRecords.EMPTY
+            // We will propose a KIP to change the schema definitions in the 
future
+            if (partitionData.records() == null)
+                partitionData.setRecords(MemoryRecords.EMPTY);
             // Checking if the topic is already present in the map
             if (topicResponseList.containsKey(entry.getKey().topicId())) {
                 
topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
@@ -193,6 +212,7 @@ public class ShareFetchResponse extends AbstractResponse {
     public static ShareFetchResponseData.PartitionData partitionResponse(int 
partition, Errors error) {
         return new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(partition)
-                .setErrorCode(error.code());
+                .setErrorCode(error.code())
+                .setRecords(MemoryRecords.EMPTY);
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
index 53105e7d306..3a0f3461c09 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.api.Timeout;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -392,13 +393,7 @@ public class KafkaShareConsumerTest {
             .setPartitionIndex(tip.partition())
             .setRecords(records)
             .setAcquiredRecords(List.of(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(count 
- 1).setDeliveryCount((short) 1)));
-        ShareFetchResponseData.ShareFetchableTopicResponse topicResponse = new 
ShareFetchResponseData.ShareFetchableTopicResponse()
-            .setTopicId(tip.topicId())
-            .setPartitions(List.of(partData));
-        return new ShareFetchResponse(
-            new ShareFetchResponseData()
-                .setResponses(List.of(topicResponse))
-        );
+        return ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(Map.of(tip, partData)), List.of(), 0);
     }
 
     private ShareAcknowledgeResponse shareAcknowledgeResponse() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index e0ae3e982bb..5a52b7bc35f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -139,15 +139,14 @@ public class ShareSessionHandlerTest {
         }
     }
 
-    private static List<ShareFetchResponseData.ShareFetchableTopicResponse> 
respList(RespEntry... entries) {
-        HashMap<TopicIdPartition, 
ShareFetchResponseData.ShareFetchableTopicResponse> map = new HashMap<>();
+    private static LinkedHashMap<TopicIdPartition, 
ShareFetchResponseData.PartitionData> buildResponseData(RespEntry... entries) {
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
topicIdPartitionToPartition = new LinkedHashMap<>();
         for (RespEntry entry : entries) {
-            ShareFetchResponseData.ShareFetchableTopicResponse response = 
map.computeIfAbsent(entry.part, topicIdPartition ->
-                    new 
ShareFetchResponseData.ShareFetchableTopicResponse().setTopicId(topicIdPartition.topicId()));
-            response.partitions().add(new 
ShareFetchResponseData.PartitionData()
-                    .setPartitionIndex(entry.part.partition()));
+            ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(entry.part.partition());
+            topicIdPartitionToPartition.put(entry.part, partitionData);
         }
-        return new ArrayList<>(map.values());
+        return topicIdPartitionToPartition;
     }
 
     @Test
@@ -170,13 +169,11 @@ public class ShareSessionHandlerTest {
         assertListEquals(expectedToSend1, reqFetchList(requestData1, 
topicNames));
         assertEquals(memberId.toString(), requestData1.memberId());
 
-        ShareFetchResponse resp = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setThrottleTimeMs(0)
-                        .setResponses(respList(
-                                new RespEntry("foo", 0, fooId),
-                                new RespEntry("foo", 1, fooId))));
+        ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(new RespEntry("foo", 0, fooId), new 
RespEntry("foo", 1, fooId)),
+            List.of(),
+            0);
         handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // Test a fetch request which adds one partition
@@ -194,18 +191,15 @@ public class ShareSessionHandlerTest {
         expectedToSend2.add(new TopicIdPartition(barId, 0, "bar"));
         assertListEquals(expectedToSend2, reqFetchList(requestData2, 
topicNames));
 
-        ShareFetchResponse resp2 = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setThrottleTimeMs(0)
-                        .setResponses(respList(
-                                new RespEntry("foo", 1, fooId))));
+        ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(new RespEntry("foo", 1, fooId)),
+            List.of(),
+            0);
         handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // A top-level error code will reset the session epoch
-        ShareFetchResponse resp3 = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        
.setErrorCode(Errors.INVALID_SHARE_SESSION_EPOCH.code()));
+        ShareFetchResponse resp3 = 
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new 
LinkedHashMap<>(), List.of(), 0);
         handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         ShareFetchRequestData requestData4 = 
handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
@@ -251,14 +245,14 @@ public class ShareSessionHandlerTest {
         assertListEquals(expectedToSend1, reqFetchList(requestData1, 
topicNames));
         assertEquals(memberId.toString(), requestData1.memberId());
 
-        ShareFetchResponse resp = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setThrottleTimeMs(0)
-                        .setResponses(respList(
-                                new RespEntry("foo", 0, fooId),
-                                new RespEntry("foo", 1, fooId),
-                                new RespEntry("bar", 0, barId))));
+        ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(
+                new RespEntry("foo", 0, fooId),
+                new RespEntry("foo", 1, fooId),
+                new RespEntry("bar", 0, barId)),
+            List.of(),
+            0);
         handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // Test a fetch request which removes two partitions
@@ -275,9 +269,7 @@ public class ShareSessionHandlerTest {
         assertListEquals(expectedToForget2, reqForgetList(requestData2, 
topicNames));
 
         // A top-level error code will reset the session epoch
-        ShareFetchResponse resp2 = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        
.setErrorCode(Errors.INVALID_SHARE_SESSION_EPOCH.code()));
+        ShareFetchResponse resp2 = 
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new 
LinkedHashMap<>(), List.of(), 0);
         handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         handler.addPartitionToFetch(foo1, null);
@@ -309,12 +301,11 @@ public class ShareSessionHandlerTest {
         expectedToSend1.add(new TopicIdPartition(topicId1, 0, "foo"));
         assertListEquals(expectedToSend1, reqFetchList(requestData1, 
topicNames));
 
-        ShareFetchResponse resp = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setThrottleTimeMs(0)
-                        .setResponses(respList(
-                                new RespEntry("foo", 0, topicId1))));
+        ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(new RespEntry("foo", 0, topicId1)),
+            List.of(),
+            0);
         handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // Try to add a new topic ID
@@ -354,12 +345,11 @@ public class ShareSessionHandlerTest {
         expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
         assertListEquals(expectedToSend1, reqFetchList(requestData1, 
topicNames));
 
-        ShareFetchResponse resp = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setThrottleTimeMs(0)
-                        .setResponses(respList(
-                                new RespEntry("foo", 0, topicId))));
+        ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(new RespEntry("foo", 0, topicId)),
+            List.of(),
+            0);
         handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // Remove the topic from the session by setting acknowledgements only 
- this is not asking to fetch records
@@ -390,12 +380,11 @@ public class ShareSessionHandlerTest {
         expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
         assertListEquals(expectedToSend1, reqFetchList(requestData1, 
topicNames));
 
-        ShareFetchResponse resp = new ShareFetchResponse(
-            new ShareFetchResponseData()
-                .setErrorCode(Errors.NONE.code())
-                .setThrottleTimeMs(0)
-                .setResponses(respList(
-                    new RespEntry("foo", 0, topicId))));
+        ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(new RespEntry("foo", 0, topicId)),
+            List.of(),
+            0);
         handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // Remove the topic from the session
@@ -424,23 +413,18 @@ public class ShareSessionHandlerTest {
         expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
         assertListEquals(expectedToSend1, reqFetchList(requestData1, 
topicNames));
 
-        ShareFetchResponse resp = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setThrottleTimeMs(0)
-                        .setResponses(respList(
-                                new RespEntry("foo", 0, topicId))));
+        ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+            0,
+            buildResponseData(new RespEntry("foo", 0, topicId)),
+            List.of(),
+            0);
         handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // Remove the partition from the session
         ShareFetchRequestData requestData2 = 
handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
         assertTrue(handler.sessionPartitionMap().isEmpty());
         assertTrue(requestData2.topics().isEmpty());
-        ShareFetchResponse resp2 = new ShareFetchResponse(
-                new ShareFetchResponseData()
-                        .setErrorCode(Errors.NONE.code())
-                        .setThrottleTimeMs(0)
-                        .setResponses(respList()));
+        ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(), List.of(), 0);
         handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // After the topic is removed, add a recreated topic with a new ID
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 005a401c95d..b33dec17d9a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -583,6 +583,57 @@ public class RequestResponseTest {
                 deserialized.responseData(topicNames, (short) 4));
     }
 
+    @Test
+    public void testFetchResponseShouldNotHaveNullRecords() {
+        Uuid id = Uuid.randomUuid();
+        FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+            .setPartitionIndex(0)
+            .setHighWatermark(1000000)
+            .setLogStartOffset(100)
+            .setLastStableOffset(200)
+            .setRecords(null);
+        FetchResponseData.FetchableTopicResponse response = new 
FetchResponseData.FetchableTopicResponse()
+            .setTopic("topic")
+            .setPartitions(List.of(partitionData))
+            .setTopicId(id);
+        FetchResponseData data = new 
FetchResponseData().setResponses(List.of(response));
+
+        response.setPartitions(List.of(FetchResponse.partitionResponse(0, 
Errors.NONE)));
+        FetchResponse fetchResponse = FetchResponse.of(data);
+        validateNoNullRecords(fetchResponse);
+
+        TopicIdPartition topicIdPartition = new TopicIdPartition(id, new 
TopicPartition("test", 0));
+        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
tpToData = new LinkedHashMap<>(Map.of(topicIdPartition, partitionData));
+        fetchResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
tpToData);
+        validateNoNullRecords(fetchResponse);
+    }
+
+    private void validateNoNullRecords(FetchResponse fetchResponse) {
+        fetchResponse.data().responses().stream()
+            .flatMap(response -> response.partitions().stream())
+            .forEach(partition -> assertEquals(MemoryRecords.EMPTY, 
partition.records()));
+    }
+
+    @Test
+    public void testShareFetchResponseShouldNotHaveNullRecords() {
+        Uuid id = Uuid.randomUuid();
+        ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+            .setPartitionIndex(0)
+            .setAcquiredRecords(List.of())
+            .setRecords(null);
+
+        TopicIdPartition topicIdPartition = new TopicIdPartition(id, new 
TopicPartition("test", 0));
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
tpToData = new LinkedHashMap<>(Map.of(topicIdPartition, partitionData));
+        ShareFetchResponse shareFetchResponse = 
ShareFetchResponse.of(Errors.NONE, 0, tpToData, List.of(), 0);
+        validateNoNullRecords(shareFetchResponse);
+    }
+
+    private void validateNoNullRecords(ShareFetchResponse fetchResponse) {
+        fetchResponse.data().responses().stream()
+            .flatMap(response -> response.partitions().stream())
+            .forEach(partition -> assertEquals(MemoryRecords.EMPTY, 
partition.records()));
+    }
+
     @Test
     public void verifyFetchResponseFullWrites() throws Exception {
         verifyFetchResponseFullWrite(FETCH.latestVersion(), 
createFetchResponse(123));
@@ -1443,7 +1494,6 @@ public class RequestResponseTest {
     }
 
     private ShareFetchResponse createShareFetchResponse() {
-        ShareFetchResponseData data = new ShareFetchResponseData();
         MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, 
new SimpleRecord("blah".getBytes()));
         ShareFetchResponseData.PartitionData partition = new 
ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(0)
@@ -1453,14 +1503,10 @@ public class RequestResponseTest {
                         .setFirstOffset(0)
                         .setLastOffset(0)
                         .setDeliveryCount((short) 1)));
-        ShareFetchResponseData.ShareFetchableTopicResponse response = new 
ShareFetchResponseData.ShareFetchableTopicResponse()
-                .setTopicId(Uuid.randomUuid())
-                .setPartitions(singletonList(partition));
-
-        data.setResponses(singletonList(response));
-        data.setThrottleTimeMs(345);
-        data.setErrorCode(Errors.NONE.code());
-        return new ShareFetchResponse(data);
+        TopicIdPartition topicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("", 
partition.partitionIndex()));
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
topicIdPartitionToPartition = new LinkedHashMap<>();
+        topicIdPartitionToPartition.put(topicIdPartition, partition);
+        return ShareFetchResponse.of(Errors.NONE, 345, 
topicIdPartitionToPartition, List.of(), 0);
     }
 
     private ShareAcknowledgeRequest createShareAcknowledgeRequest(short 
version) {
@@ -2103,7 +2149,7 @@ public class RequestResponseTest {
             response.setTopicId(Uuid.randomUuid());
         }
         data.setResponses(singletonList(response));
-        return new FetchResponse(data);
+        return FetchResponse.of(data);
     }
 
     private HeartbeatRequest createHeartBeatRequest(short version) {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index da9a1b86322..b294ddd3108 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -187,7 +187,7 @@ class ControllerApis(
 
   def handleFetch(request: RequestChannel.Request): CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    handleRaftRequest(request, response => new 
FetchResponse(response.asInstanceOf[FetchResponseData]))
+    handleRaftRequest(request, response => 
FetchResponse.of(response.asInstanceOf[FetchResponseData]))
   }
 
   def handleFetchSnapshot(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala 
b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index 95df38c4e14..733e8228b7c 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -81,7 +81,7 @@ class TestRaftRequestHandler(
   }
 
   private def handleFetch(request: RequestChannel.Request): Unit = {
-    handle(request, response => new 
FetchResponse(response.asInstanceOf[FetchResponseData]))
+    handle(request, response => 
FetchResponse.of(response.asInstanceOf[FetchResponseData]))
   }
 
   private def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b4768a90317..2283ca36582 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -5733,7 +5733,7 @@ class KafkaApisTest extends Logging {
       0,
       Errors.TOPIC_AUTHORIZATION_FAILED.code,
       Errors.NONE.code,
-      null,
+      MemoryRecords.EMPTY,
       Collections.emptyList[AcquiredRecords](),
       partitionData1
     )
@@ -5772,7 +5772,7 @@ class KafkaApisTest extends Logging {
       0,
       Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
       Errors.NONE.code,
-      null,
+      MemoryRecords.EMPTY,
       Collections.emptyList[AcquiredRecords](),
       partitionData4
     )
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
index ed68202abe9..adbb8a5b0d0 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
@@ -115,7 +115,7 @@ public class FetchResponseBenchmark {
 
     @Benchmark
     public int testPartitionMapFromData() {
-        return new FetchResponse(fetchResponseData).responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).size();
+        return FetchResponse.of(fetchResponseData).responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).size();
     }
 
     @Benchmark
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index bdae77bdd18..f68b4b6f061 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -377,7 +377,7 @@ public class KafkaNetworkChannelTest {
         } else if (responseData instanceof EndQuorumEpochResponseData) {
             return new EndQuorumEpochResponse((EndQuorumEpochResponseData) 
responseData);
         } else if (responseData instanceof FetchResponseData) {
-            return new FetchResponse((FetchResponseData) responseData);
+            return FetchResponse.of((FetchResponseData) responseData);
         } else if (responseData instanceof FetchSnapshotResponseData) {
             return new FetchSnapshotResponse((FetchSnapshotResponseData) 
responseData);
         } else if (responseData instanceof UpdateRaftVoterResponseData) {
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java 
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index 9abc5591ddd..c42ed8d5364 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -54,8 +54,7 @@ public class FinalContext extends ShareFetchContext {
     public ShareFetchResponse updateAndGenerateResponseData(String groupId, 
Uuid memberId,
                                                      
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
         log.debug("Final context returning {}", 
partitionsToLogString(updates.keySet()));
-        return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
-                updates.entrySet().iterator(), List.of(), 0));
+        return ShareFetchResponse.of(Errors.NONE, 0, updates, List.of(), 0);
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 36bf67ad828..6de1f57f190 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -25,7 +25,6 @@ import 
org.apache.kafka.server.share.ErroneousAndValidPartitionData;
 import org.apache.kafka.server.share.session.ShareSession;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 
@@ -50,8 +49,8 @@ public abstract class ShareFetchContext {
      * @return - An empty throttled response.
      */
     public ShareFetchResponse throttleResponse(int throttleTimeMs) {
-        return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                Collections.emptyIterator(), List.of(), 0));
+        return ShareFetchResponse.of(Errors.NONE, throttleTimeMs,
+                new LinkedHashMap<>(), List.of(), 0);
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index c3d177c8808..1c2242d54f3 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -102,8 +102,7 @@ public class ShareSessionContext extends ShareFetchContext {
     @Override
     public ShareFetchResponse throttleResponse(int throttleTimeMs) {
         if (!isSubsequent) {
-            return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                    Collections.emptyIterator(), List.of(), 0));
+            return ShareFetchResponse.of(Errors.NONE, throttleTimeMs, new 
LinkedHashMap<>(), List.of(), 0);
         }
         int expectedEpoch = 
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
         int sessionEpoch;
@@ -113,11 +112,9 @@ public class ShareSessionContext extends ShareFetchContext 
{
         if (sessionEpoch != expectedEpoch) {
             log.debug("Subsequent share session {} expected epoch {}, but got 
{}. " +
                     "Possible duplicate request.", session.key(), 
expectedEpoch, sessionEpoch);
-            return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
-                    throttleTimeMs, Collections.emptyIterator(), List.of(), 
0));
+            return ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 
throttleTimeMs, new LinkedHashMap<>(), List.of(), 0);
         }
-        return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                Collections.emptyIterator(), List.of(), 0));
+        return ShareFetchResponse.of(Errors.NONE, throttleTimeMs, new 
LinkedHashMap<>(), List.of(), 0);
     }
 
     /**
@@ -195,8 +192,7 @@ public class ShareSessionContext extends ShareFetchContext {
     public ShareFetchResponse updateAndGenerateResponseData(String groupId, 
Uuid memberId,
                                                      
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
         if (!isSubsequent) {
-            return new ShareFetchResponse(ShareFetchResponse.toMessage(
-                    Errors.NONE, 0, updates.entrySet().iterator(), List.of(), 
0));
+            return ShareFetchResponse.of(Errors.NONE, 0, updates, List.of(), 
0);
         } else {
             int expectedEpoch = 
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
             int sessionEpoch;
@@ -206,8 +202,7 @@ public class ShareSessionContext extends ShareFetchContext {
             if (sessionEpoch != expectedEpoch) {
                 log.debug("Subsequent share session {} expected epoch {}, but 
got {}. Possible duplicate request.",
                         session.key(), expectedEpoch, sessionEpoch);
-                return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
-                        0, Collections.emptyIterator(), List.of(), 0));
+                return 
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new 
LinkedHashMap<>(), List.of(), 0);
             }
             // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
             Iterator<Map.Entry<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> partitionIterator = new 
PartitionIterator(
@@ -217,8 +212,7 @@ public class ShareSessionContext extends ShareFetchContext {
             }
             log.debug("Subsequent share session context with session key {} 
returning {}", session.key(),
                     partitionsToLogString(updates.keySet()));
-            return new ShareFetchResponse(ShareFetchResponse.toMessage(
-                    Errors.NONE, 0, updates.entrySet().iterator(), List.of(), 
0));
+            return ShareFetchResponse.of(Errors.NONE, 0, updates, List.of(), 
0);
         }
     }
 


Reply via email to