hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453095731



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> 
responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.fetchResponseData = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);

Review comment:
       Probably better to save for a follow-up, but potentially we can get rid 
of this conversion by using `FetchablePartitionResponse` directly in the 
broker. 

##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -49,41 +49,41 @@
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
-    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
+    { "name": "MaxWaitTime", "type": "int32", "versions": "0+",

Review comment:
       Can we revert some of these renamings? We intentionally changed them in 
#8802. 

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -209,30 +210,21 @@
             FORGOTTEN_TOPIC_DATA_V7,
             RACK_ID);

Review comment:
       Similarly, we can get rid of all this.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> 
responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.fetchResponseData = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> 
responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : 
topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = 
partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = 
Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = 
partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = 
partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, 
INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = 
partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, 
INVALID_PREFERRED_REPLICA_ID)
-                
).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type 
found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if 
(partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = 
partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new 
ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : 
abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) 
abortedTransactionObj;
-                            long producerId = 
abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = 
abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new 
AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new 
PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, 
abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), 
partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, 
(short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), 
struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.fetchResponseData = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+        return fetchResponseData.toStruct(version);
     }
 
     @Override
     protected Send toSend(String dest, ResponseHeader responseHeader, short 
apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        RecordsWriter writer = new RecordsWriter(dest, sends::add);
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        fetchResponseData.size(cache, apiVersion);
+        fetchResponseData.write(writer, cache, apiVersion);
+        writer.flush();
+
+        // Compute the total size of all the Sends and write it out along with 
the header in the first Send
+        ResponseHeaderData responseHeaderData = responseHeader.data();
+
+        //Struct responseHeaderStruct = responseHeader.toStruct();

Review comment:
       nit: I guess we didn't need this?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -218,23 +220,19 @@
             SESSION_ID,
             new Field(RESPONSES_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_TOPIC_V6)));
 
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, 
FETCH_RESPONSE_V2,
-            FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, 
FETCH_RESPONSE_V6,
-            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, 
FETCH_RESPONSE_V10,
-            FETCH_RESPONSE_V11};
-    }
-
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final int sessionId;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> responseData;
+    private final FetchResponseData fetchResponseData;

Review comment:
       nit: in all of the other classes, we just use the name `data`. Can we do 
the same here?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -218,23 +220,19 @@
             SESSION_ID,
             new Field(RESPONSES_KEY_NAME, new 
ArrayOf(FETCH_RESPONSE_TOPIC_V6)));

Review comment:
       We can get rid of all the stuff above too, right?

##########
File path: 
generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return 
false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            // TODO is this valid for record instances?

Review comment:
       I don't think `FileRecords` and `MemoryRecords` instances can be 
compared directly, if that's what the question is about.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read 
records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {

Review comment:
       Is it worth extending `ByteBufferAccessor` or not?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> 
responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.fetchResponseData = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> 
responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : 
topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = 
partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = 
Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = 
partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = 
partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, 
INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = 
partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, 
INVALID_PREFERRED_REPLICA_ID)
-                
).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type 
found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if 
(partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = 
partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new 
ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : 
abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) 
abortedTransactionObj;
-                            long producerId = 
abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = 
abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new 
AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new 
PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, 
abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), 
partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, 
(short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), 
struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.fetchResponseData = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+        return fetchResponseData.toStruct(version);
     }
 
     @Override
     protected Send toSend(String dest, ResponseHeader responseHeader, short 
apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        RecordsWriter writer = new RecordsWriter(dest, sends::add);

Review comment:
       Pretty nice if this is all the manual code we need. If we wanted to go a 
little further, we could push `toSend` into the generated class as well. That 
will be necessary if we ever want to get rid of the current `AbstractRequest` and 
`AbstractResponse` types and replace them with the generated data classes 
(which was always the plan). However, I think this could be left for follow-up 
work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to