This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8f83592b2a MINOR: improve StreamsProducer error handling (#20058)
c8f83592b2a is described below
commit c8f83592b2acdee0e2aa6993b378ba9e429a1f1e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Jun 30 15:03:35 2025 -0700
MINOR: improve StreamsProducer error handling (#20058)
StreamsProducer may time out in either the sendOffsetsToTransaction() or
the commitTransaction() call. To distinguish the two cases, each call
should be made in its own try-catch block.
Reviewers: Bill Bejeck <[email protected]>
---
.../streams/processor/internals/StreamsProducer.java | 16 ++++++++++++++++
.../streams/processor/internals/StreamsProducerTest.java | 4 ++--
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 1048b5a2ecf..c5fbdd788bc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -247,6 +247,22 @@ public class StreamsProducer {
maybeBeginTransaction();
try {
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
+ } catch (final ProducerFencedException | InvalidProducerEpochException
| CommitFailedException | InvalidPidMappingException error) {
+ throw new TaskMigratedException(
+ formatException("Producer got fenced trying to add offsets to
a transaction"),
+ error
+ );
+ } catch (final TimeoutException timeoutException) {
+ // re-throw to trigger `task.timeout.ms`
+ throw timeoutException;
+ } catch (final KafkaException error) {
+ throw new StreamsException(
+ formatException("Error encountered trying to add offsets to a
transaction"),
+ error
+ );
+ }
+
+ try {
producer.commitTransaction();
transactionInFlight = false;
} catch (final ProducerFencedException | InvalidProducerEpochException
| CommitFailedException | InvalidPidMappingException error) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index 1c084fa63e2..2628ca5b99f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -684,7 +684,7 @@ public class StreamsProducerTest {
assertThat(thrown.getCause(),
is(eosMockProducer.sendOffsetsToTransactionException));
assertThat(
thrown.getMessage(),
- is("Producer got fenced trying to commit a transaction [test];" +
+ is("Producer got fenced trying to add offsets to a transaction
[test];" +
" it means all tasks belonging to this thread should be
migrated.")
);
}
@@ -703,7 +703,7 @@ public class StreamsProducerTest {
assertThat(thrown.getCause(),
is(eosMockProducer.sendOffsetsToTransactionException));
assertThat(
thrown.getMessage(),
- is("Error encountered trying to commit a transaction [test]")
+ is("Error encountered trying to add offsets to a transaction
[test]")
);
}