chia7712 commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r519251131



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -204,118 +75,78 @@ public ProduceResponse(Map<TopicPartition, 
PartitionResponse> responses) {
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, 
int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(new ProduceResponseData()
+            .setResponses(responses.entrySet()
+                .stream()
+                .collect(Collectors.groupingBy(e -> e.getKey().topic()))
+                .entrySet()
+                .stream()
+                .map(topicData -> new 
ProduceResponseData.TopicProduceResponse()
+                    .setTopic(topicData.getKey())
+                    .setPartitionResponses(topicData.getValue()
+                        .stream()
+                        .map(p -> new 
ProduceResponseData.PartitionProduceResponse()
+                            .setPartition(p.getKey().partition())
+                            .setBaseOffset(p.getValue().baseOffset)
+                            .setLogStartOffset(p.getValue().logStartOffset)
+                            .setLogAppendTime(p.getValue().logAppendTime)
+                            .setErrorMessage(p.getValue().errorMessage)
+                            .setErrorCode(p.getValue().error.code())
+                            .setRecordErrors(p.getValue().recordErrors
+                                .stream()
+                                .map(e -> new 
ProduceResponseData.BatchIndexAndErrorMessage()
+                                    .setBatchIndex(e.batchIndex)
+                                    .setBatchIndexErrorMessage(e.message))
+                                .collect(Collectors.toList())))
+                        .collect(Collectors.toList())))
+                .collect(Collectors.toList()))
+            .setThrottleTimeMs(throttleTimeMs));
     }
 
     /**
-     * Constructor from a {@link Struct}.
+     * Visible for testing.
      */
-    public ProduceResponse(Struct struct) {
-        responses = new HashMap<>();
-        for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.get(TOPIC_NAME);
-
-            for (Object partResponse : 
topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
-                long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-                long logAppendTime = 
partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
-                        partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : 
RecordBatch.NO_TIMESTAMP;
-                long logStartOffset = 
partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
-
-                List<RecordError> recordErrors = Collections.emptyList();
-                if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    Object[] recordErrorsArray = 
partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
-                    if (recordErrorsArray.length > 0) {
-                        recordErrors = new 
ArrayList<>(recordErrorsArray.length);
-                        for (Object indexAndMessage : recordErrorsArray) {
-                            Struct indexAndMessageStruct = (Struct) 
indexAndMessage;
-                            recordErrors.add(new RecordError(
-                                    
indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
-                                    
indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
-                            ));
-                        }
-                    }
-                }
-
-                String errorMessage = 
partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
-                TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, 
logAppendTime, logStartOffset, recordErrors, errorMessage));
-            }
-        }
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, 
DEFAULT_THROTTLE_TIME);
-    }
-
     @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version));
-
-        Map<String, Map<Integer, PartitionResponse>> responseByTopic = 
CollectionUtils.groupPartitionDataByTopic(responses);
-        List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
-        for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : 
responseByTopic.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionResponse> partitionEntry : 
entry.getValue().entrySet()) {
-                PartitionResponse part = partitionEntry.getValue();
-                short errorCode = part.error.code();
-                // If producer sends ProduceRequest V3 or earlier, the client 
library is not guaranteed to recognize the error code
-                // for KafkaStorageException. In this case the client library 
will translate KafkaStorageException to
-                // UnknownServerException which is not retriable. We can 
ensure that producer will update metadata and retry
-                // by converting the KafkaStorageException to 
NotLeaderOrFollowerException in the response if ProduceRequest version <= 3
-                if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version 
<= 3)
-                    errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
-                Struct partStruct = 
topicData.instance(PARTITION_RESPONSES_KEY_NAME)
-                        .set(PARTITION_ID, partitionEntry.getKey())
-                        .set(ERROR_CODE, errorCode)
-                        .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
-                partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, 
part.logAppendTime);
-                partStruct.setIfExists(LOG_START_OFFSET_FIELD, 
part.logStartOffset);
-
-                if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    List<Struct> recordErrors = Collections.emptyList();
-                    if (!part.recordErrors.isEmpty()) {
-                        recordErrors = new ArrayList<>();
-                        for (RecordError indexAndMessage : part.recordErrors) {
-                            Struct indexAndMessageStruct = 
partStruct.instance(RECORD_ERRORS_KEY_NAME)
-                                    .set(BATCH_INDEX_KEY_NAME, 
indexAndMessage.batchIndex)
-                                    .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, 
indexAndMessage.message);
-                            recordErrors.add(indexAndMessageStruct);
-                        }
-                    }
-                    partStruct.set(RECORD_ERRORS_KEY_NAME, 
recordErrors.toArray());
-                }
-
-                partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
-                partitionArray.add(partStruct);
-            }
-            topicData.set(PARTITION_RESPONSES_KEY_NAME, 
partitionArray.toArray());
-            topicDatas.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+    public Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
 
-        return struct;
+    public ProduceResponseData data() {
+        return this.data;
     }
 
+    /**
+     * This method is used for testing only.
+     * TODO: refactor the tests that use this method, then remove this method from production code.

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-10697




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to