chia7712 commented on a change in pull request #9758: URL: https://github.com/apache/kafka/pull/9758#discussion_r585674420
########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -365,17 +126,98 @@ public int sessionId() { * @param partIterator The partition iterator. * @return The response size in bytes. */ - public static <T extends BaseRecords> int sizeOf(short version, - Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) { + public static int sizeOf(short version, + Iterator<Map.Entry<TopicPartition, FetchResponseData.PartitionData>> partIterator) { // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can // use arbitrary values here without affecting the result. - FetchResponseData data = toMessage(0, Errors.NONE, partIterator, INVALID_SESSION_ID); + LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data = new LinkedHashMap<>(); + partIterator.forEachRemaining(entry -> data.put(entry.getKey(), entry.getValue())); ObjectSerializationCache cache = new ObjectSerializationCache(); - return 4 + data.size(cache, version); + return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, data).data.size(cache, version); } @Override public boolean shouldClientThrottle(short version) { return version >= 8; } -} + + public static Optional<FetchResponseData.EpochEndOffset> divergingEpoch(FetchResponseData.PartitionData partitionResponse) { + return partitionResponse.divergingEpoch().epoch() < 0 ? Optional.empty() + : Optional.of(partitionResponse.divergingEpoch()); + } + + public static boolean isDivergingEpoch(FetchResponseData.PartitionData partitionResponse) { + return partitionResponse.divergingEpoch().epoch() >= 0; + } + + public static Optional<Integer> preferredReadReplica(FetchResponseData.PartitionData partitionResponse) { + return partitionResponse.preferredReadReplica() == INVALID_PREFERRED_REPLICA_ID ? Optional.empty() + : Optional.of(partitionResponse.preferredReadReplica()); + } + + public static boolean isPreferredReplica(FetchResponseData.PartitionData partitionResponse) { + return partitionResponse.preferredReadReplica() != INVALID_PREFERRED_REPLICA_ID; + } + + public static FetchResponseData.PartitionData partitionResponse(int partition, Errors error) { + return new FetchResponseData.PartitionData() + .setPartitionIndex(partition) + .setErrorCode(error.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK); + } + + /** + * Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`. + * + * If this response was deserialized after a fetch, this method should never fail. An example where this would + * fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and + * sent on the wire). + * + * @param partition partition data + * @return Records or empty record if the records in PartitionData is null. + */ + public static Records records(FetchResponseData.PartitionData partition) { + if (partition.records() == null) return MemoryRecords.EMPTY; + else if (partition.records() instanceof Records) return (Records) partition.records(); + else throw new IllegalStateException("the record type is " + partition.records().getClass().getSimpleName() + Review comment: Will address those nice comments ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org