hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r518984064



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -204,118 +75,78 @@ public ProduceResponse(Map<TopicPartition, 
PartitionResponse> responses) {
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, 
int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(new ProduceResponseData()
+            .setResponses(responses.entrySet()
+                .stream()
+                .collect(Collectors.groupingBy(e -> e.getKey().topic()))
+                .entrySet()
+                .stream()
+                .map(topicData -> new 
ProduceResponseData.TopicProduceResponse()

Review comment:
       Not required, but this would be easier to follow if we had some 
helpers.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -560,13 +561,23 @@ private void handleProduceResponse(ClientResponse 
response, Map<TopicPartition,
             log.trace("Received produce response from node {} with correlation 
id {}", response.destination(), correlationId);
             // if we have a response, parse it
             if (response.hasResponse()) {
+                // TODO: Sender should exercise PartitionProduceResponse 
rather than ProduceResponse.PartitionResponse

Review comment:
       Is the plan to save this for a follow-up? It looks like it will be a bit 
of effort to track down all the uses, but it seems doable.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -204,118 +75,78 @@ public ProduceResponse(Map<TopicPartition, 
PartitionResponse> responses) {
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, 
int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(new ProduceResponseData()
+            .setResponses(responses.entrySet()
+                .stream()
+                .collect(Collectors.groupingBy(e -> e.getKey().topic()))
+                .entrySet()
+                .stream()
+                .map(topicData -> new 
ProduceResponseData.TopicProduceResponse()
+                    .setTopic(topicData.getKey())
+                    .setPartitionResponses(topicData.getValue()
+                        .stream()
+                        .map(p -> new 
ProduceResponseData.PartitionProduceResponse()
+                            .setPartition(p.getKey().partition())
+                            .setBaseOffset(p.getValue().baseOffset)
+                            .setLogStartOffset(p.getValue().logStartOffset)
+                            .setLogAppendTime(p.getValue().logAppendTime)
+                            .setErrorMessage(p.getValue().errorMessage)
+                            .setErrorCode(p.getValue().error.code())
+                            .setRecordErrors(p.getValue().recordErrors
+                                .stream()
+                                .map(e -> new 
ProduceResponseData.BatchIndexAndErrorMessage()
+                                    .setBatchIndex(e.batchIndex)
+                                    .setBatchIndexErrorMessage(e.message))
+                                .collect(Collectors.toList())))
+                        .collect(Collectors.toList())))
+                .collect(Collectors.toList()))
+            .setThrottleTimeMs(throttleTimeMs));
     }
 
     /**
-     * Constructor from a {@link Struct}.
+     * Visible for testing.
      */
-    public ProduceResponse(Struct struct) {
-        responses = new HashMap<>();
-        for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.get(TOPIC_NAME);
-
-            for (Object partResponse : 
topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
-                long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-                long logAppendTime = 
partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
-                        partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : 
RecordBatch.NO_TIMESTAMP;
-                long logStartOffset = 
partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
-
-                List<RecordError> recordErrors = Collections.emptyList();
-                if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    Object[] recordErrorsArray = 
partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
-                    if (recordErrorsArray.length > 0) {
-                        recordErrors = new 
ArrayList<>(recordErrorsArray.length);
-                        for (Object indexAndMessage : recordErrorsArray) {
-                            Struct indexAndMessageStruct = (Struct) 
indexAndMessage;
-                            recordErrors.add(new RecordError(
-                                    
indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
-                                    
indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
-                            ));
-                        }
-                    }
-                }
-
-                String errorMessage = 
partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
-                TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, 
logAppendTime, logStartOffset, recordErrors, errorMessage));
-            }
-        }
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, 
DEFAULT_THROTTLE_TIME);
-    }
-
     @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version));
-
-        Map<String, Map<Integer, PartitionResponse>> responseByTopic = 
CollectionUtils.groupPartitionDataByTopic(responses);
-        List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
-        for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : 
responseByTopic.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionResponse> partitionEntry : 
entry.getValue().entrySet()) {
-                PartitionResponse part = partitionEntry.getValue();
-                short errorCode = part.error.code();
-                // If producer sends ProduceRequest V3 or earlier, the client 
library is not guaranteed to recognize the error code
-                // for KafkaStorageException. In this case the client library 
will translate KafkaStorageException to
-                // UnknownServerException which is not retriable. We can 
ensure that producer will update metadata and retry
-                // by converting the KafkaStorageException to 
NotLeaderOrFollowerException in the response if ProduceRequest version <= 3
-                if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version 
<= 3)
-                    errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
-                Struct partStruct = 
topicData.instance(PARTITION_RESPONSES_KEY_NAME)
-                        .set(PARTITION_ID, partitionEntry.getKey())
-                        .set(ERROR_CODE, errorCode)
-                        .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
-                partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, 
part.logAppendTime);
-                partStruct.setIfExists(LOG_START_OFFSET_FIELD, 
part.logStartOffset);
-
-                if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    List<Struct> recordErrors = Collections.emptyList();
-                    if (!part.recordErrors.isEmpty()) {
-                        recordErrors = new ArrayList<>();
-                        for (RecordError indexAndMessage : part.recordErrors) {
-                            Struct indexAndMessageStruct = 
partStruct.instance(RECORD_ERRORS_KEY_NAME)
-                                    .set(BATCH_INDEX_KEY_NAME, 
indexAndMessage.batchIndex)
-                                    .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, 
indexAndMessage.message);
-                            recordErrors.add(indexAndMessageStruct);
-                        }
-                    }
-                    partStruct.set(RECORD_ERRORS_KEY_NAME, 
recordErrors.toArray());
-                }
-
-                partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
-                partitionArray.add(partStruct);
-            }
-            topicData.set(PARTITION_RESPONSES_KEY_NAME, 
partitionArray.toArray());
-            topicDatas.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+    public Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
 
