This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b649b1ed5db KAFKA-18935: Ensure brokers do not return null records in
FetchResponse (#19167)
b649b1ed5db is described below
commit b649b1ed5db565ebc34734247411997c2bd4f23c
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Apr 10 22:21:00 2025 +0800
KAFKA-18935: Ensure brokers do not return null records in FetchResponse
(#19167)
JIRA: KAFKA-18935 This patch ensures the broker will not return null
records in FetchResponse. For more details, please refer to the
ticket.
Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai
<[email protected]>, Jun Rao <[email protected]>
---
.../apache/kafka/common/requests/FetchRequest.java | 2 +-
.../kafka/common/requests/FetchResponse.java | 31 +++++-
.../kafka/common/requests/ShareFetchRequest.java | 6 +-
.../kafka/common/requests/ShareFetchResponse.java | 26 ++++-
.../clients/consumer/KafkaShareConsumerTest.java | 9 +-
.../internals/ShareSessionHandlerTest.java | 110 +++++++++------------
.../kafka/common/requests/RequestResponseTest.java | 66 +++++++++++--
.../main/scala/kafka/server/ControllerApis.scala | 2 +-
.../scala/kafka/tools/TestRaftRequestHandler.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../kafka/jmh/common/FetchResponseBenchmark.java | 2 +-
.../apache/kafka/raft/KafkaNetworkChannelTest.java | 2 +-
.../kafka/server/share/context/FinalContext.java | 3 +-
.../server/share/context/ShareFetchContext.java | 5 +-
.../server/share/context/ShareSessionContext.java | 18 ++--
15 files changed, 176 insertions(+), 112 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 9d4ba685139..0b478b759a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -360,7 +360,7 @@ public class FetchRequest extends AbstractRequest {
.setPartitions(partitionResponses));
});
}
- return new FetchResponse(new FetchResponseData()
+ return FetchResponse.of(new FetchResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
.setSessionId(data.sessionId())
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0a0ec7949cc..d4684e07652 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -87,7 +87,7 @@ public class FetchResponse extends AbstractResponse {
* We may also return INCONSISTENT_TOPIC_ID error as a partition-level
error when a partition in the session has a topic ID
* inconsistent with the log.
*/
- public FetchResponse(FetchResponseData fetchResponseData) {
+ private FetchResponse(FetchResponseData fetchResponseData) {
super(ApiKeys.FETCH);
this.data = fetchResponseData;
}
@@ -138,6 +138,13 @@ public class FetchResponse extends AbstractResponse {
return errorCounts;
}
+ /**
+ * Creates a {@link org.apache.kafka.common.requests.FetchResponse} from
the given byte buffer.
+ * Unlike {@link
org.apache.kafka.common.requests.FetchResponse#of(FetchResponseData)}, this
method doesn't convert
+ * null records to {@link
org.apache.kafka.common.record.MemoryRecords#EMPTY}.
+ *
+ * <p><strong>This method should only be used on the client side.</strong></p>
+ */
public static FetchResponse parse(ByteBuffer buffer, short version) {
return new FetchResponse(new FetchResponseData(new
ByteBufferAccessor(buffer), version));
}
@@ -220,6 +227,23 @@ public class FetchResponse extends AbstractResponse {
return partition.records() == null ? 0 :
partition.records().sizeInBytes();
}
+ /**
+ * Creates a {@link org.apache.kafka.common.requests.FetchResponse} from
the given data.
+ * This method converts null records to {@link
org.apache.kafka.common.record.MemoryRecords#EMPTY}
+ * to ensure consistent record representation in the response.
+ *
+ * <p><strong>This method should only be used on the server side.</strong></p>
+ */
+ public static FetchResponse of(FetchResponseData data) {
+ for (FetchResponseData.FetchableTopicResponse response :
data.responses()) {
+ for (FetchResponseData.PartitionData partition :
response.partitions()) {
+ if (partition.records() == null)
+ partition.setRecords(MemoryRecords.EMPTY);
+ }
+ }
+ return new FetchResponse(data);
+ }
+
// TODO: remove as a part of KAFKA-12410
public static FetchResponse of(Errors error,
int throttleTimeMs,
@@ -258,6 +282,11 @@ public class FetchResponse extends AbstractResponse {
FetchResponseData.PartitionData partitionData = entry.getValue();
// Since PartitionData alone doesn't know the partition ID, we set
it here
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+ // To protect the clients from failing due to null records,
+ // we always convert null records to MemoryRecords.EMPTY
+ // We will propose a KIP to change the schema definitions in the
future
+ if (partitionData.records() == null)
+ partitionData.setRecords(MemoryRecords.EMPTY);
// We have to keep the order of input topic-partition. Hence, we
batch the partitions only if the last
// batch is in the same topic group.
FetchResponseData.FetchableTopicResponse previousTopic =
topicResponseList.isEmpty() ? null
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 215fb6e90c7..ea8d93f2a91 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -20,13 +20,13 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchRequestData;
-import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -161,9 +161,7 @@ public class ShareFetchRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
- return new ShareFetchResponse(new ShareFetchResponseData()
- .setThrottleTimeMs(throttleTimeMs)
- .setErrorCode(error.code()));
+ return ShareFetchResponse.of(error, throttleTimeMs, new
LinkedHashMap<>(), List.of(), 0);
}
public static ShareFetchRequest parse(Readable readable, short version) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 028edb61cba..2bab79ead9b 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -55,7 +55,7 @@ public class ShareFetchResponse extends AbstractResponse {
private final ShareFetchResponseData data;
- public ShareFetchResponse(ShareFetchResponseData data) {
+ private ShareFetchResponse(ShareFetchResponseData data) {
super(ApiKeys.SHARE_FETCH);
this.data = data;
}
@@ -103,6 +103,13 @@ public class ShareFetchResponse extends AbstractResponse {
data.setThrottleTimeMs(throttleTimeMs);
}
+ /**
+ * Creates a {@link org.apache.kafka.common.requests.ShareFetchResponse}
from the given byte buffer.
+ * Unlike {@link
org.apache.kafka.common.requests.ShareFetchResponse#of(Errors, int,
LinkedHashMap, List, int)},
+ * this method doesn't convert null records to {@link
org.apache.kafka.common.record.MemoryRecords#EMPTY}.
+ *
+ * <p><strong>This method should only be used on the client side.</strong></p>
+ */
public static ShareFetchResponse parse(ByteBuffer buffer, short version) {
return new ShareFetchResponse(
new ShareFetchResponseData(new ByteBufferAccessor(buffer),
version)
@@ -145,6 +152,13 @@ public class ShareFetchResponse extends AbstractResponse {
return partition.records() == null ? 0 :
partition.records().sizeInBytes();
}
+ /**
+ * Creates a {@link org.apache.kafka.common.requests.ShareFetchResponse}
from the given data.
+ * This method converts null records to {@link
org.apache.kafka.common.record.MemoryRecords#EMPTY}
+ * to ensure consistent record representation in the response.
+ *
+ * <p><strong>This method should only be used on the server side.</strong></p>
+ */
public static ShareFetchResponse of(Errors error,
int throttleTimeMs,
LinkedHashMap<TopicIdPartition,
ShareFetchResponseData.PartitionData> responseData,
@@ -152,7 +166,7 @@ public class ShareFetchResponse extends AbstractResponse {
return new ShareFetchResponse(toMessage(error, throttleTimeMs,
responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
}
- public static ShareFetchResponseData toMessage(Errors error, int
throttleTimeMs,
+ private static ShareFetchResponseData toMessage(Errors error, int
throttleTimeMs,
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>>
partIterator,
List<Node> nodeEndpoints,
int acquisitionLockTimeout) {
Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse>
topicResponseList = new LinkedHashMap<>();
@@ -161,6 +175,11 @@ public class ShareFetchResponse extends AbstractResponse {
ShareFetchResponseData.PartitionData partitionData =
entry.getValue();
// Since PartitionData alone doesn't know the partition ID, we set
it here
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+ // To protect the clients from failing due to null records,
+ // we always convert null records to MemoryRecords.EMPTY
+ // We will propose a KIP to change the schema definitions in the
future
+ if (partitionData.records() == null)
+ partitionData.setRecords(MemoryRecords.EMPTY);
// Checking if the topic is already present in the map
if (topicResponseList.containsKey(entry.getKey().topicId())) {
topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
@@ -193,6 +212,7 @@ public class ShareFetchResponse extends AbstractResponse {
public static ShareFetchResponseData.PartitionData partitionResponse(int
partition, Errors error) {
return new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
- .setErrorCode(error.code());
+ .setErrorCode(error.code())
+ .setRecords(MemoryRecords.EMPTY);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
index 53105e7d306..3a0f3461c09 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.api.Timeout;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -392,13 +393,7 @@ public class KafkaShareConsumerTest {
.setPartitionIndex(tip.partition())
.setRecords(records)
.setAcquiredRecords(List.of(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(count
- 1).setDeliveryCount((short) 1)));
- ShareFetchResponseData.ShareFetchableTopicResponse topicResponse = new
ShareFetchResponseData.ShareFetchableTopicResponse()
- .setTopicId(tip.topicId())
- .setPartitions(List.of(partData));
- return new ShareFetchResponse(
- new ShareFetchResponseData()
- .setResponses(List.of(topicResponse))
- );
+ return ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(Map.of(tip, partData)), List.of(), 0);
}
private ShareAcknowledgeResponse shareAcknowledgeResponse() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index e0ae3e982bb..5a52b7bc35f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -139,15 +139,14 @@ public class ShareSessionHandlerTest {
}
}
- private static List<ShareFetchResponseData.ShareFetchableTopicResponse>
respList(RespEntry... entries) {
- HashMap<TopicIdPartition,
ShareFetchResponseData.ShareFetchableTopicResponse> map = new HashMap<>();
+ private static LinkedHashMap<TopicIdPartition,
ShareFetchResponseData.PartitionData> buildResponseData(RespEntry... entries) {
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
topicIdPartitionToPartition = new LinkedHashMap<>();
for (RespEntry entry : entries) {
- ShareFetchResponseData.ShareFetchableTopicResponse response =
map.computeIfAbsent(entry.part, topicIdPartition ->
- new
ShareFetchResponseData.ShareFetchableTopicResponse().setTopicId(topicIdPartition.topicId()));
- response.partitions().add(new
ShareFetchResponseData.PartitionData()
- .setPartitionIndex(entry.part.partition()));
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setPartitionIndex(entry.part.partition());
+ topicIdPartitionToPartition.put(entry.part, partitionData);
}
- return new ArrayList<>(map.values());
+ return topicIdPartitionToPartition;
}
@Test
@@ -170,13 +169,11 @@ public class ShareSessionHandlerTest {
assertListEquals(expectedToSend1, reqFetchList(requestData1,
topicNames));
assertEquals(memberId.toString(), requestData1.memberId());
- ShareFetchResponse resp = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList(
- new RespEntry("foo", 0, fooId),
- new RespEntry("foo", 1, fooId))));
+ ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(new RespEntry("foo", 0, fooId), new
RespEntry("foo", 1, fooId)),
+ List.of(),
+ 0);
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Test a fetch request which adds one partition
@@ -194,18 +191,15 @@ public class ShareSessionHandlerTest {
expectedToSend2.add(new TopicIdPartition(barId, 0, "bar"));
assertListEquals(expectedToSend2, reqFetchList(requestData2,
topicNames));
- ShareFetchResponse resp2 = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList(
- new RespEntry("foo", 1, fooId))));
+ ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(new RespEntry("foo", 1, fooId)),
+ List.of(),
+ 0);
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
// A top-level error code will reset the session epoch
- ShareFetchResponse resp3 = new ShareFetchResponse(
- new ShareFetchResponseData()
-
.setErrorCode(Errors.INVALID_SHARE_SESSION_EPOCH.code()));
+ ShareFetchResponse resp3 =
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new
LinkedHashMap<>(), List.of(), 0);
handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
ShareFetchRequestData requestData4 =
handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
@@ -251,14 +245,14 @@ public class ShareSessionHandlerTest {
assertListEquals(expectedToSend1, reqFetchList(requestData1,
topicNames));
assertEquals(memberId.toString(), requestData1.memberId());
- ShareFetchResponse resp = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList(
- new RespEntry("foo", 0, fooId),
- new RespEntry("foo", 1, fooId),
- new RespEntry("bar", 0, barId))));
+ ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(
+ new RespEntry("foo", 0, fooId),
+ new RespEntry("foo", 1, fooId),
+ new RespEntry("bar", 0, barId)),
+ List.of(),
+ 0);
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Test a fetch request which removes two partitions
@@ -275,9 +269,7 @@ public class ShareSessionHandlerTest {
assertListEquals(expectedToForget2, reqForgetList(requestData2,
topicNames));
// A top-level error code will reset the session epoch
- ShareFetchResponse resp2 = new ShareFetchResponse(
- new ShareFetchResponseData()
-
.setErrorCode(Errors.INVALID_SHARE_SESSION_EPOCH.code()));
+ ShareFetchResponse resp2 =
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new
LinkedHashMap<>(), List.of(), 0);
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
handler.addPartitionToFetch(foo1, null);
@@ -309,12 +301,11 @@ public class ShareSessionHandlerTest {
expectedToSend1.add(new TopicIdPartition(topicId1, 0, "foo"));
assertListEquals(expectedToSend1, reqFetchList(requestData1,
topicNames));
- ShareFetchResponse resp = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList(
- new RespEntry("foo", 0, topicId1))));
+ ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(new RespEntry("foo", 0, topicId1)),
+ List.of(),
+ 0);
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Try to add a new topic ID
@@ -354,12 +345,11 @@ public class ShareSessionHandlerTest {
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
assertListEquals(expectedToSend1, reqFetchList(requestData1,
topicNames));
- ShareFetchResponse resp = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList(
- new RespEntry("foo", 0, topicId))));
+ ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(new RespEntry("foo", 0, topicId)),
+ List.of(),
+ 0);
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Remove the topic from the session by setting acknowledgements only
- this is not asking to fetch records
@@ -390,12 +380,11 @@ public class ShareSessionHandlerTest {
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
assertListEquals(expectedToSend1, reqFetchList(requestData1,
topicNames));
- ShareFetchResponse resp = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList(
- new RespEntry("foo", 0, topicId))));
+ ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(new RespEntry("foo", 0, topicId)),
+ List.of(),
+ 0);
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Remove the topic from the session
@@ -424,23 +413,18 @@ public class ShareSessionHandlerTest {
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
assertListEquals(expectedToSend1, reqFetchList(requestData1,
topicNames));
- ShareFetchResponse resp = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList(
- new RespEntry("foo", 0, topicId))));
+ ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(new RespEntry("foo", 0, topicId)),
+ List.of(),
+ 0);
handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
// Remove the partition from the session
ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, fetchConfig).build().data();
assertTrue(handler.sessionPartitionMap().isEmpty());
assertTrue(requestData2.topics().isEmpty());
- ShareFetchResponse resp2 = new ShareFetchResponse(
- new ShareFetchResponseData()
- .setErrorCode(Errors.NONE.code())
- .setThrottleTimeMs(0)
- .setResponses(respList()));
+ ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(), List.of(), 0);
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
// After the topic is removed, add a recreated topic with a new ID
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 005a401c95d..b33dec17d9a 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -583,6 +583,57 @@ public class RequestResponseTest {
deserialized.responseData(topicNames, (short) 4));
}
+ @Test
+ public void testFetchResponseShouldNotHaveNullRecords() {
+ Uuid id = Uuid.randomUuid();
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setPartitionIndex(0)
+ .setHighWatermark(1000000)
+ .setLogStartOffset(100)
+ .setLastStableOffset(200)
+ .setRecords(null);
+ FetchResponseData.FetchableTopicResponse response = new
FetchResponseData.FetchableTopicResponse()
+ .setTopic("topic")
+ .setPartitions(List.of(partitionData))
+ .setTopicId(id);
+ FetchResponseData data = new
FetchResponseData().setResponses(List.of(response));
+
+ response.setPartitions(List.of(FetchResponse.partitionResponse(0,
Errors.NONE)));
+ FetchResponse fetchResponse = FetchResponse.of(data);
+ validateNoNullRecords(fetchResponse);
+
+ TopicIdPartition topicIdPartition = new TopicIdPartition(id, new
TopicPartition("test", 0));
+ LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>
tpToData = new LinkedHashMap<>(Map.of(topicIdPartition, partitionData));
+ fetchResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID,
tpToData);
+ validateNoNullRecords(fetchResponse);
+ }
+
+ private void validateNoNullRecords(FetchResponse fetchResponse) {
+ fetchResponse.data().responses().stream()
+ .flatMap(response -> response.partitions().stream())
+ .forEach(partition -> assertEquals(MemoryRecords.EMPTY,
partition.records()));
+ }
+
+ @Test
+ public void testShareFetchResponseShouldNotHaveNullRecords() {
+ Uuid id = Uuid.randomUuid();
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setPartitionIndex(0)
+ .setAcquiredRecords(List.of())
+ .setRecords(null);
+
+ TopicIdPartition topicIdPartition = new TopicIdPartition(id, new
TopicPartition("test", 0));
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
tpToData = new LinkedHashMap<>(Map.of(topicIdPartition, partitionData));
+ ShareFetchResponse shareFetchResponse =
ShareFetchResponse.of(Errors.NONE, 0, tpToData, List.of(), 0);
+ validateNoNullRecords(shareFetchResponse);
+ }
+
+ private void validateNoNullRecords(ShareFetchResponse fetchResponse) {
+ fetchResponse.data().responses().stream()
+ .flatMap(response -> response.partitions().stream())
+ .forEach(partition -> assertEquals(MemoryRecords.EMPTY,
partition.records()));
+ }
+
@Test
public void verifyFetchResponseFullWrites() throws Exception {
verifyFetchResponseFullWrite(FETCH.latestVersion(),
createFetchResponse(123));
@@ -1443,7 +1494,6 @@ public class RequestResponseTest {
}
private ShareFetchResponse createShareFetchResponse() {
- ShareFetchResponseData data = new ShareFetchResponseData();
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("blah".getBytes()));
ShareFetchResponseData.PartitionData partition = new
ShareFetchResponseData.PartitionData()
.setPartitionIndex(0)
@@ -1453,14 +1503,10 @@ public class RequestResponseTest {
.setFirstOffset(0)
.setLastOffset(0)
.setDeliveryCount((short) 1)));
- ShareFetchResponseData.ShareFetchableTopicResponse response = new
ShareFetchResponseData.ShareFetchableTopicResponse()
- .setTopicId(Uuid.randomUuid())
- .setPartitions(singletonList(partition));
-
- data.setResponses(singletonList(response));
- data.setThrottleTimeMs(345);
- data.setErrorCode(Errors.NONE.code());
- return new ShareFetchResponse(data);
+ TopicIdPartition topicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("",
partition.partitionIndex()));
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
topicIdPartitionToPartition = new LinkedHashMap<>();
+ topicIdPartitionToPartition.put(topicIdPartition, partition);
+ return ShareFetchResponse.of(Errors.NONE, 345,
topicIdPartitionToPartition, List.of(), 0);
}
private ShareAcknowledgeRequest createShareAcknowledgeRequest(short
version) {
@@ -2103,7 +2149,7 @@ public class RequestResponseTest {
response.setTopicId(Uuid.randomUuid());
}
data.setResponses(singletonList(response));
- return new FetchResponse(data);
+ return FetchResponse.of(data);
}
private HeartbeatRequest createHeartBeatRequest(short version) {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index da9a1b86322..b294ddd3108 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -187,7 +187,7 @@ class ControllerApis(
def handleFetch(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
- handleRaftRequest(request, response => new
FetchResponse(response.asInstanceOf[FetchResponseData]))
+ handleRaftRequest(request, response =>
FetchResponse.of(response.asInstanceOf[FetchResponseData]))
}
def handleFetchSnapshot(request: RequestChannel.Request):
CompletableFuture[Unit] = {
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index 95df38c4e14..733e8228b7c 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -81,7 +81,7 @@ class TestRaftRequestHandler(
}
private def handleFetch(request: RequestChannel.Request): Unit = {
- handle(request, response => new
FetchResponse(response.asInstanceOf[FetchResponseData]))
+ handle(request, response =>
FetchResponse.of(response.asInstanceOf[FetchResponseData]))
}
private def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b4768a90317..2283ca36582 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -5733,7 +5733,7 @@ class KafkaApisTest extends Logging {
0,
Errors.TOPIC_AUTHORIZATION_FAILED.code,
Errors.NONE.code,
- null,
+ MemoryRecords.EMPTY,
Collections.emptyList[AcquiredRecords](),
partitionData1
)
@@ -5772,7 +5772,7 @@ class KafkaApisTest extends Logging {
0,
Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
Errors.NONE.code,
- null,
+ MemoryRecords.EMPTY,
Collections.emptyList[AcquiredRecords](),
partitionData4
)
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
index ed68202abe9..adbb8a5b0d0 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
@@ -115,7 +115,7 @@ public class FetchResponseBenchmark {
@Benchmark
public int testPartitionMapFromData() {
- return new FetchResponse(fetchResponseData).responseData(topicNames,
ApiKeys.FETCH.latestVersion()).size();
+ return FetchResponse.of(fetchResponseData).responseData(topicNames,
ApiKeys.FETCH.latestVersion()).size();
}
@Benchmark
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index bdae77bdd18..f68b4b6f061 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -377,7 +377,7 @@ public class KafkaNetworkChannelTest {
} else if (responseData instanceof EndQuorumEpochResponseData) {
return new EndQuorumEpochResponse((EndQuorumEpochResponseData)
responseData);
} else if (responseData instanceof FetchResponseData) {
- return new FetchResponse((FetchResponseData) responseData);
+ return FetchResponse.of((FetchResponseData) responseData);
} else if (responseData instanceof FetchSnapshotResponseData) {
return new FetchSnapshotResponse((FetchSnapshotResponseData)
responseData);
} else if (responseData instanceof UpdateRaftVoterResponseData) {
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index 9abc5591ddd..c42ed8d5364 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -54,8 +54,7 @@ public class FinalContext extends ShareFetchContext {
public ShareFetchResponse updateAndGenerateResponseData(String groupId,
Uuid memberId,
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
log.debug("Final context returning {}",
partitionsToLogString(updates.keySet()));
- return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
- updates.entrySet().iterator(), List.of(), 0));
+ return ShareFetchResponse.of(Errors.NONE, 0, updates, List.of(), 0);
}
@Override
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 36bf67ad828..6de1f57f190 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -25,7 +25,6 @@ import
org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.session.ShareSession;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -50,8 +49,8 @@ public abstract class ShareFetchContext {
* @return - An empty throttled response.
*/
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
- return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), List.of(), 0));
+ return ShareFetchResponse.of(Errors.NONE, throttleTimeMs,
+ new LinkedHashMap<>(), List.of(), 0);
}
/**
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index c3d177c8808..1c2242d54f3 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -102,8 +102,7 @@ public class ShareSessionContext extends ShareFetchContext {
@Override
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
if (!isSubsequent) {
- return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), List.of(), 0));
+ return ShareFetchResponse.of(Errors.NONE, throttleTimeMs, new
LinkedHashMap<>(), List.of(), 0);
}
int expectedEpoch =
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@@ -113,11 +112,9 @@ public class ShareSessionContext extends ShareFetchContext
{
if (sessionEpoch != expectedEpoch) {
log.debug("Subsequent share session {} expected epoch {}, but got
{}. " +
"Possible duplicate request.", session.key(),
expectedEpoch, sessionEpoch);
- return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
- throttleTimeMs, Collections.emptyIterator(), List.of(),
0));
+ return ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH,
throttleTimeMs, new LinkedHashMap<>(), List.of(), 0);
}
- return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), List.of(), 0));
+ return ShareFetchResponse.of(Errors.NONE, throttleTimeMs, new
LinkedHashMap<>(), List.of(), 0);
}
/**
@@ -195,8 +192,7 @@ public class ShareSessionContext extends ShareFetchContext {
public ShareFetchResponse updateAndGenerateResponseData(String groupId,
Uuid memberId,
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
if (!isSubsequent) {
- return new ShareFetchResponse(ShareFetchResponse.toMessage(
- Errors.NONE, 0, updates.entrySet().iterator(), List.of(),
0));
+ return ShareFetchResponse.of(Errors.NONE, 0, updates, List.of(),
0);
} else {
int expectedEpoch =
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@@ -206,8 +202,7 @@ public class ShareSessionContext extends ShareFetchContext {
if (sessionEpoch != expectedEpoch) {
log.debug("Subsequent share session {} expected epoch {}, but
got {}. Possible duplicate request.",
session.key(), expectedEpoch, sessionEpoch);
- return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
- 0, Collections.emptyIterator(), List.of(), 0));
+ return
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new
LinkedHashMap<>(), List.of(), 0);
}
// Iterate over the update list using PartitionIterator. This will
prune updates which don't need to be sent
Iterator<Map.Entry<TopicIdPartition,
ShareFetchResponseData.PartitionData>> partitionIterator = new
PartitionIterator(
@@ -217,8 +212,7 @@ public class ShareSessionContext extends ShareFetchContext {
}
log.debug("Subsequent share session context with session key {}
returning {}", session.key(),
partitionsToLogString(updates.keySet()));
- return new ShareFetchResponse(ShareFetchResponse.toMessage(
- Errors.NONE, 0, updates.entrySet().iterator(), List.of(),
0));
+ return ShareFetchResponse.of(Errors.NONE, 0, updates, List.of(),
0);
}
}