chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r551441782



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -59,136 +60,140 @@
  */
 public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
 
-    public static final long INVALID_HIGHWATERMARK = -1L;
+    public static FetchResponseData.FetchablePartitionResponse 
partitionResponse(Errors error) {
+        return new FetchResponseData.FetchablePartitionResponse()
+                .setErrorCode(error.code())
+                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setRecordSet(MemoryRecords.EMPTY)
+                
.setPreferredReadReplica(FetchResponse.INVALID_PREFERRED_REPLICA_ID);
+    }
+
+    public static final long INVALID_HIGH_WATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
     private final FetchResponseData data;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> 
responseDataMap;
+    private final LinkedHashMap<TopicPartition, 
FetchResponseData.FetchablePartitionResponse> partitionData;
+    // lazily instantiate this field
+    private LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap = 
null;

Review comment:
       ```partitionData``` can be removed, but ```responseDataMap``` is still a 
response type and it is used by production code. I did not refactor it, in order 
to avoid a big patch.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -59,136 +60,140 @@
  */
 public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
 
-    public static final long INVALID_HIGHWATERMARK = -1L;
+    public static FetchResponseData.FetchablePartitionResponse 
partitionResponse(Errors error) {
+        return new FetchResponseData.FetchablePartitionResponse()
+                .setErrorCode(error.code())
+                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setRecordSet(MemoryRecords.EMPTY)
+                
.setPreferredReadReplica(FetchResponse.INVALID_PREFERRED_REPLICA_ID);
+    }
+
+    public static final long INVALID_HIGH_WATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
     private final FetchResponseData data;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> 
responseDataMap;
+    private final LinkedHashMap<TopicPartition, 
FetchResponseData.FetchablePartitionResponse> partitionData;
+    // lazily instantiate this field
+    private LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap = 
null;
 
     @Override
     public FetchResponseData data() {
         return data;
     }
 
-    public static final class AbortedTransaction {
-        public final long producerId;
-        public final long firstOffset;
+    /**
+     * From version 3 or later, the entries in `responseData` should be in the 
same order as the entries in
+     * `FetchRequest.fetchData`.
+     *
+     * @param error             The top-level error code.
+     * @param responseData      The fetched data grouped by partition.
+     * @param throttleTimeMs    The time in milliseconds that the response was 
throttled
+     * @param sessionId         The fetch session id.
+     */
+    public FetchResponse(Errors error,
+                         LinkedHashMap<TopicPartition, PartitionData<T>> 
responseData,
+                         int throttleTimeMs,
+                         int sessionId) {
+        this(error, throttleTimeMs, sessionId, 
responseData.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+            entry -> entry.getValue().partitionResponse, (o1, o2) -> {
+                throw new RuntimeException("this is impossible");
+            }, LinkedHashMap::new)));
+    }
 
-        public AbortedTransaction(long producerId, long firstOffset) {
-            this.producerId = producerId;
-            this.firstOffset = firstOffset;
-        }
+    public FetchResponse(Errors error,
+                         int throttleTimeMs,
+                         int sessionId,
+                         LinkedHashMap<TopicPartition, 
FetchResponseData.FetchablePartitionResponse> responseData) {
+        super(ApiKeys.FETCH);
+        this.data = new FetchResponseData()
+                .setSessionId(sessionId)
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(throttleTimeMs);
+        responseData.forEach((tp, tpData) -> data.responses().add(new 
FetchResponseData.FetchableTopicResponse()
+            .setTopic(tp.topic())
+            
.setPartitionResponses(Collections.singletonList(tpData.setPartition(tp.partition())))));
+        this.partitionData = responseData;
+    }
 
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        super(ApiKeys.FETCH);
+        this.data = fetchResponseData;
+        this.partitionData = new LinkedHashMap<>();
+        fetchResponseData.responses().forEach(topicResponse ->
+            topicResponse.partitionResponses().forEach(partitionResponse ->
+                partitionData.put(new TopicPartition(topicResponse.topic(), 
partitionResponse.partition()), partitionResponse))
+        );
+    }
 
-            AbortedTransaction that = (AbortedTransaction) o;
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
+    }
 
-            return producerId == that.producerId && firstOffset == 
that.firstOffset;
+    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
+        if (responseDataMap == null) {
+            responseDataMap = new LinkedHashMap<>(partitionData.size());
+            partitionData.forEach((tp, d) -> responseDataMap.put(tp, new 
PartitionData<>(d)));
         }
+        return responseDataMap;
+    }
 
