chia7712 commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r519251131
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ########## @@ -204,118 +75,78 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) { * @param throttleTimeMs Time in milliseconds the response was throttled */ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) { - this.responses = responses; - this.throttleTimeMs = throttleTimeMs; + this(new ProduceResponseData() + .setResponses(responses.entrySet() + .stream() + .collect(Collectors.groupingBy(e -> e.getKey().topic())) + .entrySet() + .stream() + .map(topicData -> new ProduceResponseData.TopicProduceResponse() + .setTopic(topicData.getKey()) + .setPartitionResponses(topicData.getValue() + .stream() + .map(p -> new ProduceResponseData.PartitionProduceResponse() + .setPartition(p.getKey().partition()) + .setBaseOffset(p.getValue().baseOffset) + .setLogStartOffset(p.getValue().logStartOffset) + .setLogAppendTime(p.getValue().logAppendTime) + .setErrorMessage(p.getValue().errorMessage) + .setErrorCode(p.getValue().error.code()) + .setRecordErrors(p.getValue().recordErrors + .stream() + .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(e.batchIndex) + .setBatchIndexErrorMessage(e.message)) + .collect(Collectors.toList()))) + .collect(Collectors.toList()))) + .collect(Collectors.toList())) + .setThrottleTimeMs(throttleTimeMs)); } /** - * Constructor from a {@link Struct}. + * Visible for testing. */ - public ProduceResponse(Struct struct) { - responses = new HashMap<>(); - for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicRespStruct = (Struct) topicResponse; - String topic = topicRespStruct.get(TOPIC_NAME); - - for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) { - Struct partRespStruct = (Struct) partResponse; - int partition = partRespStruct.get(PARTITION_ID); - Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE)); - long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); - long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ? - partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP; - long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); - - List<RecordError> recordErrors = Collections.emptyList(); - if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) { - Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME); - if (recordErrorsArray.length > 0) { - recordErrors = new ArrayList<>(recordErrorsArray.length); - for (Object indexAndMessage : recordErrorsArray) { - Struct indexAndMessageStruct = (Struct) indexAndMessage; - recordErrors.add(new RecordError( - indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME), - indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD) - )); - } - } - } - - String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null); - TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage)); - } - } - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); - } - @Override - protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version)); - - Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupPartitionDataByTopic(responses); - List<Struct> topicDatas = new ArrayList<>(responseByTopic.size()); - for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) { - Struct topicData = struct.instance(RESPONSES_KEY_NAME); - topicData.set(TOPIC_NAME, entry.getKey()); - List<Struct> partitionArray = new ArrayList<>(); - for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) { - PartitionResponse part = partitionEntry.getValue(); - short errorCode = part.error.code(); - // If producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code - // for KafkaStorageException. In this case the client library will translate KafkaStorageException to - // UnknownServerException which is not retriable. We can ensure that producer will update metadata and retry - // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if ProduceRequest version <= 3 - if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3) - errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code(); - Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME) - .set(PARTITION_ID, partitionEntry.getKey()) - .set(ERROR_CODE, errorCode) - .set(BASE_OFFSET_KEY_NAME, part.baseOffset); - partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); - partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); - - if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) { - List<Struct> recordErrors = Collections.emptyList(); - if (!part.recordErrors.isEmpty()) { - recordErrors = new ArrayList<>(); - for (RecordError indexAndMessage : part.recordErrors) { - Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME) - .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex) - .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message); - recordErrors.add(indexAndMessageStruct); - } - } - partStruct.set(RECORD_ERRORS_KEY_NAME, recordErrors.toArray()); - } - - partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); - partitionArray.add(partStruct); - } - topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); - topicDatas.add(topicData); - } - struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + public Struct toStruct(short version) { + return data.toStruct(version); + } - return struct; + public ProduceResponseData data() { + return this.data; } + /** + * this method is used by testing only. + * TODO: refactor the tests which are using this method and then remove this method from production code. Review comment: https://issues.apache.org/jira/browse/KAFKA-10697 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org