hachikuji commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r605971389



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -181,20 +212,36 @@ public boolean isDone() {
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being 
used
-     * @param exception The exception that occurred (or null if the request 
was successful)
+     * @param topLevelException The exception that occurred (or null if the 
request was successful)
+     * @param recordExceptionMap Record exception map keyed by batchIndex 
which overrides `topLevelException`
+     *                           for records present in the map
      * @return true if the batch was completed successfully and false if the 
batch was previously aborted
      */
-    public boolean done(long baseOffset, long logAppendTime, RuntimeException 
exception) {
-        final FinalState tryFinalState = (exception == null) ? 
FinalState.SUCCEEDED : FinalState.FAILED;
-
+    private boolean done(
+        long baseOffset,
+        long logAppendTime,
+        RuntimeException topLevelException,
+        Map<Integer, RuntimeException> recordExceptionMap
+    ) {
+        final FinalState tryFinalState = (topLevelException == null) ? 
FinalState.SUCCEEDED : FinalState.FAILED;
         if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset 
{}.", topicPartition, baseOffset);
         } else {
-            log.trace("Failed to produce messages to {} with base offset {}.", 
topicPartition, baseOffset, exception);
+            log.trace("Failed to produce messages to {} with base offset {}.", 
topicPartition, baseOffset, topLevelException);
         }
 
         if (this.finalState.compareAndSet(null, tryFinalState)) {
-            completeFutureAndFireCallbacks(baseOffset, logAppendTime, 
exception);
+            Function<Integer, RuntimeException> recordExceptions = null;
+            if (topLevelException != null) {
+                recordExceptions = batchIndex -> {
+                    if (recordExceptionMap == null) {
+                        return topLevelException;
+                    } else {
+                        return recordExceptionMap.getOrDefault(batchIndex, 
topLevelException);

Review comment:
       This seems related to your other question. The top-level (i.e. 
partition-level) error code is really the only one we get from the response. At 
the record level, all we have is an error message.  The partition-level error 
should be one of either `InvalidRecordException` or `InvalidTimestampException` 
based on the current implementation in `LogValidator`:
   ```scala
       if (recordErrors.nonEmpty) {
         val errors = recordErrors.map(_.recordError)
         if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
           throw new RecordValidationException(new InvalidTimestampException(
             "One or more records have been rejected due to invalid 
timestamp"), errors)
         } else {
           throw new RecordValidationException(new InvalidRecordException(
             "One or more records have been rejected"), errors)
         }
       }
   ```
   The invalid timestamp case seems problematic. The problem is that we don't 
have a way to tell whether an individual record error is actually an instance 
of this error.
   
   Given the current situation, here's what I'm thinking:
   
   1. If there are record errors present, the client will always raise them as 
InvalidRecordException. The best we can do is make sure the partition-level 
error gets logged so at least it ends up somewhere.
   2. If there are no record errors present, then we will raise the 
partition-level exception for each record.
   
   What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to