hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r518984064

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -204,118 +75,78 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(new ProduceResponseData()
+            .setResponses(responses.entrySet()
+                .stream()
+                .collect(Collectors.groupingBy(e -> e.getKey().topic()))
+                .entrySet()
+                .stream()
+                .map(topicData -> new ProduceResponseData.TopicProduceResponse()

Review comment:
       Not required, but this would be easier to follow if we had some helpers.
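For illustration, here is one shape such helpers could take. This is a hedged sketch, not code from the PR: the method names `toTopicProduceResponse` and `toPartitionProduceResponse` are hypothetical, while the `ProduceResponseData` setters and `PartitionResponse` fields are the ones the diff above already uses.

```java
// Hypothetical helpers that would flatten the nested stream pipeline in the
// ProduceResponse constructor into two small, named conversion steps.
private static ProduceResponseData.TopicProduceResponse toTopicProduceResponse(
        String topic, List<Map.Entry<TopicPartition, PartitionResponse>> partitions) {
    return new ProduceResponseData.TopicProduceResponse()
        .setTopic(topic)
        .setPartitionResponses(partitions.stream()
            .map(entry -> toPartitionProduceResponse(entry.getKey().partition(), entry.getValue()))
            .collect(Collectors.toList()));
}

private static ProduceResponseData.PartitionProduceResponse toPartitionProduceResponse(
        int partition, PartitionResponse response) {
    return new ProduceResponseData.PartitionProduceResponse()
        .setPartition(partition)
        .setBaseOffset(response.baseOffset)
        .setLogStartOffset(response.logStartOffset)
        .setLogAppendTime(response.logAppendTime)
        .setErrorMessage(response.errorMessage)
        .setErrorCode(response.error.code())
        .setRecordErrors(response.recordErrors.stream()
            .map(err -> new ProduceResponseData.BatchIndexAndErrorMessage()
                .setBatchIndex(err.batchIndex)
                .setBatchIndexErrorMessage(err.message))
            .collect(Collectors.toList()));
}
```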
##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -560,13 +561,23 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
         log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
         // if we have a response, parse it
         if (response.hasResponse()) {
+            // TODO: Sender should exercise PartitionProduceResponse rather than ProduceResponse.PartitionResponse

Review comment:
       Is the plan to save this for a follow-up? It looks like it will be a bit of effort to track down all the uses, but seems doable.
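As a rough sketch of what that follow-up could look like — hedged, since it assumes the generated getters (`responses()`, `partitionResponses()`, `errorCode()`, and so on) that mirror the setters used elsewhere in this PR:

```java
// Hypothetical follow-up: walk the generated ProduceResponseData directly
// instead of first converting everything to ProduceResponse.PartitionResponse.
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
produceResponse.data().responses().forEach(topicResponse ->
    topicResponse.partitionResponses().forEach(partitionResponse -> {
        TopicPartition tp = new TopicPartition(topicResponse.topic(), partitionResponse.partition());
        Errors error = Errors.forCode(partitionResponse.errorCode());
        // completeBatch(...) and friends would then accept the generated
        // PartitionProduceResponse type rather than PartitionResponse.
    }));
```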
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -204,118 +75,78 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(new ProduceResponseData()
+            .setResponses(responses.entrySet()
+                .stream()
+                .collect(Collectors.groupingBy(e -> e.getKey().topic()))
+                .entrySet()
+                .stream()
+                .map(topicData -> new ProduceResponseData.TopicProduceResponse()
+                    .setTopic(topicData.getKey())
+                    .setPartitionResponses(topicData.getValue()
+                        .stream()
+                        .map(p -> new ProduceResponseData.PartitionProduceResponse()
+                            .setPartition(p.getKey().partition())
+                            .setBaseOffset(p.getValue().baseOffset)
+                            .setLogStartOffset(p.getValue().logStartOffset)
+                            .setLogAppendTime(p.getValue().logAppendTime)
+                            .setErrorMessage(p.getValue().errorMessage)
+                            .setErrorCode(p.getValue().error.code())
+                            .setRecordErrors(p.getValue().recordErrors
+                                .stream()
+                                .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage()
+                                    .setBatchIndex(e.batchIndex)
+                                    .setBatchIndexErrorMessage(e.message))
+                                .collect(Collectors.toList())))
+                        .collect(Collectors.toList())))
+                .collect(Collectors.toList()))
+            .setThrottleTimeMs(throttleTimeMs));
     }
 
     /**
-     * Constructor from a {@link Struct}.
+     * Visible for testing.
      */
-    public ProduceResponse(Struct struct) {
-        responses = new HashMap<>();
-        for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.get(TOPIC_NAME);
-
-            for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
-                long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-                long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
-                        partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP;
-                long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
-
-                List<RecordError> recordErrors = Collections.emptyList();
-                if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
-                    if (recordErrorsArray.length > 0) {
-                        recordErrors = new ArrayList<>(recordErrorsArray.length);
-                        for (Object indexAndMessage : recordErrorsArray) {
-                            Struct indexAndMessageStruct = (Struct) indexAndMessage;
-                            recordErrors.add(new RecordError(
-                                    indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
-                                    indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
-                            ));
-                        }
-                    }
-                }
-
-                String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
-                TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage));
-            }
-        }
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-    }
 
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version));
-
-        Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupPartitionDataByTopic(responses);
-        List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
-        for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
-                PartitionResponse part = partitionEntry.getValue();
-                short errorCode = part.error.code();
-                // If producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code
-                // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
-                // UnknownServerException which is not retriable. We can ensure that producer will update metadata and retry
-                // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if ProduceRequest version <= 3
-                if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3)
-                    errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
-                Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
-                        .set(PARTITION_ID, partitionEntry.getKey())
-                        .set(ERROR_CODE, errorCode)
-                        .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
-                partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
-                partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);
-
-                if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    List<Struct> recordErrors = Collections.emptyList();
-                    if (!part.recordErrors.isEmpty()) {
-                        recordErrors = new ArrayList<>();
-                        for (RecordError indexAndMessage : part.recordErrors) {
-                            Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME)
-                                    .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex)
-                                    .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message);
-                            recordErrors.add(indexAndMessageStruct);
-                        }
-                    }
-                    partStruct.set(RECORD_ERRORS_KEY_NAME, recordErrors.toArray());
-                }
-
-                partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
-                partitionArray.add(partStruct);
-            }
-            topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
-            topicDatas.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
+    public Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
 
-        return struct;
+    public ProduceResponseData data() {
+        return this.data;
     }
 
+    /**
+     * this method is used by testing only.
+     * TODO: refactor the tests which are using this method and then remove this method from production code.

Review comment:
       Sounds good to refactor. Perhaps we can turn this TODO into a jira?


##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -517,19 +517,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     // Note that authorization to a transactionalId implies ProducerId authorization
-    } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (RequestUtils.hasIdempotentRecords(produceRequest) && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
-    val produceRecords = produceRequest.partitionRecordsOrFail.asScala
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
-    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRecords)(_._1.topic)
-
-    for ((topicPartition, memoryRecords) <- produceRecords) {
+    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRequest.dataOrException().topicData().asScala)(_.topic())
+
+    produceRequest.dataOrException().topicData().forEach(topic => topic.data().forEach { partition =>
+      val topicPartition = new TopicPartition(topic.topic(), partition.partition())
+      // This caller assumes the type is MemoryRecords and that is true on current serialization
+      // We cast the type to avoid causing big change to code base.
+      // TODO: maybe we need to refactor code to avoid this casting

Review comment:
       That's a good question. I can't think of any great options. We ended up making `FetchResponse` generic to address a similar issue. I think the cast is reasonable for now. Can we move the TODO to a jira?
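To make the alternative concrete, here is an illustrative miniature of the `FetchResponse`-style approach the reviewer mentions — not what this PR does: carry the record type in a generic parameter so broker-side callers get `MemoryRecords` statically instead of casting at each use site. The container class name is hypothetical; `BaseRecords` and `MemoryRecords` are the real kafka-clients types.

```java
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;

// Hypothetical container parameterized over the record type, mirroring
// FetchResponse<T extends BaseRecords>.
public class TypedPartitionData<T extends BaseRecords> {
    private final int partition;
    private final T records;

    public TypedPartitionData(int partition, T records) {
        this.partition = partition;
        this.records = records;
    }

    public int partition() {
        return partition;
    }

    public T records() {
        return records; // typed accessor: no asInstanceOf needed at call sites
    }

    public static void main(String[] args) {
        // A caller that constructs the data with MemoryRecords keeps the type:
        TypedPartitionData<MemoryRecords> data =
            new TypedPartitionData<>(0, MemoryRecords.EMPTY);
        MemoryRecords records = data.records(); // no cast
        System.out.println(records.sizeInBytes());
    }
}
```

The trade-off, presumably why the PR keeps the cast, is that the generic parameter would have to thread through `ProduceRequest` and every call site, much as it did for `FetchResponse`.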
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -194,7 +107,27 @@ private ProduceRequest build(short version, boolean validate) {
                 ProduceRequest.validateRecords(version, records);
             }
         }
-        return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
+
+        List<ProduceRequestData.TopicProduceData> tpd = partitionRecords

Review comment:
       I wonder if we would get any benefit computing `partitionSizes` during this pass.
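A hedged sketch of that idea, as a method that could live in `ProduceRequest.Builder` — not the PR's code. The request-side partition class name and its setters are assumed to mirror the response-side naming shown earlier in this review; `sizeInBytes()` is the standard size measure on `MemoryRecords`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.record.MemoryRecords;

// Illustrative only: regroup partitionRecords by topic and record each
// partition's size in the same traversal, so the map is only walked once.
static List<ProduceRequestData.TopicProduceData> toTopicData(
        Map<TopicPartition, MemoryRecords> partitionRecords,
        Map<TopicPartition, Integer> partitionSizes) {
    Map<String, ProduceRequestData.TopicProduceData> byTopic = new HashMap<>();
    partitionRecords.forEach((tp, records) -> {
        partitionSizes.put(tp, records.sizeInBytes()); // size captured in this pass
        byTopic.computeIfAbsent(tp.topic(),
                topic -> new ProduceRequestData.TopicProduceData().setTopic(topic))
            .data() // assumed accessor for the partition list, as used in KafkaApis above
            .add(new ProduceRequestData.PartitionProduceData() // assumed class name
                .setPartition(tp.partition())
                .setRecords(records));
    });
    return new ArrayList<>(byTopic.values());
}
```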
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -517,19 +517,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
    // Note that authorization to a transactionalId implies ProducerId authorization
-    } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (RequestUtils.hasIdempotentRecords(produceRequest) && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
-    val produceRecords = produceRequest.partitionRecordsOrFail.asScala
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
-    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRecords)(_._1.topic)
-
-    for ((topicPartition, memoryRecords) <- produceRecords) {
+    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRequest.dataOrException().topicData().asScala)(_.topic())
+
+    produceRequest.dataOrException().topicData().forEach(topic => topic.data().forEach { partition =>
+      val topicPartition = new TopicPartition(topic.topic(), partition.partition())

Review comment:
       nit: unnecessary parentheses (a few of these around here)


##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -517,19 +517,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
    // Note that authorization to a transactionalId implies ProducerId authorization
-    } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
+    } else if (RequestUtils.hasIdempotentRecords(produceRequest) && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
       sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
 
-    val produceRecords = produceRequest.partitionRecordsOrFail.asScala
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
-    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRecords)(_._1.topic)
-
-    for ((topicPartition, memoryRecords) <- produceRecords) {
+    val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRequest.dataOrException().topicData().asScala)(_.topic())

Review comment:
       This logic surprised me a little bit until I realized that we were trying to avoid redundant authorization calls. Might be worth adding a comment since I was almost ready to suggest moving this logic into the loop.


##########
File path: clients/src/main/resources/common/message/ProduceRequest.json
##########
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
+    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "entityType": "transactionalId",
       "about": "The transactional ID, or null if the producer is not transactional." },
     { "name": "Acks", "type": "int16", "versions": "0+",
       "about": "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR." },
-    { "name": "TimeoutMs", "type": "int32", "versions": "0+",
+    { "name": "Timeout", "type": "int32", "versions": "0+",

Review comment:
       The names do not get serialized, so I think we can make them whatever we want.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org