dajac commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r521902936
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -194,7 +106,27 @@ private ProduceRequest build(short version, boolean validate) {
                     ProduceRequest.validateRecords(version, records);
                 }
             }
-            return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
+
+            List<ProduceRequestData.TopicProduceData> tpd = partitionRecords

Review comment:
I wonder if we could avoid all of this by having the `Sender` create `TopicProduceData` directly. It seems that the `Sender` creates `partitionRecords` right before calling the builder: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L734. So we may be able to construct the expected data structure directly there.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -323,27 +222,30 @@ public String toString(boolean verbose) {
     @Override
     public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         /* In case the producer doesn't actually want any response */
-        if (acks == 0)
-            return null;
-
+        if (acks == 0) return null;
         Errors error = Errors.forException(e);
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
-        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(error);
-
-        for (TopicPartition tp : partitions())
-            responseMap.put(tp, partitionResponse);
-
-        return new ProduceResponse(responseMap, throttleTimeMs);
+        return new ProduceResponse(new ProduceResponseData()
+            .setResponses(partitionSizes().keySet().stream().collect(Collectors.groupingBy(TopicPartition::topic)).entrySet()
+                .stream()
+                .map(entry -> new ProduceResponseData.TopicProduceResponse()
+                    .setPartitionResponses(entry.getValue().stream().map(p -> new ProduceResponseData.PartitionProduceResponse()
+                        .setIndex(p.partition())
+                        .setRecordErrors(Collections.emptyList())
+                        .setBaseOffset(INVALID_OFFSET)
+                        .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                        .setLogStartOffset(INVALID_OFFSET)
+                        .setErrorMessage(e.getMessage())
+                        .setErrorCode(error.code()))
+                        .collect(Collectors.toList()))
+                    .setName(entry.getKey()))
+                .collect(Collectors.toList()))
+            .setThrottleTimeMs(throttleTimeMs));

Review comment:
It seems that we could create `ProduceResponseData` directly from `data`. This avoids the cost of the group-by operation and the cost of constructing `partitionSizes`. That should bring the benchmark back in line with what we had before. Would this work?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -204,118 +75,79 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(new ProduceResponseData()
+            .setResponses(responses.entrySet()

Review comment:
As we care about performance here, I wonder if we should avoid the stream API. Another trick would be to turn `TopicProduceResponse` in the `ProduceResponse` schema into a map by setting `"mapKey": true` for the topic name. This would allow us to iterate over `responses`, get or create the `TopicProduceResponse` for each topic, and add the `PartitionProduceResponse` to it. It may be worth trying different implementations to compare their performance.
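The comment on the `ProduceResponse` constructor suggests a single loop that gets or creates the entry for each topic, instead of using streams. Below is a minimal sketch of that loop. It makes two assumptions:

- `TopicPartition`, `PartitionResponse`, `TopicProduceResponse` and `PartitionProduceResponse` are simplified, hypothetical local stand-ins, not Kafka's classes.
- A `LinkedHashMap` keyed by topic name approximates the keyed collection that `"mapKey": true` would give the generated code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types: the real code would use TopicPartition,
// ProduceResponse.PartitionResponse and the generated ProduceResponseData classes.
final class TopicGroupingSketch {

    record TopicPartition(String topic, int partition) { }

    // Input value, a simplified version of what the old Map<TopicPartition, PartitionResponse> carried.
    record PartitionResponse(short errorCode, long baseOffset) { }

    // Output element for one partition (simplified PartitionProduceResponse).
    record PartitionProduceResponse(int index, short errorCode, long baseOffset) { }

    // Output element for one topic (simplified TopicProduceResponse).
    static final class TopicProduceResponse {
        final String name;
        final List<PartitionProduceResponse> partitionResponses = new ArrayList<>();

        TopicProduceResponse(String name) {
            this.name = name;
        }
    }

    // Single pass, no streams: get or create the topic entry, then append the partition.
    // The LinkedHashMap keyed by topic name stands in for a keyed ("mapKey") collection
    // and keeps topics in first-seen order.
    static List<TopicProduceResponse> groupByTopic(Map<TopicPartition, PartitionResponse> responses) {
        Map<String, TopicProduceResponse> byTopic = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, PartitionResponse> entry : responses.entrySet()) {
            TopicPartition tp = entry.getKey();
            PartitionResponse response = entry.getValue();
            TopicProduceResponse topic = byTopic.computeIfAbsent(tp.topic(), TopicProduceResponse::new);
            topic.partitionResponses.add(
                new PartitionProduceResponse(tp.partition(), response.errorCode(), response.baseOffset()));
        }
        return new ArrayList<>(byTopic.values());
    }
}
```

With the schema change, the lookup would presumably go through the generated keyed collection rather than a separate map. Whether that variant, this one, or the stream version is fastest is what the comment proposes to benchmark.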
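The comment on `getErrorResponse` suggests deriving the error response from `data`. Below is a minimal sketch of that idea, assuming simplified, hypothetical stand-in types (`TopicData`, `PartitionData`, `TopicResult`, `PartitionResult`) in place of the generated `ProduceRequestData`/`ProduceResponseData` classes. Because the request is already grouped by topic, the response can mirror it one-to-one, with no group-by and no intermediate `partitionSizes()` map.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the request and response message classes.
final class ErrorResponseSketch {

    record PartitionData(int index) { }
    record TopicData(String name, List<PartitionData> partitions) { }

    record PartitionResult(int index, short errorCode, String errorMessage) { }
    record TopicResult(String name, List<PartitionResult> partitions) { }

    // Walk the request's own topic -> partitions structure and emit one error entry per partition.
    static List<TopicResult> errorResponse(List<TopicData> request, short errorCode, String message) {
        List<TopicResult> topics = new ArrayList<>(request.size());
        for (TopicData topic : request) {
            // Pre-size each list: the request already tells us how many entries there will be.
            List<PartitionResult> partitions = new ArrayList<>(topic.partitions().size());
            for (PartitionData partition : topic.partitions()) {
                partitions.add(new PartitionResult(partition.index(), errorCode, message));
            }
            topics.add(new TopicResult(topic.name(), partitions));
        }
        return topics;
    }
}
```

Only the partition index, error code and message are modeled. The diff also sets the base offset, log append time, log start offset and record errors, and those would be filled in the same inner loop.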