dajac commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r525965065
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ##########

```diff
@@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
         log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
         // if we have a response, parse it
         if (response.hasResponse()) {
+            // Sender should exercise PartitionProduceResponse rather than ProduceResponse.PartitionResponse
+            // https://issues.apache.org/jira/browse/KAFKA-10696
             ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
-            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                ProduceResponse.PartitionResponse partResp = entry.getValue();
+            produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
+                TopicPartition tp = new TopicPartition(r.name(), p.index());
+                ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
+                    Errors.forCode(p.errorCode()),
+                    p.baseOffset(),
+                    p.logAppendTimeMs(),
+                    p.logStartOffset(),
+                    p.recordErrors()
+                        .stream()
+                        .map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()))
+                        .collect(Collectors.toList()),
```

Review comment:
   nit: As we got rid of the streaming API in this section, would it make sense to also remove this one?

########## File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ##########

```diff
@@ -198,8 +202,9 @@ private void checkSimpleRequestResponse(NetworkClient networkClient) {
         ResponseHeader respHeader =
             new ResponseHeader(request.correlationId(),
                 request.apiKey().responseHeaderVersion(PRODUCE.latestVersion()));
-        Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion()));
-        resp.set("responses", new Object[0]);
+        Struct resp = new ProduceResponseData()
+            .setThrottleTimeMs(100)
+            .toStruct(ProduceResponseData.HIGHEST_SUPPORTED_VERSION);
```

Review comment:
   nit: It may be better to use `PRODUCE.latestVersion()` to stay in line with L204. There are a few cases like this in the file.

########## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java ##########

```diff
@@ -48,4 +53,41 @@ public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
         buffer.rewind();
         return buffer;
     }
-}
+
+    // visible for testing
+    public static boolean hasIdempotentRecords(ProduceRequest request) {
+        return flags(request).getKey();
+    }
+
+    // visible for testing
+    public static boolean hasTransactionalRecords(ProduceRequest request) {
+        return flags(request).getValue();
+    }
+
+    /**
+     * Get both the hasIdempotentRecords flag and the hasTransactionalRecords flag from a produce request.
+     * Note that we compute both flags in a single pass to avoid looping over the record batches twice.
+     * @return the first flag is "hasIdempotentRecords" and the second is "hasTransactionalRecords"
+     */
+    public static AbstractMap.SimpleEntry<Boolean, Boolean> flags(ProduceRequest request) {
+        boolean hasIdempotentRecords = false;
+        boolean hasTransactionalRecords = false;
+        for (ProduceRequestData.TopicProduceData tpd : request.dataOrException().topicData()) {
+            for (ProduceRequestData.PartitionProduceData ppd : tpd.partitionData()) {
+                BaseRecords records = ppd.records();
+                if (records instanceof Records) {
+                    Iterator<? extends RecordBatch> iterator = ((Records) records).batches().iterator();
+                    if (iterator.hasNext()) {
+                        RecordBatch batch = iterator.next();
+                        hasIdempotentRecords = hasIdempotentRecords || batch.hasProducerId();
+                        hasTransactionalRecords = hasTransactionalRecords || batch.isTransactional();
+                    }
+                }
+                // return early
+                if (hasIdempotentRecords && hasTransactionalRecords)
+                    return new AbstractMap.SimpleEntry<>(true, true);
+            }
+        }
+        return new AbstractMap.SimpleEntry<>(hasIdempotentRecords, hasTransactionalRecords);
+    }
+}
```

Review comment:
   nit: Add a new line.
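To illustrate the new helper above, here is a hypothetical caller (not code from the PR): both flags come back from a single pass over the record batches, so a call site that needs both should prefer `flags` over invoking the two boolean helpers separately.

```java
import java.util.AbstractMap;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestUtils;

public class FlagsExample {
    // Hypothetical caller: one pass over the batches yields both flags.
    static void logProduceFlags(ProduceRequest request) {
        AbstractMap.SimpleEntry<Boolean, Boolean> flags = RequestUtils.flags(request);
        boolean hasIdempotentRecords = flags.getKey();       // first element
        boolean hasTransactionalRecords = flags.getValue();  // second element
        System.out.println("idempotent=" + hasIdempotentRecords
                + ", transactional=" + hasTransactionalRecords);
    }
}
```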
########## File path: clients/src/main/resources/common/message/ProduceRequest.json ##########

```diff
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
+    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
```

Review comment:
   I wonder if this one should be `ignorable`. It seems that we were ignoring it before when it was not present in the target version:
   ```
   struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId)
   ```

########## File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ##########

```diff
@@ -154,8 +155,11 @@ public void testClose() {
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
 
-        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
-                Collections.emptyMap());
+        ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
+                .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
+                .setAcks((short) 1)
+                .setTimeoutMs(1000)
+                .setTransactionalId(null));
```

Review comment:
   nit: It seems that `TransactionalId` is `null` by default, so we don't have to set it explicitly all the time. There are a few cases like this in this file and in others. I am not sure if this was intentional, so I am also fine if you want to keep them.
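If the generated defaults behave as described (an assumption worth verifying against the generated `ProduceRequestData` class), the construction above could drop the redundant setter; a minimal sketch:

```java
// Sketch: relies on TransactionalId defaulting to null per the
// "default": "null" entry in ProduceRequest.json (assumption).
ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
        .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
        .setAcks((short) 1)
        .setTimeoutMs(1000));
```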
########## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java ##########

```diff
@@ -192,16 +258,21 @@ public void testMixedTransactionalData() {
         final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
                 producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
 
-        final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
-        recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
-        recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);
-
-        final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000,
-                recordsByPartition, transactionalId);
+        ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords))))
+                                .iterator()))
+                        .setAcks((short) 1)
```

Review comment:
   nit: I am not sure if this is important or not, but we were using `-1` previously.
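For context on why the value matters (standard produce-protocol semantics, not something stated in the diff): acks is a plain short, where `-1` means "all" in-sync replicas, `1` means leader-only, and `0` means no acknowledgement, so switching from `-1` to `1` changes what the request asks for. A sketch restoring the old value:

```java
// Sketch: acks semantics in the produce request.
//   -1 -> "all": wait for the full in-sync replica set
//    1 -> wait for the leader only
//    0 -> no acknowledgement
ProduceRequestData data = new ProduceRequestData()
        .setAcks((short) -1)  // the value used before this change
        .setTimeoutMs(5000);
```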
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ##########

```diff
@@ -203,119 +77,88 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
      * @param responses Produced data grouped by topic-partition
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
+    @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(toData(responses, throttleTimeMs));
     }
 
-    /**
-     * Constructor from a {@link Struct}.
-     */
-    public ProduceResponse(Struct struct) {
-        responses = new HashMap<>();
-        for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.get(TOPIC_NAME);
-
-            for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
-                long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-                long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
-                        partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP;
-                long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
-
-                List<RecordError> recordErrors = Collections.emptyList();
-                if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
-                    if (recordErrorsArray.length > 0) {
-                        recordErrors = new ArrayList<>(recordErrorsArray.length);
-                        for (Object indexAndMessage : recordErrorsArray) {
-                            Struct indexAndMessageStruct = (Struct) indexAndMessage;
-                            recordErrors.add(new RecordError(
-                                    indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
-                                    indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
-                            ));
-                        }
-                    }
-                }
+    @Override
+    protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
+        return SendBuilder.buildResponseSend(destination, header, this.data, apiVersion);
+    }
 
-                String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
-                TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage));
+    private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
+        ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
+        responses.forEach((tp, response) -> {
+            ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
+            if (tpr == null) {
+                tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
+                data.responses().add(tpr);
             }
-        }
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+            tpr.partitionResponses()
+                .add(new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(tp.partition())
+                    .setBaseOffset(response.baseOffset)
+                    .setLogStartOffset(response.logStartOffset)
+                    .setLogAppendTimeMs(response.logAppendTime)
+                    .setErrorMessage(response.errorMessage)
+                    .setErrorCode(response.error.code())
+                    .setRecordErrors(response.recordErrors
+                        .stream()
+                        .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage()
+                            .setBatchIndex(e.batchIndex)
+                            .setBatchIndexErrorMessage(e.message))
+                        .collect(Collectors.toList()))));
```

Review comment:
   nit: Should we remove this usage of the stream API here as well?
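In the spirit of the nit, a stream-free version of the record-error mapping might look like this sketch (same generated types as in the diff; `response` is the `PartitionResponse` being converted):

```java
// Sketch: explicit loop instead of the stream pipeline.
List<ProduceResponseData.BatchIndexAndErrorMessage> recordErrors =
        new ArrayList<>(response.recordErrors.size());
for (ProduceResponse.RecordError e : response.recordErrors) {
    recordErrors.add(new ProduceResponseData.BatchIndexAndErrorMessage()
            .setBatchIndex(e.batchIndex)
            .setBatchIndexErrorMessage(e.message));
}
// ...then pass recordErrors to setRecordErrors(...)
```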
########## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java ##########

```diff
@@ -192,16 +258,21 @@ public void testMixedTransactionalData() {
         final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
                 producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
 
-        final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
-        recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
-        recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);
-
-        final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000,
-                recordsByPartition, transactionalId);
+        ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords))))
+                                .iterator()))
+                        .setAcks((short) 1)
+                        .setTimeoutMs(5000)
+                        .setTransactionalId(null));
```

Review comment:
   We were setting `transactionalId` previously.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org