This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 917d695322f KAFKA-17019 Producer TimeoutException should include root 
cause (#20159)
917d695322f is described below

commit 917d695322f3d307ea3a3ae2d066acaa1678732f
Author: ChickenchickenLove <[email protected]>
AuthorDate: Wed Dec 3 21:55:36 2025 +0900

    KAFKA-17019 Producer TimeoutException should include root cause (#20159)
    
    ### Changes
    - Add new Exception class `PotentialCauseException`.
    - All `org.apache.kafka.common.errors.TimeoutException` in
    `KafkaProducer` has `PotentialCauseException` as  root cause if it
    cannot catch any exception.
    
    ### Describe
    `TimeoutException` can be thrown for various reasons.
    However, it is often difficult to identify the root cause,
    Because there are so many potential factors that can lead to a
    `TimeoutException`.
    
    For example:
    1. The `ProducerClient` might be busy, so it may not be able to send the
    request in time. As a result, some batches may expire, leading to a
    `TimeoutException`.
    2. The `broker` might be unavailable due to network issues or internal
    failures.
    3. A request may be in flight, and although the broker successfully
    handles and responds to it, the response might arrive slightly late.
    
    As shown above, there are many possible causes.  In some cases, no
    `exception` is caught in the `catch` block, and a `TimeoutException` is
    thrown simply by comparing the `elapsed time`.  However, the developer
    using `TimeoutException` in `KafkaProducer` likely already knows which
    specific reasons could cause it in that context.  Therefore, I think it
    would be helpful to include a `PotentialCauseException` that reflects
    the likely reason, based on the developer’s knowledge.
    
    Reviewers: TengYao Chi <[email protected]>, Yung
     <[email protected]>, Andrew Schofield
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      |  20 +++-
 .../kafka/clients/producer/MockProducer.java       |   2 +-
 .../kafka/clients/producer/internals/Sender.java   |   3 +-
 .../internals/TransactionalRequestResult.java      |   8 +-
 .../kafka/clients/producer/KafkaProducerTest.java  |  14 ++-
 .../clients/producer/internals/SenderTest.java     |  21 ++--
 .../producer/internals/TransactionManagerTest.java | 110 +++++++++++++--------
 .../test/java/org/apache/kafka/test/TestUtils.java |   9 ++
 8 files changed, 123 insertions(+), 64 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9672b595e36..0ea2e66f57d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -248,6 +248,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
     public static final String NETWORK_THREAD_PREFIX = 
"kafka-producer-network-thread";
     public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
 
+    private static final String INIT_TXN_TIMEOUT_MSG = "InitTransactions timed 
out — " +
+            "did not complete coordinator discovery or " +
+            "receive the InitProducerId response within max.block.ms.";
+
+    private static final String SEND_OFFSETS_TIMEOUT_MSG =
+            "SendOffsetsToTransaction timed out – did not reach the 
coordinator or " +
+                    "receive the TxnOffsetCommit/AddOffsetsToTxn response 
within max.block.ms";
+    private static final String COMMIT_TXN_TIMEOUT_MSG =
+            "CommitTransaction timed out – did not complete EndTxn with the 
transaction coordinator within max.block.ms";
+    private static final String ABORT_TXN_TIMEOUT_MSG =
+            "AbortTransaction timed out – did not complete EndTxn(abort) with 
the transaction coordinator within max.block.ms";
+    
     private final String clientId;
     // Visible for testing
     final Metrics metrics;
@@ -672,7 +684,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         long now = time.nanoseconds();
         TransactionalRequestResult result = 
transactionManager.initializeTransactions(keepPreparedTxn);
         sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, 
INIT_TXN_TIMEOUT_MSG);
         producerMetrics.recordInit(time.nanoseconds() - now);
         transactionManager.maybeUpdateTransactionV2Enabled(true);
     }
@@ -761,7 +773,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             long start = time.nanoseconds();
             TransactionalRequestResult result = 
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
             sender.wakeup();
-            result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+            result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, 
SEND_OFFSETS_TIMEOUT_MSG);
             producerMetrics.recordSendOffsets(time.nanoseconds() - start);
         }
     }
