ijuma commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r584172530



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +123,70 @@ public int sessionId() {
      * @param partIterator  The partition iterator.
      * @return              The response size in bytes.
      */
-    public static <T extends BaseRecords> int sizeOf(short version,
-                                                      Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+    public static int sizeOf(short version,
+                             Iterator<Map.Entry<TopicPartition, FetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
         // use arbitrary values here without affecting the result.
-        FetchResponseData data = toMessage(0, Errors.NONE, partIterator, INVALID_SESSION_ID);
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data = new LinkedHashMap<>();
+        partIterator.forEachRemaining(entry -> data.put(entry.getKey(), entry.getValue()));
         ObjectSerializationCache cache = new ObjectSerializationCache();
-        return 4 + data.size(cache, version);
+        return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, data).data.size(cache, version);
     }
 
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 8;
     }
-}
+
+    public static Optional<FetchResponseData.EpochEndOffset> divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() < 0 ? Optional.empty()
+                : Optional.of(partitionResponse.divergingEpoch());
+    }
+
+    public static boolean isDivergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() >= 0;
+    }
+
+    public static Optional<Integer> preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() == INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+                : Optional.of(partitionResponse.preferredReadReplica());
+    }
+
+    public static boolean isPreferredReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() != INVALID_PREFERRED_REPLICA_ID;
+    }
+
+    public static FetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
+        return new FetchResponseData.PartitionData()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setRecords(MemoryRecords.EMPTY);
+    }
+
+    /**
+     * cast the BaseRecords of PartitionData to Records. This is used to eliminate duplicate code of type casting.

Review comment:
       Does this ever fail? If so, it would be good to explain under which 
conditions it can fail. Also "This is used to eliminate duplicate code of type 
casting." seems a bit redundant.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -304,58 +108,12 @@ public int sessionId() {
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
         updateErrorCounts(errorCounts, error());
-        responseDataMap.values().forEach(response ->
-            updateErrorCounts(errorCounts, response.error())
-        );
+        responseData().values().forEach(response -> updateErrorCounts(errorCounts, Errors.forCode(response.errorCode())));

Review comment:
       Can we update this not to use `responseData`? Then we at least have the 
right behavior for the broker and we can fix the clients in the subsequent PR.
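       Roughly what that might look like, iterating the generated message directly (a sketch only, assuming the `responses()`/`partitions()` accessors used elsewhere in this PR):

```java
// Sketch: count errors straight off the generated FetchResponseData instead of
// materializing the responseData map.
public Map<Errors, Integer> errorCounts() {
    Map<Errors, Integer> errorCounts = new HashMap<>();
    updateErrorCounts(errorCounts, error());
    data.responses().forEach(topicResponse ->
        topicResponse.partitions().forEach(partition ->
            updateErrorCounts(errorCounts, Errors.forCode(partition.errorCode()))));
    return errorCounts;
}
```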

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2377,14 +2378,19 @@ private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> partit
                     builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
                 records = builder.build();
             }
-            tpResponses.put(partition, new FetchResponse.PartitionData<>(
-                    Errors.NONE, highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                    logStartOffset, null, records));
+            tpResponses.put(partition,
+                    new FetchResponseData.PartitionData()
+                            .setErrorCode(Errors.NONE.code())
+                            .setHighWatermark(highWatermark)
+                            .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                            .setLogStartOffset(logStartOffset)
+                            .setAbortedTransactions(null)
+                            .setRecords(records));

Review comment:
       Are some of these redundant? (eg `setAbortedTransactions(null)`)
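       If the generated defaults already match (worth checking against `FetchResponse.json` before dropping anything), the builder could shrink to something like this sketch:

```java
// Sketch assuming abortedTransactions already defaults to null and
// lastStableOffset to the INVALID_LAST_STABLE_OFFSET sentinel in the
// generated class; verify the schema defaults before relying on this.
tpResponses.put(partition,
        new FetchResponseData.PartitionData()
                .setErrorCode(Errors.NONE.code())
                .setHighWatermark(highWatermark)
                .setLogStartOffset(logStartOffset)
                .setRecords(records));
```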

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +123,70 @@ public int sessionId() {
      * @param partIterator  The partition iterator.
      * @return              The response size in bytes.
      */
-    public static <T extends BaseRecords> int sizeOf(short version,
-                                                      Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+    public static int sizeOf(short version,
+                             Iterator<Map.Entry<TopicPartition, FetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
         // use arbitrary values here without affecting the result.
-        FetchResponseData data = toMessage(0, Errors.NONE, partIterator, INVALID_SESSION_ID);
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data = new LinkedHashMap<>();
+        partIterator.forEachRemaining(entry -> data.put(entry.getKey(), entry.getValue()));
         ObjectSerializationCache cache = new ObjectSerializationCache();
-        return 4 + data.size(cache, version);
+        return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, data).data.size(cache, version);
     }
 
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 8;
     }
-}
+
+    public static Optional<FetchResponseData.EpochEndOffset> divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() < 0 ? Optional.empty()
+                : Optional.of(partitionResponse.divergingEpoch());
+    }
+
+    public static boolean isDivergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() >= 0;
+    }
+
+    public static Optional<Integer> preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() == INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+                : Optional.of(partitionResponse.preferredReadReplica());
+    }
+
+    public static boolean isPreferredReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() != INVALID_PREFERRED_REPLICA_ID;
+    }
+
+    public static FetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
+        return new FetchResponseData.PartitionData()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setRecords(MemoryRecords.EMPTY);

Review comment:
       Aren't many of these set automatically by the generated classes?
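       One quick way to confirm that before deleting setters is to assert the defaults of a freshly constructed object; the expected values below are the assumption being tested, not a statement of what the schema actually defines:

```java
FetchResponseData.PartitionData fresh = new FetchResponseData.PartitionData();
assertEquals(FetchResponse.INVALID_HIGH_WATERMARK, fresh.highWatermark());
assertEquals(FetchResponse.INVALID_LAST_STABLE_OFFSET, fresh.lastStableOffset());
assertEquals(FetchResponse.INVALID_LOG_START_OFFSET, fresh.logStartOffset());
assertNull(fresh.abortedTransactions());
```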

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -1270,13 +1271,24 @@ public void testFetchPositionAfterException() {
 
         assertEquals(1, fetcher.sendFetches());
 
-        Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = new LinkedHashMap<>();
-        partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.NONE, 100,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
-        partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
-        client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions),
-            0, INVALID_SESSION_ID));
+
+        Map<TopicPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
+        partitions.put(tp1, new FetchResponseData.PartitionData()
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(100)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setRecords(records));
+        partitions.put(tp0, new FetchResponseData.PartitionData()
+                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
+                .setHighWatermark(100)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setRecords(MemoryRecords.EMPTY));

Review comment:
       Are some of these redundant? (eg `setAbortedTransactions(null)`). Other 
examples in the same file.

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
##########
@@ -174,8 +173,14 @@ public int sizeInBytes() {
                     return null;
                 }
             };
-            initialFetched.put(tp, new FetchResponse.PartitionData<>(Errors.NONE, 0, 0, 0,
-                    new LinkedList<>(), fetched));
+            initialFetched.put(tp, new FetchResponseData.PartitionData()
+                    .setPartitionIndex(tp.partition())
+                    .setErrorCode(Errors.NONE.code())
+                    .setHighWatermark(0)
+                    .setLastStableOffset(0)
+                    .setLogStartOffset(0)

Review comment:
       We can remove some redundant setters?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -963,9 +966,13 @@ class ReplicaFetcherThreadTest {
 
     val records = MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8)))
-
-    val partitionData: thread.FetchData = new FetchResponse.PartitionData[Records](
-      Errors.NONE, 0, 0, 0, Optional.empty(), Collections.emptyList(), records)
+    val partitionData: thread.FetchData = new FetchResponseData.PartitionData()
+        .setErrorCode(Errors.NONE.code)
+        .setHighWatermark(0)
+        .setLastStableOffset(0)
+        .setLogStartOffset(0)

Review comment:
       We can remove some redundant setters?

##########
File path: core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
##########
@@ -403,25 +401,24 @@ private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions:
 
     debug("Issuing fetch request ")
 
-    var fetchResponse: FetchResponse[MemoryRecords] = null
+    var fetchResponse: FetchResponse = null
     try {
       val clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder)
-      fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[MemoryRecords]]
+      fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
     } catch {
       case t: Throwable =>
         if (!isRunning)
           throw t
     }
 
     if (fetchResponse != null) {
-      fetchResponse.responseData.forEach { (tp, partitionData) =>
-        replicaBuffer.addFetchedData(tp, sourceBroker.id, partitionData)
-      }
+      fetchResponse.data.responses().forEach(topicResponse =>
+        topicResponse.partitions().forEach(partitionResponse =>

Review comment:
       Nit: remove `()` twice.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -155,13 +155,28 @@ class FetchSessionTest {
     assertEquals(Optional.of(1), epochs1(tp1))
     assertEquals(Optional.of(2), epochs1(tp2))
 
-    val response = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
-    response.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, 100,
-      100, null, null))
-    response.put(tp1, new FetchResponse.PartitionData(
-      Errors.NONE, 10, 10, 10, null, null))
-    response.put(tp2, new FetchResponse.PartitionData(
-      Errors.NONE, 5, 5, 5, null, null))
+    val response = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+    response.put(tp0, new FetchResponseData.PartitionData()
+      .setErrorCode(Errors.NONE.code)
+      .setHighWatermark(100)
+      .setLastStableOffset(100)
+      .setLogStartOffset(100)
+      .setAbortedTransactions(null)
+      .setRecords(null))

Review comment:
       We can remove some redundant setters?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,76 +754,80 @@ class KafkaApis(val requestChannel: RequestChannel,
             // For fetch requests from clients, check if down-conversion is disabled for the particular partition
             if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
               trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
-              errorResponse(Errors.UNSUPPORTED_VERSION)
+              FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_VERSION)
             } else {
               try {
                 trace(s"Down converting records from partition $tp to message 
format version $magic for fetch request from $clientId")
                 // Because down-conversion is extremely memory intensive, we 
want to try and delay the down-conversion as much
                 // as possible. With KIP-283, we have the ability to lazily 
down-convert in a chunked manner. The lazy, chunked
                 // down-conversion always guarantees that at least one batch 
of messages is down-converted and sent out to the
                 // client.
-                val error = maybeDownConvertStorageError(partitionData.error)
-                new FetchResponse.PartitionData[BaseRecords](error, 
partitionData.highWatermark,
-                  partitionData.lastStableOffset, partitionData.logStartOffset,
-                  partitionData.preferredReadReplica, 
partitionData.abortedTransactions,
-                  new LazyDownConversionRecords(tp, unconvertedRecords, magic, 
fetchContext.getFetchOffset(tp).get, time))
+                new FetchResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+                  .setHighWatermark(partitionData.highWatermark)
+                  .setLastStableOffset(partitionData.lastStableOffset)
+                  .setLogStartOffset(partitionData.logStartOffset)
+                  .setAbortedTransactions(partitionData.abortedTransactions)
+                  .setRecords(new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+                  .setPreferredReadReplica(partitionData.preferredReadReplica())

Review comment:
       Do we have to copy like this or can we mutate the response?
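       If mutation is acceptable here, a rough sketch of what I mean (only safe if this `PartitionData` instance is not shared or reused elsewhere, which would need to be verified first):

```scala
// Mutate the existing response object in place instead of copying every field.
partitionData
  .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
  .setRecords(new LazyDownConversionRecords(tp, unconvertedRecords, magic,
    fetchContext.getFetchOffset(tp).get, time))
```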

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -150,22 +150,23 @@ private static void assertListEquals(List<TopicPartition> expected, List<TopicPa
 
     private static final class RespEntry {
         final TopicPartition part;
-        final FetchResponse.PartitionData<MemoryRecords> data;
+        final FetchResponseData.PartitionData data;
 
         RespEntry(String topic, int partition, long highWatermark, long lastStableOffset) {
             this.part = new TopicPartition(topic, partition);
-            this.data = new FetchResponse.PartitionData<>(
-                Errors.NONE,
-                highWatermark,
-                lastStableOffset,
-                0,
-                null,
-                null);
+
+            this.data = new FetchResponseData.PartitionData()
+                        .setErrorCode(Errors.NONE.code())

Review comment:
       Nit: indenting seems excessive.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -364,7 +361,7 @@ abstract class AbstractFetcherThread(name: String,
                       }
                     }
                     if (isTruncationOnFetchSupported) {
-                      partitionData.divergingEpoch.ifPresent { divergingEpoch =>
+                     FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>

Review comment:
       Nit: indenting seems wrong.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -150,22 +150,23 @@ private static void assertListEquals(List<TopicPartition> expected, List<TopicPa
 
     private static final class RespEntry {
         final TopicPartition part;
-        final FetchResponse.PartitionData<MemoryRecords> data;
+        final FetchResponseData.PartitionData data;
 
         RespEntry(String topic, int partition, long highWatermark, long lastStableOffset) {
             this.part = new TopicPartition(topic, partition);
-            this.data = new FetchResponse.PartitionData<>(
-                Errors.NONE,
-                highWatermark,
-                lastStableOffset,
-                0,
-                null,
-                null);
+
+            this.data = new FetchResponseData.PartitionData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setHighWatermark(highWatermark)
+                        .setLastStableOffset(lastStableOffset)
+                        .setLogStartOffset(0)
+                        .setAbortedTransactions(null)
+                        .setRecords(null);

Review comment:
       Are some of these redundant? (eg setAbortedTransactions(null))

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -734,7 +730,7 @@ abstract class AbstractFetcherThread(name: String,
     Option(partitionStates.stateValue(topicPartition))
   }
 
-  protected def toMemoryRecords(records: Records): MemoryRecords = {
+  protected def toMemoryRecords(records: BaseRecords): MemoryRecords = {

Review comment:
       Do we have to update the matching inside the method to handle other potential record types? Or do we want to avoid changing this method signature instead?
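       For reference, a sketch of what widening the signature implies, assuming the method currently matches on `MemoryRecords` and `FileRecords`: with `BaseRecords` there has to be a fallback case, otherwise any other implementation would surface as a `MatchError`.

```scala
protected def toMemoryRecords(records: BaseRecords): MemoryRecords = {
  records match {
    case memoryRecords: MemoryRecords => memoryRecords
    case fileRecords: FileRecords =>
      val buffer = ByteBuffer.allocate(fileRecords.sizeInBytes)
      fileRecords.readInto(buffer, 0)
      MemoryRecords.readableRecords(buffer)
    case other =>
      // The fallback shown here is an assumption; failing loudly keeps the behavior explicit.
      throw new IllegalStateException(s"Unexpected records type ${other.getClass}")
  }
}
```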

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
##########
@@ -78,19 +78,25 @@ public void setup() {
         for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
             String topic = UUID.randomUUID().toString();
             for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
-                FetchResponse.PartitionData<MemoryRecords> partitionData = new FetchResponse.PartitionData<>(
-                    Errors.NONE, 0, 0, 0, Optional.empty(), Collections.emptyList(), records);
+                FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
+                                .setPartitionIndex(partitionId)
+                                .setErrorCode(Errors.NONE.code())
+                                .setHighWatermark(0)
+                                .setLastStableOffset(0)
+                                .setLogStartOffset(0)

Review comment:
       We can remove some redundant setters?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -340,7 +336,8 @@ abstract class AbstractFetcherThread(name: String,
             // the current offset is the same as the offset requested.
             val fetchPartitionData = sessionPartitions.get(topicPartition)
             if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
-              partitionData.error match {
+              val partitionError = Errors.forCode(partitionData.errorCode)

Review comment:
       Not clear why we need this val. Seems like we can introduce a variable 
in the `case _` instead.
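       Something like this sketch of the suggestion (the case bodies are placeholders, not the real handling code):

```scala
Errors.forCode(partitionData.errorCode) match {
  case Errors.NONE =>
    // process the fetched data as before
  case error =>
    // `error` bound here plays the role of the removed partitionError val
    warn(s"Fetch for $topicPartition returned error $error")
}
```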

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
##########
@@ -70,24 +70,25 @@ public void setUp() {
         handler = new FetchSessionHandler(LOG_CONTEXT, 1);
         FetchSessionHandler.Builder builder = handler.newBuilder();
 
-        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> respMap = new LinkedHashMap<>();
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> respMap = new LinkedHashMap<>();
         for (int i = 0; i < partitionCount; i++) {
             TopicPartition tp = new TopicPartition("foo", i);
             FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(0, 0, 200,
                     Optional.empty());
             fetches.put(tp, partitionData);
             builder.add(tp, partitionData);
-            respMap.put(tp, new FetchResponse.PartitionData<>(
-                    Errors.NONE,
-                    0L,
-                    0L,
-                    0,
-                    null,
-                    null));
+            respMap.put(tp, new FetchResponseData.PartitionData()
+                            .setPartitionIndex(tp.partition())
+                            .setErrorCode(Errors.NONE.code())
+                            .setHighWatermark(0)
+                            .setLastStableOffset(0)
+                            .setLogStartOffset(0)
+                            .setAbortedTransactions(null)
+                            .setRecords(null));

Review comment:
       We can remove some redundant set calls?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,76 +754,80 @@ class KafkaApis(val requestChannel: RequestChannel,
             // For fetch requests from clients, check if down-conversion is disabled for the particular partition
             if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
               trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
-              errorResponse(Errors.UNSUPPORTED_VERSION)
+              FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_VERSION)
             } else {
               try {
                 trace(s"Down converting records from partition $tp to message 
format version $magic for fetch request from $clientId")
                 // Because down-conversion is extremely memory intensive, we 
want to try and delay the down-conversion as much
                 // as possible. With KIP-283, we have the ability to lazily 
down-convert in a chunked manner. The lazy, chunked
                 // down-conversion always guarantees that at least one batch 
of messages is down-converted and sent out to the
                 // client.
-                val error = maybeDownConvertStorageError(partitionData.error)
-                new FetchResponse.PartitionData[BaseRecords](error, 
partitionData.highWatermark,
-                  partitionData.lastStableOffset, partitionData.logStartOffset,
-                  partitionData.preferredReadReplica, 
partitionData.abortedTransactions,
-                  new LazyDownConversionRecords(tp, unconvertedRecords, magic, 
fetchContext.getFetchOffset(tp).get, time))
+                new FetchResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+                  .setHighWatermark(partitionData.highWatermark)
+                  .setLastStableOffset(partitionData.lastStableOffset)
+                  .setLogStartOffset(partitionData.logStartOffset)
+                  .setAbortedTransactions(partitionData.abortedTransactions)
+                  .setRecords(new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+                  .setPreferredReadReplica(partitionData.preferredReadReplica())
               } catch {
                 case e: UnsupportedCompressionTypeException =>
                   trace("Received unsupported compression type error during 
down-conversion", e)
-                  errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+                  FetchResponse.partitionResponse(tp.partition, 
Errors.UNSUPPORTED_COMPRESSION_TYPE)
               }
             }
           case None =>
-            val error = maybeDownConvertStorageError(partitionData.error)
-            new FetchResponse.PartitionData[BaseRecords](error,
-              partitionData.highWatermark,
-              partitionData.lastStableOffset,
-              partitionData.logStartOffset,
-              partitionData.preferredReadReplica,
-              partitionData.abortedTransactions,
-              partitionData.divergingEpoch,
-              unconvertedRecords)
+            new FetchResponseData.PartitionData()
+              .setPartitionIndex(tp.partition)
+              .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+              .setHighWatermark(partitionData.highWatermark)
+              .setLastStableOffset(partitionData.lastStableOffset)
+              .setLogStartOffset(partitionData.logStartOffset)
+              .setAbortedTransactions(partitionData.abortedTransactions)
+              .setRecords(unconvertedRecords)
+              .setPreferredReadReplica(partitionData.preferredReadReplica)
+              .setDivergingEpoch(partitionData.divergingEpoch)

Review comment:
       Similar question, is the copy required?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,76 +754,80 @@ class KafkaApis(val requestChannel: RequestChannel,
             // For fetch requests from clients, check if down-conversion is disabled for the particular partition
             if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
               trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
-              errorResponse(Errors.UNSUPPORTED_VERSION)
+              FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_VERSION)
             } else {
               try {
                 trace(s"Down converting records from partition $tp to message 
format version $magic for fetch request from $clientId")
                 // Because down-conversion is extremely memory intensive, we 
want to try and delay the down-conversion as much
                 // as possible. With KIP-283, we have the ability to lazily 
down-convert in a chunked manner. The lazy, chunked
                 // down-conversion always guarantees that at least one batch 
of messages is down-converted and sent out to the
                 // client.
-                val error = maybeDownConvertStorageError(partitionData.error)
-                new FetchResponse.PartitionData[BaseRecords](error, 
partitionData.highWatermark,
-                  partitionData.lastStableOffset, partitionData.logStartOffset,
-                  partitionData.preferredReadReplica, 
partitionData.abortedTransactions,
-                  new LazyDownConversionRecords(tp, unconvertedRecords, magic, 
fetchContext.getFetchOffset(tp).get, time))
+                new FetchResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+                  .setHighWatermark(partitionData.highWatermark)
+                  .setLastStableOffset(partitionData.lastStableOffset)
+                  .setLogStartOffset(partitionData.logStartOffset)
+                  .setAbortedTransactions(partitionData.abortedTransactions)
+                  .setRecords(new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+                  .setPreferredReadReplica(partitionData.preferredReadReplica())
               } catch {
                 case e: UnsupportedCompressionTypeException =>
                   trace("Received unsupported compression type error during 
down-conversion", e)
-                  errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+                  FetchResponse.partitionResponse(tp.partition, 
Errors.UNSUPPORTED_COMPRESSION_TYPE)
               }
             }
           case None =>
-            val error = maybeDownConvertStorageError(partitionData.error)
-            new FetchResponse.PartitionData[BaseRecords](error,
-              partitionData.highWatermark,
-              partitionData.lastStableOffset,
-              partitionData.logStartOffset,
-              partitionData.preferredReadReplica,
-              partitionData.abortedTransactions,
-              partitionData.divergingEpoch,
-              unconvertedRecords)
+            new FetchResponseData.PartitionData()
+              .setPartitionIndex(tp.partition)
+              .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+              .setHighWatermark(partitionData.highWatermark)
+              .setLastStableOffset(partitionData.lastStableOffset)
+              .setLogStartOffset(partitionData.logStartOffset)
+              .setAbortedTransactions(partitionData.abortedTransactions)
+              .setRecords(unconvertedRecords)
+              .setPreferredReadReplica(partitionData.preferredReadReplica)
+              .setDivergingEpoch(partitionData.divergingEpoch)
         }
       }
     }
 
     // the callback for process a fetch response, invoked before throttling
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
       val reassigningPartitions = mutable.Set[TopicPartition]()
       responsePartitionData.foreach { case (tp, data) =>
         val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
         val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-        if (data.isReassignmentFetch)
-          reassigningPartitions.add(tp)
-        val error = maybeDownConvertStorageError(data.error)
-        partitions.put(tp, new FetchResponse.PartitionData(
-          error,
-          data.highWatermark,
-          lastStableOffset,
-          data.logStartOffset,
-          data.preferredReadReplica.map(int2Integer).asJava,
-          abortedTransactions,
-          data.divergingEpoch.asJava,
-          data.records))
+        if (data.isReassignmentFetch) reassigningPartitions.add(tp)
+        partitions.put(tp, new FetchResponseData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setErrorCode(maybeDownConvertStorageError(data.error).code)
+            .setHighWatermark(data.highWatermark)
+            .setLastStableOffset(lastStableOffset)
+            .setLogStartOffset(data.logStartOffset)
+            .setAbortedTransactions(abortedTransactions)
+            .setRecords(data.records)
+            .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+            .setDivergingEpoch(data.divergingEpoch.getOrElse(new FetchResponseData.EpochEndOffset)))

Review comment:
       Similar question, is the copy required?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -304,58 +115,12 @@ public int sessionId() {
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
         updateErrorCounts(errorCounts, error());
-        responseDataMap.values().forEach(response ->
-            updateErrorCounts(errorCounts, response.error())
-        );
+        dataByTopicPartition.values().forEach(response -> updateErrorCounts(errorCounts, Errors.forCode(response.errorCode())));
         return errorCounts;
     }
 
-    public static FetchResponse<MemoryRecords> parse(ByteBuffer buffer, short version) {
-        return new FetchResponse<>(new FetchResponseData(new ByteBufferAccessor(buffer), version));
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T extends BaseRecords> LinkedHashMap<TopicPartition, PartitionData<T>> toResponseDataMap(
-            FetchResponseData message) {
-        LinkedHashMap<TopicPartition, PartitionData<T>> responseMap = new LinkedHashMap<>();
-        message.responses().forEach(topicResponse -> {
-            topicResponse.partitionResponses().forEach(partitionResponse -> {
-                TopicPartition tp = new TopicPartition(topicResponse.topic(), partitionResponse.partition());
-                PartitionData<T> partitionData = new PartitionData<>(partitionResponse);
-                responseMap.put(tp, partitionData);
-            });
-        });
-        return responseMap;
-    }
-
-    private static <T extends BaseRecords> FetchResponseData toMessage(int throttleTimeMs, Errors error,

Review comment:
       Can you clarify what you mean here? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

