dajac commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r525965065



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
             log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
             // if we have a response, parse it
             if (response.hasResponse()) {
+                // Sender should exercise PartitionProduceResponse rather than ProduceResponse.PartitionResponse
+                // https://issues.apache.org/jira/browse/KAFKA-10696
                 ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
-                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
-                    TopicPartition tp = entry.getKey();
-                    ProduceResponse.PartitionResponse partResp = entry.getValue();
+                produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
+                    TopicPartition tp = new TopicPartition(r.name(), p.index());
+                    ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
+                            Errors.forCode(p.errorCode()),
+                            p.baseOffset(),
+                            p.logAppendTimeMs(),
+                            p.logStartOffset(),
+                            p.recordErrors()
+                                .stream()
+                                .map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()))
+                                .collect(Collectors.toList()),

Review comment:
       nit: As we got rid of the stream API in this section, would it make sense to also remove this one?
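
   For reference, a plain-loop version of the `recordErrors` mapping could look like this (a sketch only; the generated accessor names are taken from the diff above):
   ```
   List<ProduceResponse.RecordError> recordErrors = new ArrayList<>(p.recordErrors().size());
   for (ProduceResponseData.BatchIndexAndErrorMessage e : p.recordErrors()) {
       // Same conversion as the stream pipeline, without the intermediate stream objects
       recordErrors.add(new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()));
   }
   ```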

##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -198,8 +202,9 @@ private void checkSimpleRequestResponse(NetworkClient networkClient) {
         ResponseHeader respHeader =
             new ResponseHeader(request.correlationId(),
                 request.apiKey().responseHeaderVersion(PRODUCE.latestVersion()));
-        Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion()));
-        resp.set("responses", new Object[0]);
+        Struct resp = new ProduceResponseData()
+                .setThrottleTimeMs(100)
+                .toStruct(ProduceResponseData.HIGHEST_SUPPORTED_VERSION);

Review comment:
       nit: It may be better to use `PRODUCE.latestVersion()` to stay in line with L204. There are a few cases like this in the file.
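
   That is, something like (a sketch of the suggestion):
   ```
   Struct resp = new ProduceResponseData()
           .setThrottleTimeMs(100)
           .toStruct(PRODUCE.latestVersion());
   ```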

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
##########
@@ -48,4 +53,41 @@ public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
         buffer.rewind();
         return buffer;
     }
-}
+
+    // visible for testing
+    public static boolean hasIdempotentRecords(ProduceRequest request) {
+        return flags(request).getKey();
+    }
+
+    // visible for testing
+    public static boolean hasTransactionalRecords(ProduceRequest request) {
+        return flags(request).getValue();
+    }
+
+    /**
+     * Get both the hasIdempotentRecords flag and the hasTransactionalRecords flag from a produce request.
+     * Note that we compute both flags in a single pass to avoid a duplicate loop and record batch construction.
+     * @return a pair whose key is "hasIdempotentRecords" and whose value is "hasTransactionalRecords"
+     */
+    public static AbstractMap.SimpleEntry<Boolean, Boolean> flags(ProduceRequest request) {
+        boolean hasIdempotentRecords = false;
+        boolean hasTransactionalRecords = false;
+        for (ProduceRequestData.TopicProduceData tpd : request.dataOrException().topicData()) {
+            for (ProduceRequestData.PartitionProduceData ppd : tpd.partitionData()) {
+                BaseRecords records = ppd.records();
+                if (records instanceof Records) {
+                    Iterator<? extends RecordBatch> iterator = ((Records) records).batches().iterator();
+                    if (iterator.hasNext()) {
+                        RecordBatch batch = iterator.next();
+                        hasIdempotentRecords = hasIdempotentRecords || batch.hasProducerId();
+                        hasTransactionalRecords = hasTransactionalRecords || batch.isTransactional();
+                    }
+                }
+                // return early
+                if (hasIdempotentRecords && hasTransactionalRecords)
+                    return new AbstractMap.SimpleEntry<>(true, true);
+            }
+        }
+        return new AbstractMap.SimpleEntry<>(hasIdempotentRecords, hasTransactionalRecords);
+    }
+}

Review comment:
       nit: Add a trailing newline at the end of the file.

##########
File path: clients/src/main/resources/common/message/ProduceRequest.json
##########
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "3+", 
"nullableVersions": "0+", "entityType": "transactionalId",
+    { "name": "TransactionalId", "type": "string", "versions": "3+", 
"nullableVersions": "3+", "default": "null", "entityType": "transactionalId",

Review comment:
       I wonder if this one should be `ignorable`. It seems that we were 
ignoring it before when it was not present in the target version:
   ```
   struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId)
   ```
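
   For reference, marking the field ignorable would presumably look like this in the schema (a sketch; whether ignoring is still the desired semantic is the open question):
   ```
   { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "ignorable": true, "entityType": "transactionalId",
   ```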

##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -154,8 +155,11 @@ public void testClose() {
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, 
time.milliseconds()));
 
-        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
-                Collections.emptyMap());
+        ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
+                .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
+                .setAcks((short) 1)
+                .setTimeoutMs(1000)
+                .setTransactionalId(null));

Review comment:
       nit: It seems that `TransactionalId` is `null` by default, so we don't have to set it explicitly all the time. There are a few cases in this file and in others. I am not sure if this was intentional, so I am also fine if you want to keep them.
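
   For example, the call above could then shrink to (a sketch, assuming the generated default really is `null`):
   ```
   ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
           .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
           .setAcks((short) 1)
           .setTimeoutMs(1000));
   ```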

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
##########
@@ -192,16 +258,21 @@ public void testMixedTransactionalData() {
         final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
                 producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
 
-        final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
-        recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
-        recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);
-
-        final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000,
-                recordsByPartition, transactionalId);
+        ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords))))
+                                .iterator()))
+                        .setAcks((short) 1)

Review comment:
       nit: I am not sure if this is important or not, but we were using `-1` previously.
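
   For context, `-1` means acks=all (wait for the full in-sync replica set), whereas `1` only waits for the leader, so restoring the old behaviour would presumably read (a sketch):
   ```
                           .setAcks((short) -1)
   ```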

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -203,119 +77,88 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
      * @param responses Produced data grouped by topic-partition
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
+    @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(toData(responses, throttleTimeMs));
     }
 
-    /**
-     * Constructor from a {@link Struct}.
-     */
-    public ProduceResponse(Struct struct) {
-        responses = new HashMap<>();
-        for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.get(TOPIC_NAME);
-
-            for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
-                long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-                long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
-                        partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP;
-                long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
-
-                List<RecordError> recordErrors = Collections.emptyList();
-                if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
-                    if (recordErrorsArray.length > 0) {
-                        recordErrors = new ArrayList<>(recordErrorsArray.length);
-                        for (Object indexAndMessage : recordErrorsArray) {
-                            Struct indexAndMessageStruct = (Struct) indexAndMessage;
-                            recordErrors.add(new RecordError(
-                                    indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
-                                    indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
-                            ));
-                        }
-                    }
-                }
+    @Override
+    protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
+        return SendBuilder.buildResponseSend(destination, header, this.data, apiVersion);
+    }
 
-                String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
-                TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage));
+    private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
+        ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
+        responses.forEach((tp, response) -> {
+            ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
+            if (tpr == null) {
+                tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
+                data.responses().add(tpr);
             }
-        }
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+            tpr.partitionResponses()
+                .add(new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(tp.partition())
+                    .setBaseOffset(response.baseOffset)
+                    .setLogStartOffset(response.logStartOffset)
+                    .setLogAppendTimeMs(response.logAppendTime)
+                    .setErrorMessage(response.errorMessage)
+                    .setErrorCode(response.error.code())
+                    .setRecordErrors(response.recordErrors
+                        .stream()
+                        .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage()
+                            .setBatchIndex(e.batchIndex)
+                            .setBatchIndexErrorMessage(e.message))
+                        .collect(Collectors.toList())));

Review comment:
       nit: Should we remove this usage of the stream API here as well?
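
   A loop-based variant of the `recordErrors` mapping could look like this (a sketch; field names are taken from the diff above):
   ```
   List<ProduceResponseData.BatchIndexAndErrorMessage> recordErrors = new ArrayList<>(response.recordErrors.size());
   for (ProduceResponse.RecordError e : response.recordErrors) {
       recordErrors.add(new ProduceResponseData.BatchIndexAndErrorMessage()
           .setBatchIndex(e.batchIndex)
           .setBatchIndexErrorMessage(e.message));
   }
   // then: .setRecordErrors(recordErrors)
   ```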

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
##########
@@ -192,16 +258,21 @@ public void testMixedTransactionalData() {
         final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
                 producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
 
-        final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
-        recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
-        recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);
-
-        final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000,
-                recordsByPartition, transactionalId);
+        ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords))))
+                                .iterator()))
+                        .setAcks((short) 1)
+                        .setTimeoutMs(5000)
+                        .setTransactionalId(null));

Review comment:
       We were setting `transactionalId` previously.
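
   That is, presumably (a sketch of the intended line):
   ```
                           .setTransactionalId(transactionalId));
   ```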




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