-        return struct;
+    public ProduceResponseData data() {
+        return this.data;
     }
 
+    /**
+     * this method is used by testing only.
+     * TODO: refactor the tests which are using this method and then remove 
this method from production code.

Review comment:
       Sounds good to refactor. Perhaps we can turn this TODO into a jira?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -517,19 +517,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       // Note that authorization to a transactionalId implies ProducerId 
authorization
 
-    } else if (produceRequest.hasIdempotentRecords && 
!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (RequestUtils.hasIdempotentRecords(produceRequest) && 
!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
-    val produceRecords = produceRequest.partitionRecordsOrFail.asScala
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
-    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, 
produceRecords)(_._1.topic)
-
-    for ((topicPartition, memoryRecords) <- produceRecords) {
+    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, 
produceRequest.dataOrException().topicData().asScala)(_.topic())
+
+    produceRequest.dataOrException().topicData().forEach(topic => 
topic.data().forEach { partition =>
+      val topicPartition = new TopicPartition(topic.topic(), 
partition.partition())
+      // This caller assumes the type is MemoryRecords and that is true on 
current serialization
+      // We cast the type to avoid causing big change to code base.
+      // TODO: maybe we need to refactor code to avoid this casting

Review comment:
       That's a good question. I can't think of any great options. We ended up 
making `FetchResponse` generic to address a similar issue. I think the cast is 
reasonable for now. Can we move the TODO to a jira?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -194,7 +107,27 @@ private ProduceRequest build(short version, boolean 
validate) {
                     ProduceRequest.validateRecords(version, records);
                 }
             }
-            return new ProduceRequest(version, acks, timeout, 
partitionRecords, transactionalId);
+
+            List<ProduceRequestData.TopicProduceData> tpd = partitionRecords

Review comment:
       I wonder if we would get any benefit from computing `partitionSizes` 
during this pass.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -517,19 +517,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       // Note that authorization to a transactionalId implies ProducerId 
authorization
 
-    } else if (produceRequest.hasIdempotentRecords && 
!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (RequestUtils.hasIdempotentRecords(produceRequest) && 
!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
-    val produceRecords = produceRequest.partitionRecordsOrFail.asScala
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
-    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, 
produceRecords)(_._1.topic)
-
-    for ((topicPartition, memoryRecords) <- produceRecords) {
+    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, 
produceRequest.dataOrException().topicData().asScala)(_.topic())
+
+    produceRequest.dataOrException().topicData().forEach(topic => 
topic.data().forEach { partition =>
+      val topicPartition = new TopicPartition(topic.topic(), 
partition.partition())

Review comment:
       nit: unnecessary parentheses (a few of these around here)

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -517,19 +517,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       // Note that authorization to a transactionalId implies ProducerId 
authorization
 
-    } else if (produceRequest.hasIdempotentRecords && 
!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (RequestUtils.hasIdempotentRecords(produceRequest) && 
!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
-    val produceRecords = produceRequest.partitionRecordsOrFail.asScala
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
-    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, 
produceRecords)(_._1.topic)
-
-    for ((topicPartition, memoryRecords) <- produceRecords) {
+    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, 
produceRequest.dataOrException().topicData().asScala)(_.topic())

Review comment:
       This logic surprised me a little bit until I realized that we were 
trying to avoid redundant authorization calls. Might be worth adding a comment 
since I was almost ready to suggest moving this logic into the loop.

##########
File path: clients/src/main/resources/common/message/ProduceRequest.json
##########
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "3+", 
"nullableVersions": "0+", "entityType": "transactionalId",
+    { "name": "TransactionalId", "type": "string", "versions": "3+", 
"nullableVersions": "3+", "ignorable": true, "entityType": "transactionalId",
       "about": "The transactional ID, or null if the producer is not 
transactional." },
     { "name": "Acks", "type": "int16", "versions": "0+",
       "about": "The number of acknowledgments the producer requires the leader 
to have received before considering a request complete. Allowed values: 0 for 
no acknowledgments, 1 for only the leader and -1 for the full ISR." },
-    { "name": "TimeoutMs", "type": "int32", "versions": "0+",
+    { "name": "Timeout", "type": "int32", "versions": "0+",

Review comment:
       The names do not get serialized, so I think we can make them whatever we 
want.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to