@@ -847,7 +859,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         long commitStart = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.beginCommit();
         sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, 
COMMIT_TXN_TIMEOUT_MSG);
         producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
     }
 
@@ -882,7 +894,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         long abortStart = time.nanoseconds();
         TransactionalRequestResult result = transactionManager.beginAbort();
         sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, 
ABORT_TXN_TIMEOUT_MSG);
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3e5cb9f5d5a..877f43834f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -424,7 +424,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
             if (injectTimeoutExceptionCounter > 0) {
                 --injectTimeoutExceptionCounter;
             }
-            throw new TimeoutException();
+            throw new TimeoutException("TimeoutExceptions are successfully 
injected for test.");
         }
 
         return clientInstanceId;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 983c5c9ba48..f9ac810ca1c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -414,7 +414,8 @@ public class Sender implements Runnable {
             log.trace("Expired {} batches in accumulator", 
expiredBatches.size());
         for (ProducerBatch expiredBatch : expiredBatches) {
             String errorMessage = "Expiring " + expiredBatch.recordCount + " 
record(s) for " + expiredBatch.topicPartition
-                + ":" + (now - expiredBatch.createdMs) + " ms has passed since 
batch creation";
+                + ":" + (now - expiredBatch.createdMs) + " ms has passed since 
batch creation. " 
+                + "The request has not been sent, or no server response has 
been received yet.";
             failBatch(expiredBatch, new TimeoutException(errorMessage), false);
             if (transactionManager != null && expiredBatch.inRetry()) {
                 // This ensures that no new batches are drained until the 
current in flight batches are fully resolved.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index 6739da81527..a48b22367b0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -47,16 +47,12 @@ public final class TransactionalRequestResult {
         this.latch.countDown();
     }
 
-    public void await() {
-        this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-    }
-
-    public void await(long timeout, TimeUnit unit) {
+    public void await(long timeout, TimeUnit unit, String 
expectedTimeoutReason) {
         try {
             boolean success = latch.await(timeout, unit);
             if (!success) {
                 throw new TimeoutException("Timeout expired after " + 
unit.toMillis(timeout) +
-                    "ms while awaiting " + operation);
+                    "ms while awaiting " + operation + ". " + 
expectedTimeoutReason);
             }
 
             isAcked = true;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 22bccd72743..5d15229a838 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -174,6 +174,12 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
+
+    private static final String INIT_TXN_TIMEOUT_MSG =
+            "InitTransactions timed out — " +
+                    "did not complete coordinator discovery or " +
+                    "receive the InitProducerId response within max.block.ms.";
+    
     private final String topic = "topic";
     private final Collection<Node> nodes = Collections.singletonList(NODE);
     private final Cluster emptyCluster = new Cluster(
@@ -1322,7 +1328,7 @@ public class KafkaProducerTest {
                 "Timed out while waiting for expected `InitProducerId` request 
to be sent");
 
             time.sleep(maxBlockMs);
-            TestUtils.assertFutureThrows(TimeoutException.class, future);
+            
TestUtils.assertFutureThrowsWithMessageContaining(TimeoutException.class, 
future, INIT_TXN_TIMEOUT_MSG);
 
             client.respond(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
@@ -1352,7 +1358,8 @@ public class KafkaProducerTest {
                     ((FindCoordinatorRequest) request).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
                 FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"bad-transaction", NODE));
 
-            assertThrows(TimeoutException.class, producer::initTransactions);
+            var timeoutEx = assertThrows(TimeoutException.class, 
producer::initTransactions);
+            assertTrue(timeoutEx.getMessage().contains(INIT_TXN_TIMEOUT_MSG));
 
             client.prepareResponse(
                 request -> request instanceof FindCoordinatorRequest &&
@@ -2364,7 +2371,8 @@ public class KafkaProducerTest {
 
         Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(), new StringSerializer(),
                 metadata, client, null, time);
-        assertThrows(TimeoutException.class, producer::initTransactions);
+        var timeoutEx1 = assertThrows(TimeoutException.class, 
producer::initTransactions);
+        assertTrue(timeoutEx1.getMessage().contains(INIT_TXN_TIMEOUT_MSG));
         // other transactional operations should not be allowed if we catch 
the error after initTransactions failed
         try {
             assertThrows(IllegalStateException.class, 
producer::beginTransaction);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index cd984ac2a34..a7ff3249615 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -163,6 +163,7 @@ public class SenderTest {
             TOPIC_NAME, TOPIC_ID,
             "testSplitBatchAndSend", Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1")
     );
+    private static final String SENDER_TIMEOUT_MSG = "The request has not been 
sent, or no server response has been received yet.";
     private final TopicPartition tp0 = new TopicPartition(TOPIC_NAME, 0);
     private final TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 1);
     private final TopicPartition tp2 = new TopicPartition(TOPIC_NAME, 2);
@@ -425,6 +426,7 @@ public class SenderTest {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception 
exception) {
                 if (exception instanceof TimeoutException) {
+                    
assertTrue(exception.getMessage().contains(SENDER_TIMEOUT_MSG));
                     expiryCallbackCount.incrementAndGet();
                     try {
                         accumulator.append(tp1.topic(), tp1.partition(), 0L, 
key, value,
@@ -2792,7 +2794,7 @@ public class SenderTest {
             runUntil(sender, txnManager::isReady);
 
             assertTrue(commitResult.isSuccessful());
-            commitResult.await();
+            commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
"Unexpected Timed out for transaction commit to completed during the test.");
 
             // Finally, we want to assert that the linger time is still 
effective
             // when the new transaction begins.
@@ -2942,8 +2944,9 @@ public class SenderTest {
 
             sender.forceClose();
             sender.run();
-            assertThrows(KafkaException.class, commitResult::await,
-                "The test expected to throw a KafkaException for forcefully 
closing the sender");
+            assertThrows(KafkaException.class, () -> 
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                "The test expected to throw a KafkaException for forcefully 
closing the sender")
+            );
         } finally {
             m.close();
         }
@@ -3154,7 +3157,7 @@ public class SenderTest {
         assertTrue(txnManager::isReady);
 
         assertTrue(result.isSuccessful());
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time 
out during the test.");
 
         txnManager.beginTransaction();
     }
@@ -3193,7 +3196,7 @@ public class SenderTest {
         assertTrue(txnManager::isReady);
 
         assertTrue(result.isSuccessful());
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time 
out during the test.");
 
         txnManager.beginTransaction();
     }
@@ -3232,7 +3235,7 @@ public class SenderTest {
         assertTrue(txnManager::isReady);
 
         assertTrue(result.isSuccessful());
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time 
out during the test.");
 
         txnManager.beginTransaction();
     }
@@ -3264,7 +3267,7 @@ public class SenderTest {
         TransactionalRequestResult commitResult = 
transactionManager.beginCommit();
         sender.runOnce();
         try {
-            commitResult.await(1000, TimeUnit.MILLISECONDS);
+            commitResult.await(1000, TimeUnit.MILLISECONDS, "Unexpected time 
out during the test.");
             fail("Expected abortable error to be thrown for commit");
         } catch (KafkaException e) {
             assertTrue(transactionManager.hasAbortableError());
@@ -3281,7 +3284,7 @@ public class SenderTest {
 
         // Verify the error is converted to KafkaException (not 
TransactionAbortableException)
         try {
-            abortResult.await(1000, TimeUnit.MILLISECONDS);
+            abortResult.await(1000, TimeUnit.MILLISECONDS, "Unexpected time 
out during the test.");
             fail("Expected KafkaException to be thrown");
         } catch (KafkaException e) {
             // Verify TM is in FATAL_ERROR state
@@ -3884,7 +3887,7 @@ public class SenderTest {
         prepareInitProducerResponse(Errors.NONE, 
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
         sender.runOnce();
         assertTrue(transactionManager.hasProducerId());
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time 
out during the test.");
     }
 
     private void prepareFindCoordinatorResponse(Errors error, String txnid) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 15539e40bef..cd2bd3f28b6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -133,6 +133,9 @@ public class TransactionManagerTest {
     private final String transactionalId = "foobar";
     private final int transactionTimeoutMs = 1121;
 
+    private static final String SENDER_TIMEOUT_MSG = "The request has not been 
sent, or no server response has been received yet.";
+    private static final String TEST_TIMEOUT_MSG = "Unexpected time out during 
the test.";
+    
     private final String topic = "test";
     private static final Uuid TOPIC_ID = 
Uuid.fromString("y2J9jXHhfIkQ1wK8mMKXx1");
     private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -787,7 +790,7 @@ public class TransactionManagerTest {
         // is reached even though the delivery timeout has expired and the
         // future has completed exceptionally.
         assertTrue(responseFuture1.isDone());
-        TestUtils.assertFutureThrows(TimeoutException.class, responseFuture1);
+        
TestUtils.assertFutureThrowsWithMessageContaining(TimeoutException.class, 
responseFuture1, SENDER_TIMEOUT_MSG);
         assertFalse(transactionManager.hasInFlightRequest());
         assertEquals(1, client.inFlightRequestCount());
 
@@ -970,7 +973,7 @@ public class TransactionManagerTest {
         prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch 
+ 1));
         runUntil(() -> !transactionManager.hasOngoingTransaction());
         runUntil(retryResult::isCompleted);
-        retryResult.await();
+        retryResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
         runUntil(retryResult::isAcked);
         assertFalse(transactionManager.hasOngoingTransaction());
 
@@ -1358,7 +1361,9 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.hasFatalError());
         assertInstanceOf(TransactionalIdAuthorizationException.class, 
transactionManager.lastError());
         assertFalse(initPidResult.isSuccessful());
-        assertThrows(TransactionalIdAuthorizationException.class, 
initPidResult::await);
+        assertThrows(TransactionalIdAuthorizationException.class, () -> 
initPidResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                TEST_TIMEOUT_MSG)
+        );
         assertFatalError(TransactionalIdAuthorizationException.class);
     }
 
@@ -1373,7 +1378,9 @@ public class TransactionManagerTest {
         runUntil(transactionManager::hasError);
         assertTrue(initPidResult.isCompleted());
         assertFalse(initPidResult.isSuccessful());
-        assertThrows(TransactionalIdAuthorizationException.class, 
initPidResult::await);
+        assertThrows(TransactionalIdAuthorizationException.class, () -> 
initPidResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                TEST_TIMEOUT_MSG)
+        );
         assertAbortableError(TransactionalIdAuthorizationException.class);
     }
 
@@ -1622,7 +1629,7 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasPartitionsToAdd());
         assertFalse(accumulator.hasIncomplete());
         assertTrue(abortResult.isSuccessful());
-        abortResult.await();
+        abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
 
         // ensure we can now start a new transaction
 
@@ -1656,7 +1663,10 @@ public class TransactionManagerTest {
         runUntil(() -> transactionManager.transactionContainsPartition(tp0));
 
         TransactionalRequestResult result = transactionManager.beginAbort();
-        assertThrows(TimeoutException.class, () -> result.await(0, 
TimeUnit.MILLISECONDS));
+        var timoutEx = assertThrows(TimeoutException.class, () -> 
+                result.await(0, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG)
+        );
+        assertTrue(timoutEx.getMessage().contains(TEST_TIMEOUT_MSG));
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 
producerId, epoch);
         runUntil(transactionManager::isReady);
@@ -1670,7 +1680,7 @@ public class TransactionManagerTest {
         assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
 
         assertSame(result, transactionManager.beginAbort());
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
 
         transactionManager.beginTransaction();
         assertTrue(transactionManager.hasOngoingTransaction());
@@ -1690,8 +1700,9 @@ public class TransactionManagerTest {
         runUntil(() -> transactionManager.transactionContainsPartition(tp0));
 
         TransactionalRequestResult result = transactionManager.beginCommit();
-        assertThrows(TimeoutException.class, () -> result.await(0, 
TimeUnit.MILLISECONDS));
-
+        var timeoutEx = assertThrows(TimeoutException.class, () -> 
result.await(0, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG));
+        assertTrue(timeoutEx.getMessage().contains(TEST_TIMEOUT_MSG));
+        
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 
producerId, epoch);
         runUntil(transactionManager::isReady);
         assertTrue(result.isSuccessful());
@@ -1704,7 +1715,7 @@ public class TransactionManagerTest {
         assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
 
         assertSame(result, transactionManager.beginCommit());
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
 
         transactionManager.beginTransaction();
         assertTrue(transactionManager.hasOngoingTransaction());
@@ -1717,7 +1728,8 @@ public class TransactionManagerTest {
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
 
-        assertThrows(TimeoutException.class, () -> result.await(0, 
TimeUnit.MILLISECONDS));
+        var timeoutEx = assertThrows(TimeoutException.class, () -> 
result.await(0, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG));
+        assertTrue(timeoutEx.getMessage().contains(TEST_TIMEOUT_MSG));
 
         prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
         runUntil(transactionManager::hasProducerId);
@@ -1734,7 +1746,7 @@ public class TransactionManagerTest {
         assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
 
         assertSame(result, transactionManager.initializeTransactions(false));
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
         assertTrue(result.isAcked());
         assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions(false));
 
@@ -1773,7 +1785,7 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasPartitionsToAdd());
         assertFalse(accumulator.hasIncomplete());
         assertTrue(result.isSuccessful());
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
 
         // ensure we can now start a new transaction
 
@@ -1837,7 +1849,7 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasPartitionsToAdd());
         assertFalse(accumulator.hasIncomplete());
         assertTrue(abortResult.isSuccessful());
-        abortResult.await();
+        abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
 
         // ensure we can now start a new transaction
 
@@ -1989,7 +2001,7 @@ public class TransactionManagerTest {
         prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
         runUntil(transactionManager::hasProducerId);
 
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
         transactionManager.beginTransaction();
 
         // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is 
handled differently here, we substitute.
@@ -2021,7 +2033,7 @@ public class TransactionManagerTest {
         prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
         runUntil(transactionManager::hasProducerId);
 
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
     }
 
     @Test
@@ -2044,7 +2056,9 @@ public class TransactionManagerTest {
 
         runUntil(transactionManager::hasError);
 
-        assertThrows(ProducerFencedException.class, result::await);
+        assertThrows(ProducerFencedException.class, () -> 
result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                TEST_TIMEOUT_MSG)
+        );
 
         assertThrows(ProducerFencedException.class, () -> 
transactionManager.beginTransaction());
         assertThrows(ProducerFencedException.class, () -> 
transactionManager.beginCommit());
@@ -2139,7 +2153,9 @@ public class TransactionManagerTest {
         runUntil(commitResult::isCompleted);
         runUntil(responseFuture::isDone);
 
-        assertThrows(KafkaException.class, commitResult::await);
+        assertThrows(KafkaException.class, () -> 
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                TEST_TIMEOUT_MSG
+        ));
         assertFalse(commitResult.isSuccessful());
         assertTrue(commitResult.isAcked());
 
@@ -2201,7 +2217,9 @@ public class TransactionManagerTest {
 
         runUntil(commitResult::isCompleted);  // commit should be cancelled 
with exception without being sent.
 
-        assertThrows(KafkaException.class, commitResult::await);
+        assertThrows(KafkaException.class, () -> 
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                TEST_TIMEOUT_MSG
+        ));
         TestUtils.assertFutureThrows(OutOfOrderSequenceException.class, 
responseFuture);
 
         // Commit is not allowed, so let's abort and try again.
@@ -2859,10 +2877,11 @@ public class TransactionManagerTest {
         runUntil(responseFuture::isDone);
 
         // make sure the produce was expired.
-        assertInstanceOf(
+        var timeoutEx = assertInstanceOf(
             TimeoutException.class,
             assertThrows(ExecutionException.class, 
responseFuture::get).getCause(),
             "Expected to get a TimeoutException since the queued ProducerBatch 
should have been expired");
+        assertTrue(timeoutEx.getMessage().contains(SENDER_TIMEOUT_MSG));
         assertTrue(transactionManager.hasAbortableError());
     }
 
@@ -2907,15 +2926,17 @@ public class TransactionManagerTest {
         runUntil(secondBatchResponse::isDone);
 
         // make sure the produce was expired.
-        assertInstanceOf(
+        var timeoutEx1 = assertInstanceOf(
             TimeoutException.class,
             assertThrows(ExecutionException.class, 
firstBatchResponse::get).getCause(),
             "Expected to get a TimeoutException since the queued ProducerBatch 
should have been expired");
+        assertTrue(timeoutEx1.getMessage().contains(SENDER_TIMEOUT_MSG));
         // make sure the produce was expired.
-        assertInstanceOf(
+        var timeoutEx2 = assertInstanceOf(
             TimeoutException.class,
             assertThrows(ExecutionException.class, 
secondBatchResponse::get).getCause(),
             "Expected to get a TimeoutException since the queued ProducerBatch 
should have been expired");
+        assertTrue(timeoutEx2.getMessage().contains(SENDER_TIMEOUT_MSG));
 
         assertTrue(transactionManager.hasAbortableError());
     }
@@ -2952,13 +2973,17 @@ public class TransactionManagerTest {
         runUntil(responseFuture::isDone);  // We should try to flush the 
produce, but expire it instead without sending anything.
 
         // make sure the produce was expired.
-        assertInstanceOf(
+        var timeoutEx1 = assertInstanceOf(
             TimeoutException.class,
             assertThrows(ExecutionException.class, 
responseFuture::get).getCause(),
             "Expected to get a TimeoutException since the queued ProducerBatch 
should have been expired");
+        assertTrue(timeoutEx1.getMessage().contains(SENDER_TIMEOUT_MSG));
+        
         runUntil(commitResult::isCompleted);  // the commit shouldn't be 
completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't 
succeed since the produce request failed.
-        assertInstanceOf(TimeoutException.class, 
assertThrows(TransactionAbortableException.class, 
commitResult::await).getCause());
+        var timeoutEx2 = assertInstanceOf(TimeoutException.class, 
assertThrows(TransactionAbortableException.class, 
+                () -> commitResult.await(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG)).getCause());
+        assertTrue(timeoutEx2.getMessage().contains(SENDER_TIMEOUT_MSG));
 
         assertTrue(transactionManager.hasAbortableError());
         assertTrue(transactionManager.hasOngoingTransaction());
@@ -3024,10 +3049,11 @@ public class TransactionManagerTest {
         runUntil(responseFuture::isDone);  // We should try to flush the 
produce, but expire it instead without sending anything.
 
         // make sure the produce was expired.
-        assertInstanceOf(
+        var timeoutEx = assertInstanceOf(
             TimeoutException.class,
             assertThrows(ExecutionException.class, 
responseFuture::get).getCause(),
             "Expected to get a TimeoutException since the queued ProducerBatch 
should have been expired");
+        assertTrue(timeoutEx.getMessage().contains(SENDER_TIMEOUT_MSG));
         runUntil(commitResult::isCompleted);
         assertFalse(commitResult.isSuccessful());  // the commit should have 
been dropped.
 
@@ -3309,7 +3335,7 @@ public class TransactionManagerTest {
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 
producerId, epoch);
         runUntil(abortResult::isCompleted);
         assertTrue(abortResult.isSuccessful());
-        abortResult.await();
+        abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
         assertTrue(transactionManager.isReady());  // make sure we are ready 
for a transaction now.
 
         transactionManager.beginTransaction();
@@ -3377,7 +3403,7 @@ public class TransactionManagerTest {
         prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 
producerId, epoch);
         runUntil(abortResult::isCompleted);
         assertTrue(abortResult.isSuccessful());
-        abortResult.await();
+        abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
         assertTrue(transactionManager.isReady());  // make sure we are ready 
for a transaction now.
 
         transactionManager.beginTransaction();
@@ -3425,7 +3451,7 @@ public class TransactionManagerTest {
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
-        abortResult.await();
+        abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
         assertTrue(transactionManager.isReady());  // make sure we are ready 
for a transaction now.
 
         transactionManager.beginTransaction();
@@ -3472,7 +3498,7 @@ public class TransactionManagerTest {
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
-        abortResult.await();
+        abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
         assertTrue(transactionManager.isReady());  // make sure we are ready 
for a transaction now.
 
         transactionManager.beginTransaction();
@@ -3531,7 +3557,7 @@ public class TransactionManagerTest {
 
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
-        abortResult.await();
+        abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
         assertTrue(transactionManager.isReady());  // make sure we are ready 
for a transaction now.
 
         transactionManager.beginTransaction();
@@ -3863,7 +3889,7 @@ public class TransactionManagerTest {
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 
producerId, epoch);
         runUntil(() -> !transactionManager.hasOngoingTransaction());
         runUntil(retryResult::isCompleted);
-        retryResult.await();
+        retryResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
TEST_TIMEOUT_MSG);
         runUntil(retryResult::isAcked);
         assertFalse(transactionManager.hasOngoingTransaction());
     }
@@ -3879,7 +3905,9 @@ public class TransactionManagerTest {
         runUntil(transactionManager::hasError);
         assertTrue(initPidResult.isCompleted());
         assertFalse(initPidResult.isSuccessful());
-        assertThrows(TransactionAbortableException.class, 
initPidResult::await);
+        assertThrows(TransactionAbortableException.class, () -> 
initPidResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                TEST_TIMEOUT_MSG
+        ));
         assertAbortableError(TransactionAbortableException.class);
     }
 
@@ -3939,7 +3967,9 @@ public class TransactionManagerTest {
         runUntil(commitResult::isCompleted);
         runUntil(responseFuture::isDone);
 
-        assertThrows(KafkaException.class, commitResult::await);
+        assertThrows(KafkaException.class, () -> 
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+                TEST_TIMEOUT_MSG
+        ));
         assertFalse(commitResult.isSuccessful());
         assertTrue(commitResult.isAcked());
 
@@ -4012,7 +4042,7 @@ public class TransactionManagerTest {
         prepareEndTxnResponse(Errors.NONE, firstTransactionResult, producerId, 
epoch, producerId, epoch, true);
         runUntil(() -> !client.hasPendingResponses());
         assertFalse(result.isCompleted());
-        assertThrows(TimeoutException.class, () -> 
result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS));
+        assertThrows(TimeoutException.class, () -> 
result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG));
 
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> !client.hasPendingResponses());
@@ -4102,8 +4132,8 @@ public class TransactionManagerTest {
         
         runUntil(transactionManager::hasProducerId);
         transactionManager.maybeUpdateTransactionV2Enabled(true);
-        
-        result.await();
+
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
         assertTrue(result.isSuccessful());
         
         // Verify transaction manager transitioned to PREPARED_TRANSACTION 
state
@@ -4143,8 +4173,8 @@ public class TransactionManagerTest {
         
         runUntil(transactionManager::hasProducerId);
         transactionManager.maybeUpdateTransactionV2Enabled(true);
-        
-        result.await();
+
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
         assertTrue(result.isSuccessful());
         
         // Verify transaction manager transitioned to READY state (not 
PREPARED_TRANSACTION)
@@ -4490,7 +4520,7 @@ public class TransactionManagerTest {
         runUntil(transactionManager::hasProducerId);
         transactionManager.maybeUpdateTransactionV2Enabled(true);
 
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
         assertTrue(result.isSuccessful());
         assertTrue(result.isAcked());
     }
@@ -4532,7 +4562,7 @@ public class TransactionManagerTest {
         runUntil(transactionManager::hasProducerId);
         transactionManager.maybeUpdateTransactionV2Enabled(true);
 
-        result.await();
+        result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
         assertTrue(result.isSuccessful());
         assertTrue(result.isAcked());
     }
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 078d006e37a..c27e1259f86 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -620,6 +620,15 @@ public class TestUtils {
         assertEquals(expectedMessage, receivedException.getMessage());
     }
 
+    public static <T extends Throwable> void 
assertFutureThrowsWithMessageContaining(
+            Class<T> expectedCauseClassApiException,
+            Future<?> future,
+            String expectedMessage
+    ) {
+        T receivedException = 
assertFutureThrows(expectedCauseClassApiException, future);
+        assertTrue(receivedException.getMessage().contains(expectedMessage));
+    }
+
     public static ApiKeys apiKeyFrom(NetworkReceive networkReceive) {
         return 
RequestHeader.parse(networkReceive.payload().duplicate()).apiKey();
     }


Reply via email to