cadonna commented on code in PR #16222:
URL: https://github.com/apache/kafka/pull/16222#discussion_r1629028760


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -88,6 +88,7 @@ public RecordCollectorImpl(final LogContext logContext,
         this.log = logContext.logger(getClass());
         this.taskId = taskId;
         this.streamsProducer = streamsProducer;
+        this.sendException = streamsProducer.sendException();

Review Comment:
   That is basically the fix.
   
   Note that an exception caused by one task can now be thrown by a different task. For example:
   
   ```java
   [2024-05-30 10:20:35,916] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Error flushing caches of dirty task 0_0 (org.apache.kafka.streams.processor.internals.TaskManager)
   org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic stream-soak-test-network-id-repartition for task 1_1 due to:
   org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
   Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:305)
   ```
   Task `0_0` throws the error caused by task `1_1`.
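
The cross-task behavior described in the comment above can be illustrated with a minimal, self-contained sketch. It is not the Kafka Streams implementation: `SketchProducer` and `SketchCollector` are hypothetical stand-ins for `StreamsProducer` and `RecordCollectorImpl`, and the choices to use an `AtomicReference`, to keep only the first error, and to clear the slot on rethrow are assumptions made for the illustration. The only part taken from the diff is that each collector obtains its `sendException` from the shared producer.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SharedSendExceptionSketch {

    // Hypothetical stand-in for StreamsProducer: one instance per stream thread,
    // owning the single error slot that every collector on that thread reads.
    static final class SketchProducer {
        private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

        AtomicReference<RuntimeException> sendException() {
            return sendException;
        }
    }

    // Hypothetical stand-in for RecordCollectorImpl: one instance per task.
    static final class SketchCollector {
        private final String taskId;
        private final AtomicReference<RuntimeException> sendException;

        SketchCollector(final String taskId, final SketchProducer producer) {
            this.taskId = taskId;
            // Mirrors the line added in the diff: take the producer's shared reference.
            this.sendException = producer.sendException();
        }

        // Called when a send fails; keeps only the first error (an assumption of this sketch).
        void recordSendError(final String cause) {
            sendException.compareAndSet(null, new RuntimeException(
                "Error encountered sending record for task " + taskId + " due to: " + cause));
        }

        // Called before flushing or sending; rethrows and clears whatever error is stored.
        void checkForException() {
            final RuntimeException exception = sendException.getAndSet(null);
            if (exception != null) {
                throw exception;
            }
        }
    }

    // Returns the message of the exception that surfaces in task 0_0.
    static String demo() {
        final SketchProducer producer = new SketchProducer();
        final SketchCollector task00 = new SketchCollector("0_0", producer);
        final SketchCollector task11 = new SketchCollector("1_1", producer);

        // The send error originates in task 1_1 ...
        task11.recordSendError("Producer attempted to produce with an old epoch.");
        try {
            // ... but task 0_0 is the one that checks the shared slot first.
            task00.checkForException();
            return "no exception";
        } catch (final RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(final String[] args) {
        System.out.println("Thrown while flushing task 0_0: " + demo());
    }
}
```

Because both collectors hold the same reference, whichever task checks first surfaces the error, which matches the log above where the message names task `1_1` but the flush of task `0_0` fails.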


