hachikuji commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r605879590



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -689,30 +680,57 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
             transactionManager.handleCompletedBatch(batch, response);
         }
 
-        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
+        if (batch.complete(response.baseOffset, response.logAppendTime)) {
             maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
     private void failBatch(ProducerBatch batch,
                            ProduceResponse.PartitionResponse response,
-                           RuntimeException exception,
                            boolean adjustSequenceNumbers) {
-        failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers);
+        final RuntimeException topLevelException;
+        if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
+            topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
+        else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+            topLevelException = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
+        else
+            topLevelException = response.error.exception(response.errorMessage);
+
+        if (response.recordErrors == null || response.recordErrors.isEmpty()) {
+            failBatch(batch, topLevelException, adjustSequenceNumbers);
+        } else {
+            Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
+            for (ProduceResponse.RecordError recordError : response.recordErrors) {
+                if (recordError.message != null) {
+                    recordErrorMap.put(recordError.batchIndex, response.error.exception(recordError.message));

Review comment:
       I think the expectation is that the error code returned at the partition
level is also meaningful for the record-level errors. The one we're really
expecting here is INVALID_RECORD, right? I guess the alternative is to discard
the partition-level error and raise InvalidRecordException directly.
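
To make the two options concrete, here is a rough, self-contained sketch. None of the types are the real Kafka classes: `RecordErrorMapping`, `RecordError`, and the nested `InvalidRecordException` are hypothetical stand-ins, and the `Function` argument stands in for `response.error.exception(...)`. The diff excerpt is cut off right after the `put` call, so skipping records with a null message and doing nothing else is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; these are stand-ins, not the real Kafka classes.
class RecordErrorMapping {

    // Stand-in for Kafka's InvalidRecordException.
    static class InvalidRecordException extends RuntimeException {
        InvalidRecordException(String message) {
            super(message);
        }
    }

    // Stand-in for ProduceResponse.RecordError: a batch index plus an optional message.
    static class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    // Option 1 (as in the diff): build each record-level exception from the
    // partition-level error. partitionError stands in for response.error::exception.
    static Map<Integer, RuntimeException> fromPartitionError(
            List<RecordError> recordErrors,
            Function<String, RuntimeException> partitionError) {
        Map<Integer, RuntimeException> result = new HashMap<>(recordErrors.size());
        for (RecordError recordError : recordErrors) {
            // Assumption: records without a message are simply skipped here.
            if (recordError.message != null) {
                result.put(recordError.batchIndex, partitionError.apply(recordError.message));
            }
        }
        return result;
    }

    // Option 2: discard the partition-level error and always use InvalidRecordException.
    static Map<Integer, RuntimeException> asInvalidRecord(List<RecordError> recordErrors) {
        return fromPartitionError(recordErrors, InvalidRecordException::new);
    }

    public static void main(String[] args) {
        List<RecordError> errors = Arrays.asList(
                new RecordError(0, "bad timestamp"),
                new RecordError(2, null));

        // With option 1, whatever the partition-level error is ends up on each record.
        Map<Integer, RuntimeException> viaPartition =
                fromPartitionError(errors, IllegalStateException::new);
        Map<Integer, RuntimeException> direct = asInvalidRecord(errors);

        System.out.println(viaPartition.get(0).getClass().getSimpleName());
        System.out.println(direct.get(0).getClass().getSimpleName());
    }
}
```

With option 1 the per-record exception type depends on whatever the broker returned at the partition level; with option 2 it is fixed, at the cost of ignoring that code.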



