ijuma commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r581069071



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -57,238 +55,51 @@
  *     the fetch offset after the index lookup
  * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
  */
-public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
-
-    public static final long INVALID_HIGHWATERMARK = -1L;
+public class FetchResponse extends AbstractResponse {
+    public static final long INVALID_HIGH_WATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
     private final FetchResponseData data;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> 
responseDataMap;
+    private final LinkedHashMap<TopicPartition, 
FetchResponseData.FetchablePartitionResponse> responseData;
 
     @Override
     public FetchResponseData data() {
         return data;
     }
 
-    public static final class AbortedTransaction {
-        public final long producerId;
-        public final long firstOffset;
-
-        public AbortedTransaction(long producerId, long firstOffset) {
-            this.producerId = producerId;
-            this.firstOffset = firstOffset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            AbortedTransaction that = (AbortedTransaction) o;
-
-            return producerId == that.producerId && firstOffset == 
that.firstOffset;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = Long.hashCode(producerId);
-            result = 31 * result + Long.hashCode(firstOffset);
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "(producerId=" + producerId + ", firstOffset=" + 
firstOffset + ")";
-        }
-
-        static AbortedTransaction 
fromMessage(FetchResponseData.AbortedTransaction abortedTransaction) {
-            return new AbortedTransaction(abortedTransaction.producerId(), 
abortedTransaction.firstOffset());
-        }
-    }
-
-    public static final class PartitionData<T extends BaseRecords> {
-        private final FetchResponseData.FetchablePartitionResponse 
partitionResponse;
-
-        // Derived fields
-        private final Optional<Integer> preferredReplica;
-        private final List<AbortedTransaction> abortedTransactions;
-        private final Errors error;
-
-        private PartitionData(FetchResponseData.FetchablePartitionResponse 
partitionResponse) {
-            // We partially construct FetchablePartitionResponse since we 
don't know the partition ID at this point
-            // When we convert the PartitionData (and other fields) into 
FetchResponseData down in toMessage, we
-            // set the partition IDs.
-            this.partitionResponse = partitionResponse;
-            this.preferredReplica = 
Optional.of(partitionResponse.preferredReadReplica())
-                .filter(replicaId -> replicaId != 
INVALID_PREFERRED_REPLICA_ID);
-
-            if (partitionResponse.abortedTransactions() == null) {
-                this.abortedTransactions = null;
-            } else {
-                this.abortedTransactions = 
partitionResponse.abortedTransactions().stream()
-                    .map(AbortedTransaction::fromMessage)
-                    .collect(Collectors.toList());
-            }
-
-            this.error = Errors.forCode(partitionResponse.errorCode());
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             Optional<Integer> preferredReadReplica,
-                             List<AbortedTransaction> abortedTransactions,
-                             Optional<FetchResponseData.EpochEndOffset> 
divergingEpoch,
-                             T records) {
-            this.preferredReplica = preferredReadReplica;
-            this.abortedTransactions = abortedTransactions;
-            this.error = error;
-
-            FetchResponseData.FetchablePartitionResponse partitionResponse =
-                new FetchResponseData.FetchablePartitionResponse();
-            partitionResponse.setErrorCode(error.code())
-                .setHighWatermark(highWatermark)
-                .setLastStableOffset(lastStableOffset)
-                .setLogStartOffset(logStartOffset);
-            if (abortedTransactions != null) {
-                
partitionResponse.setAbortedTransactions(abortedTransactions.stream().map(
-                    aborted -> new FetchResponseData.AbortedTransaction()
-                        .setProducerId(aborted.producerId)
-                        .setFirstOffset(aborted.firstOffset))
-                    .collect(Collectors.toList()));
-            } else {
-                partitionResponse.setAbortedTransactions(null);
-            }
-            
partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
-            partitionResponse.setRecordSet(records);
-            divergingEpoch.ifPresent(partitionResponse::setDivergingEpoch);
-
-            this.partitionResponse = partitionResponse;
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             Optional<Integer> preferredReadReplica,
-                             List<AbortedTransaction> abortedTransactions,
-                             T records) {
-            this(error, highWatermark, lastStableOffset, logStartOffset, 
preferredReadReplica,
-                abortedTransactions, Optional.empty(), records);
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             List<AbortedTransaction> abortedTransactions,
-                             T records) {
-            this(error, highWatermark, lastStableOffset, logStartOffset, 
Optional.empty(), abortedTransactions, records);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            PartitionData that = (PartitionData) o;
-
-            return this.partitionResponse.equals(that.partitionResponse);
-        }
-
-        @Override
-        public int hashCode() {
-            return this.partitionResponse.hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return "(error=" + error() +
-                    ", highWaterMark=" + highWatermark() +
-                    ", lastStableOffset = " + lastStableOffset() +
-                    ", logStartOffset = " + logStartOffset() +
-                    ", preferredReadReplica = " + 
preferredReadReplica().map(Object::toString).orElse("absent") +
-                    ", abortedTransactions = " + abortedTransactions() +
-                    ", divergingEpoch =" + divergingEpoch() +
-                    ", recordsSizeInBytes=" + records().sizeInBytes() + ")";
-        }
-
-        public Errors error() {
-            return error;
-        }
-
-        public long highWatermark() {
-            return partitionResponse.highWatermark();
-        }
-
-        public long lastStableOffset() {
-            return partitionResponse.lastStableOffset();
-        }
-
-        public long logStartOffset() {
-            return partitionResponse.logStartOffset();
-        }
-
-        public Optional<Integer> preferredReadReplica() {
-            return preferredReplica;
-        }
-
-        public List<AbortedTransaction> abortedTransactions() {
-            return abortedTransactions;
-        }
-
-        public Optional<FetchResponseData.EpochEndOffset> divergingEpoch() {
-            FetchResponseData.EpochEndOffset epochEndOffset = 
partitionResponse.divergingEpoch();
-            if (epochEndOffset.epoch() < 0) {
-                return Optional.empty();
-            } else {
-                return Optional.of(epochEndOffset);
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        public T records() {
-            return (T) partitionResponse.recordSet();
-        }
-    }
-
-    /**
-     * From version 3 or later, the entries in `responseData` should be in the 
same order as the entries in
-     * `FetchRequest.fetchData`.
-     *
-     * @param error             The top-level error code.
-     * @param responseData      The fetched data grouped by partition.
-     * @param throttleTimeMs    The time in milliseconds that the response was 
throttled
-     * @param sessionId         The fetch session id.
-     */
     public FetchResponse(Errors error,
-                         LinkedHashMap<TopicPartition, PartitionData<T>> 
responseData,
                          int throttleTimeMs,
-                         int sessionId) {
-        super(ApiKeys.FETCH);
-        this.data = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
-        this.responseDataMap = responseData;
+                         int sessionId,
+                         LinkedHashMap<TopicPartition, 
FetchResponseData.FetchablePartitionResponse> responseData) {
+        this(new FetchResponseData()
+            .setSessionId(sessionId)
+            .setErrorCode(error.code())
+            .setThrottleTimeMs(throttleTimeMs)
+            .setResponses(responseData.entrySet().stream().map(entry -> new 
FetchResponseData.FetchableTopicResponse()
+                .setTopic(entry.getKey().topic())
+                
.setPartitionResponses(Collections.singletonList(entry.getValue().setPartition(entry.getKey().partition()))))
+                .collect(Collectors.toList())));
     }
 
     public FetchResponse(FetchResponseData fetchResponseData) {
         super(ApiKeys.FETCH);
         this.data = fetchResponseData;
-        this.responseDataMap = toResponseDataMap(fetchResponseData);
+        this.responseData = new LinkedHashMap<>();

Review comment:
       The `responseData` map isn't needed when the broker returns a fetch 
response, right? If so, can we remove it from `FetchResponse` and build it on 
the client only when needed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to