This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8f83592b2a MINOR: improve StreamsProducer error handling (#20058)
c8f83592b2a is described below

commit c8f83592b2acdee0e2aa6993b378ba9e429a1f1e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Jun 30 15:03:35 2025 -0700

    MINOR: improve StreamsProducer error handling (#20058)
    
    StreamsProducer may time out in either the sendOffsetsToTransaction() or
    the commitTransaction() call. To distinguish the two cases, we should make
    each call in its own try-catch block.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../streams/processor/internals/StreamsProducer.java     | 16 ++++++++++++++++
 .../streams/processor/internals/StreamsProducerTest.java |  4 ++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 1048b5a2ecf..c5fbdd788bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -247,6 +247,22 @@ public class StreamsProducer {
         maybeBeginTransaction();
         try {
             producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
+        } catch (final ProducerFencedException | InvalidProducerEpochException 
| CommitFailedException | InvalidPidMappingException error) {
+            throw new TaskMigratedException(
+                formatException("Producer got fenced trying to add offsets to 
a transaction"),
+                error
+            );
+        } catch (final TimeoutException timeoutException) {
+            // re-throw to trigger `task.timeout.ms`
+            throw timeoutException;
+        } catch (final KafkaException error) {
+            throw new StreamsException(
+                formatException("Error encountered trying to add offsets to a 
transaction"),
+                error
+            );
+        }
+
+        try {
             producer.commitTransaction();
             transactionInFlight = false;
         } catch (final ProducerFencedException | InvalidProducerEpochException 
| CommitFailedException | InvalidPidMappingException error) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index 1c084fa63e2..2628ca5b99f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -684,7 +684,7 @@ public class StreamsProducerTest {
         assertThat(thrown.getCause(), 
is(eosMockProducer.sendOffsetsToTransactionException));
         assertThat(
             thrown.getMessage(),
-            is("Producer got fenced trying to commit a transaction [test];" +
+            is("Producer got fenced trying to add offsets to a transaction 
[test];" +
                    " it means all tasks belonging to this thread should be 
migrated.")
         );
     }
@@ -703,7 +703,7 @@ public class StreamsProducerTest {
         assertThat(thrown.getCause(), 
is(eosMockProducer.sendOffsetsToTransactionException));
         assertThat(
             thrown.getMessage(),
-            is("Error encountered trying to commit a transaction [test]")
+            is("Error encountered trying to add offsets to a transaction 
[test]")
         );
     }
 

Reply via email to