chia7712 commented on code in PR #19167:
URL: https://github.com/apache/kafka/pull/19167#discussion_r2024083530
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java:
##########
@@ -392,13 +393,9 @@ private ShareFetchResponse
shareFetchResponse(TopicIdPartition tip, int count) {
.setPartitionIndex(tip.partition())
.setRecords(records)
.setAcquiredRecords(List.of(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(count
- 1).setDeliveryCount((short) 1)));
- ShareFetchResponseData.ShareFetchableTopicResponse topicResponse = new
ShareFetchResponseData.ShareFetchableTopicResponse()
- .setTopicId(tip.topicId())
- .setPartitions(List.of(partData));
- return new ShareFetchResponse(
- new ShareFetchResponseData()
- .setResponses(List.of(topicResponse))
- );
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
topicIdPartitionToPartition = new LinkedHashMap<>();
+ topicIdPartitionToPartition.put(tip, partData);
+ return ShareFetchResponse.of(Errors.NONE, 0,
topicIdPartitionToPartition, List.of(), 0);
Review Comment:
```java
return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(Map.of(tip,
partData)), List.of(), 0);
```
##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java:
##########
@@ -172,6 +186,11 @@ public static ShareFetchResponseData toMessage(Errors
error, int throttleTimeMs,
ShareFetchResponseData.PartitionData partitionData =
entry.getValue();
// Since PartitionData alone doesn't know the partition ID, we set
it here
partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+ // To protect the clients from failing due to null records,
Review Comment:
Could you please change this method `toMessage` to private?
##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -360,7 +360,7 @@ public AbstractResponse getErrorResponse(int
throttleTimeMs, Throwable e) {
.setPartitions(partitionResponses));
});
}
- return new FetchResponse(new FetchResponseData()
+ return FetchResponse.of(new FetchResponseData()
Review Comment:
For versions prior to 13, the data is iterated twice. To prevent this
redundancy, consider making the `FetchResponse` constructor package-private to
avoid the double loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]