-        @Override
-        public int hashCode() {
-            int result = Long.hashCode(producerId);
-            result = 31 * result + Long.hashCode(firstOffset);
-            return result;
-        }
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
 
-        @Override
-        public String toString() {
-            return "(producerId=" + producerId + ", firstOffset=" + 
firstOffset + ")";
-        }
+    public int sessionId() {
+        return data.sessionId();
+    }
 
-        static AbortedTransaction 
fromMessage(FetchResponseData.AbortedTransaction abortedTransaction) {
-            return new AbortedTransaction(abortedTransaction.producerId(), 
abortedTransaction.firstOffset());
-        }
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        updateErrorCounts(errorCounts, error());
+        partitionData.values().forEach(response ->
+            updateErrorCounts(errorCounts, 
Errors.forCode(response.errorCode()))
+        );
+        return errorCounts;
     }
 
-    public static final class PartitionData<T extends BaseRecords> {
-        private final FetchResponseData.FetchablePartitionResponse 
partitionResponse;
+    public static FetchResponse<MemoryRecords> parse(ByteBuffer buffer, short 
version) {
+        return new FetchResponse<>(new FetchResponseData(new 
ByteBufferAccessor(buffer), version));
+    }
 
-        // Derived fields
-        private final Optional<Integer> preferredReplica;
-        private final List<AbortedTransaction> abortedTransactions;
-        private final Errors error;
+    /**
+     * Convenience method to find the size of a response.
+     *
+     * @param version       The version of the response to use.
+     * @param partIterator  The partition iterator.
+     * @return              The response size in bytes.
+     */
+    public static <T extends Records> int sizeOf(short version,
+                                                     
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+        // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
+        // use arbitrary values here without affecting the result.
+        LinkedHashMap<TopicPartition, PartitionData<T>> data = new 
LinkedHashMap<>();
+        partIterator.forEachRemaining(entry -> data.put(entry.getKey(), 
entry.getValue()));
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        return 4 + new FetchResponse<>(Errors.NONE, data, 0, 
INVALID_SESSION_ID).data.size(cache, version);
+    }
 
-        private PartitionData(FetchResponseData.FetchablePartitionResponse 
partitionResponse) {
-            // We partially construct FetchablePartitionResponse since we 
don't know the partition ID at this point
-            // When we convert the PartitionData (and other fields) into 
FetchResponseData down in toMessage, we
-            // set the partition IDs.
-            this.partitionResponse = partitionResponse;
-            this.preferredReplica = 
Optional.of(partitionResponse.preferredReadReplica())
-                .filter(replicaId -> replicaId != 
INVALID_PREFERRED_REPLICA_ID);
-
-            if (partitionResponse.abortedTransactions() == null) {
-                this.abortedTransactions = null;
-            } else {
-                this.abortedTransactions = 
partitionResponse.abortedTransactions().stream()
-                    .map(AbortedTransaction::fromMessage)
-                    .collect(Collectors.toList());
-            }
-
-            this.error = Errors.forCode(partitionResponse.errorCode());
-        }
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 8;
+    }
 
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             Optional<Integer> preferredReadReplica,
-                             List<AbortedTransaction> abortedTransactions,
-                             Optional<FetchResponseData.EpochEndOffset> 
divergingEpoch,
-                             T records) {
-            this.preferredReplica = preferredReadReplica;
-            this.abortedTransactions = abortedTransactions;
-            this.error = error;
-
-            FetchResponseData.FetchablePartitionResponse partitionResponse =
-                new FetchResponseData.FetchablePartitionResponse();
-            partitionResponse.setErrorCode(error.code())
-                .setHighWatermark(highWatermark)
-                .setLastStableOffset(lastStableOffset)
-                .setLogStartOffset(logStartOffset);
-            if (abortedTransactions != null) {
-                
partitionResponse.setAbortedTransactions(abortedTransactions.stream().map(
-                    aborted -> new FetchResponseData.AbortedTransaction()
-                        .setProducerId(aborted.producerId)
-                        .setFirstOffset(aborted.firstOffset))
-                    .collect(Collectors.toList()));
-            } else {
-                partitionResponse.setAbortedTransactions(null);
-            }
-            
partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
-            partitionResponse.setRecordSet(records);
-            divergingEpoch.ifPresent(partitionResponse::setDivergingEpoch);
 
-            this.partitionResponse = partitionResponse;
-        }
+    public static final class PartitionData<T extends BaseRecords> {

Review comment:
       > What does it add over FetchResponseData.FetchablePartitionRespons?
   
   It offers methods to convert "non-defined" values to the ```Optional``` type.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to