hachikuji commented on a change in pull request #10445: URL: https://github.com/apache/kafka/pull/10445#discussion_r605879590
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ########## @@ -689,30 +680,57 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons transactionManager.handleCompletedBatch(batch, response); } - if (batch.done(response.baseOffset, response.logAppendTime, null)) { + if (batch.complete(response.baseOffset, response.logAppendTime)) { maybeRemoveAndDeallocateBatch(batch); } } private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, - RuntimeException exception, boolean adjustSequenceNumbers) { - failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers); + final RuntimeException topLevelException; + if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED) + topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic())); + else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED) + topLevelException = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends"); + else + topLevelException = response.error.exception(response.errorMessage); + + if (response.recordErrors == null || response.recordErrors.isEmpty()) { + failBatch(batch, topLevelException, adjustSequenceNumbers); + } else { + Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size()); + for (ProduceResponse.RecordError recordError : response.recordErrors) { + if (recordError.message != null) { + recordErrorMap.put(recordError.batchIndex, response.error.exception(recordError.message)); Review comment: I think the expectation is that the error code returned at the partition level is meaningful for the record-level errors. The one we're really expecting here is INVALID_RECORD, right? I guess the alternative is to discard the partition error and raise InvalidRecordException directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org