dajac commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r521902936



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -194,7 +106,27 @@ private ProduceRequest build(short version, boolean validate) {
                     ProduceRequest.validateRecords(version, records);
                 }
             }
-            return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
+
+            List<ProduceRequestData.TopicProduceData> tpd = partitionRecords

Review comment:
       I wonder if we could avoid all of this by having the `Sender` create 
`TopicProduceData` directly. It seems that the `Sender` creates 
`partitionRecords` right before calling the builder: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L734.
 So we may be able to construct the expected data structure directly there.
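
   To illustrate the idea, here is a minimal sketch of grouping batches per topic in a single pass, the way the `Sender` could do it before calling the builder. The classes `Batch`, `PartitionData` and `TopicData` and the method `groupByTopic` are hypothetical stand-ins, not the generated `ProduceRequestData` classes or actual `Sender` code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SenderGroupingSketch {
    // Hypothetical stand-in for a batch the sender is about to send.
    static final class Batch {
        final String topic;
        final int partition;
        final byte[] records;
        Batch(String topic, int partition, byte[] records) {
            this.topic = topic;
            this.partition = partition;
            this.records = records;
        }
    }

    // Hypothetical stand-in for the per-partition entry of the request data.
    static final class PartitionData {
        final int index;
        final byte[] records;
        PartitionData(int index, byte[] records) {
            this.index = index;
            this.records = records;
        }
    }

    // Hypothetical stand-in for the per-topic entry of the request data.
    static final class TopicData {
        final String name;
        final List<PartitionData> partitions = new ArrayList<>();
        TopicData(String name) {
            this.name = name;
        }
    }

    // Single pass over the batches: get or create the topic entry, then
    // append the partition to it. No intermediate per-partition map is built.
    static List<TopicData> groupByTopic(List<Batch> batches) {
        Map<String, TopicData> byTopic = new LinkedHashMap<>();
        for (Batch b : batches) {
            byTopic.computeIfAbsent(b.topic, TopicData::new)
                   .partitions.add(new PartitionData(b.partition, b.records));
        }
        return new ArrayList<>(byTopic.values());
    }
}
```

   With something along these lines, the builder would receive the per-topic structure as is and would not need to regroup `partitionRecords`.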

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -323,27 +222,30 @@ public String toString(boolean verbose) {
     @Override
     public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         /* In case the producer doesn't actually want any response */
-        if (acks == 0)
-            return null;
-
+        if (acks == 0) return null;
         Errors error = Errors.forException(e);
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
-        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(error);
-
-        for (TopicPartition tp : partitions())
-            responseMap.put(tp, partitionResponse);
-
-        return new ProduceResponse(responseMap, throttleTimeMs);
+        return new ProduceResponse(new ProduceResponseData()
+            .setResponses(partitionSizes().keySet().stream().collect(Collectors.groupingBy(TopicPartition::topic)).entrySet()
+                .stream()
+                .map(entry -> new ProduceResponseData.TopicProduceResponse()
+                    .setPartitionResponses(entry.getValue().stream().map(p -> new ProduceResponseData.PartitionProduceResponse()
+                        .setIndex(p.partition())
+                        .setRecordErrors(Collections.emptyList())
+                        .setBaseOffset(INVALID_OFFSET)
+                        .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
+                        .setLogStartOffset(INVALID_OFFSET)
+                        .setErrorMessage(e.getMessage())
+                        .setErrorCode(error.code()))
+                        .collect(Collectors.toList()))
+                    .setName(entry.getKey()))
+                .collect(Collectors.toList()))
+            .setThrottleTimeMs(throttleTimeMs));

Review comment:
       It seems that we could create `ProduceResponseData` directly from `data`. 
This avoids the cost of the group-by operation and the cost of constructing 
`partitionSizes`. That should bring the benchmark back in line with what we had 
before. Would this work?
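
   A rough sketch of what I have in mind, assuming the request data already holds topics with their partitions. `TopicRequest`, `PartitionError`, `TopicResponse` and `errorResponses` are hypothetical stand-ins for illustration, not the generated classes. Because the input is already grouped by topic, two nested loops are enough and no group-by is needed.

```java
import java.util.ArrayList;
import java.util.List;

class ErrorResponseSketch {
    // Hypothetical stand-in for one topic entry of the request's `data`.
    static final class TopicRequest {
        final String name;
        final List<Integer> partitionIndexes;
        TopicRequest(String name, List<Integer> partitionIndexes) {
            this.name = name;
            this.partitionIndexes = partitionIndexes;
        }
    }

    // Hypothetical stand-in for a per-partition error entry in the response.
    static final class PartitionError {
        final int index;
        final short errorCode;
        final String errorMessage;
        PartitionError(int index, short errorCode, String errorMessage) {
            this.index = index;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    // Hypothetical stand-in for a per-topic entry in the response.
    static final class TopicResponse {
        final String name;
        final List<PartitionError> partitions;
        TopicResponse(String name, List<PartitionError> partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    // Mirror the request's topic/partition structure into the response,
    // attaching the same error code and message to every partition.
    static List<TopicResponse> errorResponses(List<TopicRequest> topics,
                                              short errorCode,
                                              String errorMessage) {
        List<TopicResponse> out = new ArrayList<>(topics.size());
        for (TopicRequest topic : topics) {
            List<PartitionError> parts = new ArrayList<>(topic.partitionIndexes.size());
            for (int index : topic.partitionIndexes) {
                parts.add(new PartitionError(index, errorCode, errorMessage));
            }
            out.add(new TopicResponse(topic.name, parts));
        }
        return out;
    }
}
```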

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -204,118 +75,79 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(new ProduceResponseData()
+            .setResponses(responses.entrySet()

Review comment:
       Since we care about performance here, I wonder if we should avoid the 
stream API.
   
   Another trick would be to turn `TopicProduceResponse` in the 
`ProduceResponse` schema into a map by setting `"mapKey": true` on the topic 
name. This would allow us to iterate over `responses`, get or create the 
`TopicProduceResponse` for each topic, and add the `PartitionProduceResponse` 
to it.
   
   It may be worth trying different implementations and comparing their 
performance.
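
   As a starting point for such a comparison, the sketch below puts a stream-based grouping next to a plain loop that gets or creates the per-topic entry. `TopicPartition` here is a hypothetical stand-in for the real class, and a plain `HashMap` stands in for the keyed collection that `"mapKey": true` would generate. It only shows that the two variants produce the same grouping; any performance difference would have to be measured, for example with the existing benchmark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ResponseGroupingSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Variant 1: group the keys by topic with the stream API.
    static Map<String, List<TopicPartition>> withStreams(Map<TopicPartition, Short> responses) {
        return responses.keySet().stream()
            .collect(Collectors.groupingBy(tp -> tp.topic));
    }

    // Variant 2: plain loop that gets or creates the per-topic list.
    static Map<String, List<TopicPartition>> withLoop(Map<TopicPartition, Short> responses) {
        Map<String, List<TopicPartition>> byTopic = new HashMap<>();
        for (TopicPartition tp : responses.keySet()) {
            byTopic.computeIfAbsent(tp.topic, t -> new ArrayList<>()).add(tp);
        }
        return byTopic;
    }
}
```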





