This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 917d695322f KAFKA-17019 Producer TimeoutException should include root
cause (#20159)
917d695322f is described below
commit 917d695322f3d307ea3a3ae2d066acaa1678732f
Author: ChickenchickenLove <[email protected]>
AuthorDate: Wed Dec 3 21:55:36 2025 +0900
KAFKA-17019 Producer TimeoutException should include root cause (#20159)
### Changes
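- Add a new exception class, `PotentialCauseException`.
- Every `org.apache.kafka.common.errors.TimeoutException` thrown in
`KafkaProducer` carries a `PotentialCauseException` as its root cause
when no underlying exception was caught.
(Note: the files changed below implement this by appending the likely
cause to each `TimeoutException` message; no new exception class is
added in this diff.)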
### Describe
A `TimeoutException` can be thrown for various reasons.
However, it is often difficult to identify the root cause,
because many different factors can lead to a
`TimeoutException`.
For example:
1. The `ProducerClient` might be busy, so it may not be able to send the
request in time. As a result, some batches may expire, leading to a
`TimeoutException`.
2. The `broker` might be unavailable due to network issues or internal
failures.
3. A request may be in flight, and although the broker successfully
handles and responds to it, the response might arrive slightly late.
As shown above, there are many possible causes. In some cases, no
exception is caught in a `catch` block at all; the `TimeoutException` is
thrown simply because the elapsed time exceeded the limit. However, the
developer who throws a `TimeoutException` in `KafkaProducer` usually
already knows which specific reasons could cause it in that context.
Therefore, I think it would be helpful to include a `PotentialCauseException`
that describes the likely reason, based on that developer’s knowledge.
Reviewers: TengYao Chi <[email protected]>, Yung
<[email protected]>, Andrew Schofield
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 20 +++-
.../kafka/clients/producer/MockProducer.java | 2 +-
.../kafka/clients/producer/internals/Sender.java | 3 +-
.../internals/TransactionalRequestResult.java | 8 +-
.../kafka/clients/producer/KafkaProducerTest.java | 14 ++-
.../clients/producer/internals/SenderTest.java | 21 ++--
.../producer/internals/TransactionManagerTest.java | 110 +++++++++++++--------
.../test/java/org/apache/kafka/test/TestUtils.java | 9 ++
8 files changed, 123 insertions(+), 64 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9672b595e36..0ea2e66f57d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -248,6 +248,18 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
public static final String NETWORK_THREAD_PREFIX =
"kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
+ private static final String INIT_TXN_TIMEOUT_MSG = "InitTransactions timed
out — " +
+ "did not complete coordinator discovery or " +
+ "receive the InitProducerId response within max.block.ms.";
+
+ private static final String SEND_OFFSETS_TIMEOUT_MSG =
+ "SendOffsetsToTransaction timed out – did not reach the
coordinator or " +
+ "receive the TxnOffsetCommit/AddOffsetsToTxn response
within max.block.ms";
+ private static final String COMMIT_TXN_TIMEOUT_MSG =
+ "CommitTransaction timed out – did not complete EndTxn with the
transaction coordinator within max.block.ms";
+ private static final String ABORT_TXN_TIMEOUT_MSG =
+ "AbortTransaction timed out – did not complete EndTxn(abort) with
the transaction coordinator within max.block.ms";
+
private final String clientId;
// Visible for testing
final Metrics metrics;
@@ -672,7 +684,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
long now = time.nanoseconds();
TransactionalRequestResult result =
transactionManager.initializeTransactions(keepPreparedTxn);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
INIT_TXN_TIMEOUT_MSG);
producerMetrics.recordInit(time.nanoseconds() - now);
transactionManager.maybeUpdateTransactionV2Enabled(true);
}
@@ -761,7 +773,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
long start = time.nanoseconds();
TransactionalRequestResult result =
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
@@ -847,7 +859,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
long commitStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
COMMIT_TXN_TIMEOUT_MSG);
producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
}
@@ -882,7 +894,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
long abortStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
ABORT_TXN_TIMEOUT_MSG);
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3e5cb9f5d5a..877f43834f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -424,7 +424,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
if (injectTimeoutExceptionCounter > 0) {
--injectTimeoutExceptionCounter;
}
- throw new TimeoutException();
+ throw new TimeoutException("TimeoutExceptions are successfully
injected for test.");
}
return clientInstanceId;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 983c5c9ba48..f9ac810ca1c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -414,7 +414,8 @@ public class Sender implements Runnable {
log.trace("Expired {} batches in accumulator",
expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + "
record(s) for " + expiredBatch.topicPartition
- + ":" + (now - expiredBatch.createdMs) + " ms has passed since
batch creation";
+ + ":" + (now - expiredBatch.createdMs) + " ms has passed since
batch creation. "
+ + "The request has not been sent, or no server response has
been received yet.";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the
current in flight batches are fully resolved.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index 6739da81527..a48b22367b0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -47,16 +47,12 @@ public final class TransactionalRequestResult {
this.latch.countDown();
}
- public void await() {
- this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
-
- public void await(long timeout, TimeUnit unit) {
+ public void await(long timeout, TimeUnit unit, String
expectedTimeoutReason) {
try {
boolean success = latch.await(timeout, unit);
if (!success) {
throw new TimeoutException("Timeout expired after " +
unit.toMillis(timeout) +
- "ms while awaiting " + operation);
+ "ms while awaiting " + operation + ". " +
expectedTimeoutReason);
}
isAcked = true;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 22bccd72743..5d15229a838 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -174,6 +174,12 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class KafkaProducerTest {
+
+ private static final String INIT_TXN_TIMEOUT_MSG =
+ "InitTransactions timed out — " +
+ "did not complete coordinator discovery or " +
+ "receive the InitProducerId response within max.block.ms.";
+
private final String topic = "topic";
private final Collection<Node> nodes = Collections.singletonList(NODE);
private final Cluster emptyCluster = new Cluster(
@@ -1322,7 +1328,7 @@ public class KafkaProducerTest {
"Timed out while waiting for expected `InitProducerId` request
to be sent");
time.sleep(maxBlockMs);
- TestUtils.assertFutureThrows(TimeoutException.class, future);
+
TestUtils.assertFutureThrowsWithMessageContaining(TimeoutException.class,
future, INIT_TXN_TIMEOUT_MSG);
client.respond(initProducerIdResponse(1L, (short) 5, Errors.NONE));
@@ -1352,7 +1358,8 @@ public class KafkaProducerTest {
((FindCoordinatorRequest) request).data().keyType() ==
FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE,
"bad-transaction", NODE));
- assertThrows(TimeoutException.class, producer::initTransactions);
+ var timeoutEx = assertThrows(TimeoutException.class,
producer::initTransactions);
+ assertTrue(timeoutEx.getMessage().contains(INIT_TXN_TIMEOUT_MSG));
client.prepareResponse(
request -> request instanceof FindCoordinatorRequest &&
@@ -2364,7 +2371,8 @@ public class KafkaProducerTest {
Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(), new StringSerializer(),
metadata, client, null, time);
- assertThrows(TimeoutException.class, producer::initTransactions);
+ var timeoutEx1 = assertThrows(TimeoutException.class,
producer::initTransactions);
+ assertTrue(timeoutEx1.getMessage().contains(INIT_TXN_TIMEOUT_MSG));
// other transactional operations should not be allowed if we catch
the error after initTransactions failed
try {
assertThrows(IllegalStateException.class,
producer::beginTransaction);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index cd984ac2a34..a7ff3249615 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -163,6 +163,7 @@ public class SenderTest {
TOPIC_NAME, TOPIC_ID,
"testSplitBatchAndSend", Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1")
);
+ private static final String SENDER_TIMEOUT_MSG = "The request has not been
sent, or no server response has been received yet.";
private final TopicPartition tp0 = new TopicPartition(TOPIC_NAME, 0);
private final TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 1);
private final TopicPartition tp2 = new TopicPartition(TOPIC_NAME, 2);
@@ -425,6 +426,7 @@ public class SenderTest {
@Override
public void onCompletion(RecordMetadata metadata, Exception
exception) {
if (exception instanceof TimeoutException) {
+
assertTrue(exception.getMessage().contains(SENDER_TIMEOUT_MSG));
expiryCallbackCount.incrementAndGet();
try {
accumulator.append(tp1.topic(), tp1.partition(), 0L,
key, value,
@@ -2792,7 +2794,7 @@ public class SenderTest {
runUntil(sender, txnManager::isReady);
assertTrue(commitResult.isSuccessful());
- commitResult.await();
+ commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
"Unexpected Timed out for transaction commit to completed during the test.");
// Finally, we want to assert that the linger time is still
effective
// when the new transaction begins.
@@ -2942,8 +2944,9 @@ public class SenderTest {
sender.forceClose();
sender.run();
- assertThrows(KafkaException.class, commitResult::await,
- "The test expected to throw a KafkaException for forcefully
closing the sender");
+ assertThrows(KafkaException.class, () ->
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ "The test expected to throw a KafkaException for forcefully
closing the sender")
+ );
} finally {
m.close();
}
@@ -3154,7 +3157,7 @@ public class SenderTest {
assertTrue(txnManager::isReady);
assertTrue(result.isSuccessful());
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time
out during the test.");
txnManager.beginTransaction();
}
@@ -3193,7 +3196,7 @@ public class SenderTest {
assertTrue(txnManager::isReady);
assertTrue(result.isSuccessful());
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time
out during the test.");
txnManager.beginTransaction();
}
@@ -3232,7 +3235,7 @@ public class SenderTest {
assertTrue(txnManager::isReady);
assertTrue(result.isSuccessful());
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time
out during the test.");
txnManager.beginTransaction();
}
@@ -3264,7 +3267,7 @@ public class SenderTest {
TransactionalRequestResult commitResult =
transactionManager.beginCommit();
sender.runOnce();
try {
- commitResult.await(1000, TimeUnit.MILLISECONDS);
+ commitResult.await(1000, TimeUnit.MILLISECONDS, "Unexpected time
out during the test.");
fail("Expected abortable error to be thrown for commit");
} catch (KafkaException e) {
assertTrue(transactionManager.hasAbortableError());
@@ -3281,7 +3284,7 @@ public class SenderTest {
// Verify the error is converted to KafkaException (not
TransactionAbortableException)
try {
- abortResult.await(1000, TimeUnit.MILLISECONDS);
+ abortResult.await(1000, TimeUnit.MILLISECONDS, "Unexpected time
out during the test.");
fail("Expected KafkaException to be thrown");
} catch (KafkaException e) {
// Verify TM is in FATAL_ERROR state
@@ -3884,7 +3887,7 @@ public class SenderTest {
prepareInitProducerResponse(Errors.NONE,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
sender.runOnce();
assertTrue(transactionManager.hasProducerId());
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, "Unexpected time
out during the test.");
}
private void prepareFindCoordinatorResponse(Errors error, String txnid) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 15539e40bef..cd2bd3f28b6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -133,6 +133,9 @@ public class TransactionManagerTest {
private final String transactionalId = "foobar";
private final int transactionTimeoutMs = 1121;
+ private static final String SENDER_TIMEOUT_MSG = "The request has not been
sent, or no server response has been received yet.";
+ private static final String TEST_TIMEOUT_MSG = "Unexpected time out during
the test.";
+
private final String topic = "test";
private static final Uuid TOPIC_ID =
Uuid.fromString("y2J9jXHhfIkQ1wK8mMKXx1");
private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -787,7 +790,7 @@ public class TransactionManagerTest {
// is reached even though the delivery timeout has expired and the
// future has completed exceptionally.
assertTrue(responseFuture1.isDone());
- TestUtils.assertFutureThrows(TimeoutException.class, responseFuture1);
+
TestUtils.assertFutureThrowsWithMessageContaining(TimeoutException.class,
responseFuture1, SENDER_TIMEOUT_MSG);
assertFalse(transactionManager.hasInFlightRequest());
assertEquals(1, client.inFlightRequestCount());
@@ -970,7 +973,7 @@ public class TransactionManagerTest {
prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch
+ 1));
runUntil(() -> !transactionManager.hasOngoingTransaction());
runUntil(retryResult::isCompleted);
- retryResult.await();
+ retryResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
runUntil(retryResult::isAcked);
assertFalse(transactionManager.hasOngoingTransaction());
@@ -1358,7 +1361,9 @@ public class TransactionManagerTest {
assertTrue(transactionManager.hasFatalError());
assertInstanceOf(TransactionalIdAuthorizationException.class,
transactionManager.lastError());
assertFalse(initPidResult.isSuccessful());
- assertThrows(TransactionalIdAuthorizationException.class,
initPidResult::await);
+ assertThrows(TransactionalIdAuthorizationException.class, () ->
initPidResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ TEST_TIMEOUT_MSG)
+ );
assertFatalError(TransactionalIdAuthorizationException.class);
}
@@ -1373,7 +1378,9 @@ public class TransactionManagerTest {
runUntil(transactionManager::hasError);
assertTrue(initPidResult.isCompleted());
assertFalse(initPidResult.isSuccessful());
- assertThrows(TransactionalIdAuthorizationException.class,
initPidResult::await);
+ assertThrows(TransactionalIdAuthorizationException.class, () ->
initPidResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ TEST_TIMEOUT_MSG)
+ );
assertAbortableError(TransactionalIdAuthorizationException.class);
}
@@ -1622,7 +1629,7 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasPartitionsToAdd());
assertFalse(accumulator.hasIncomplete());
assertTrue(abortResult.isSuccessful());
- abortResult.await();
+ abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
// ensure we can now start a new transaction
@@ -1656,7 +1663,10 @@ public class TransactionManagerTest {
runUntil(() -> transactionManager.transactionContainsPartition(tp0));
TransactionalRequestResult result = transactionManager.beginAbort();
- assertThrows(TimeoutException.class, () -> result.await(0,
TimeUnit.MILLISECONDS));
+ var timoutEx = assertThrows(TimeoutException.class, () ->
+ result.await(0, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG)
+ );
+ assertTrue(timoutEx.getMessage().contains(TEST_TIMEOUT_MSG));
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT,
producerId, epoch);
runUntil(transactionManager::isReady);
@@ -1670,7 +1680,7 @@ public class TransactionManagerTest {
assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
assertSame(result, transactionManager.beginAbort());
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
transactionManager.beginTransaction();
assertTrue(transactionManager.hasOngoingTransaction());
@@ -1690,8 +1700,9 @@ public class TransactionManagerTest {
runUntil(() -> transactionManager.transactionContainsPartition(tp0));
TransactionalRequestResult result = transactionManager.beginCommit();
- assertThrows(TimeoutException.class, () -> result.await(0,
TimeUnit.MILLISECONDS));
-
+ var timeoutEx = assertThrows(TimeoutException.class, () ->
result.await(0, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG));
+ assertTrue(timeoutEx.getMessage().contains(TEST_TIMEOUT_MSG));
+
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT,
producerId, epoch);
runUntil(transactionManager::isReady);
assertTrue(result.isSuccessful());
@@ -1704,7 +1715,7 @@ public class TransactionManagerTest {
assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
assertSame(result, transactionManager.beginCommit());
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
transactionManager.beginTransaction();
assertTrue(transactionManager.hasOngoingTransaction());
@@ -1717,7 +1728,8 @@ public class TransactionManagerTest {
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
- assertThrows(TimeoutException.class, () -> result.await(0,
TimeUnit.MILLISECONDS));
+ var timeoutEx = assertThrows(TimeoutException.class, () ->
result.await(0, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG));
+ assertTrue(timeoutEx.getMessage().contains(TEST_TIMEOUT_MSG));
prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
runUntil(transactionManager::hasProducerId);
@@ -1734,7 +1746,7 @@ public class TransactionManagerTest {
assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
assertSame(result, transactionManager.initializeTransactions(false));
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
assertTrue(result.isAcked());
assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions(false));
@@ -1773,7 +1785,7 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasPartitionsToAdd());
assertFalse(accumulator.hasIncomplete());
assertTrue(result.isSuccessful());
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
// ensure we can now start a new transaction
@@ -1837,7 +1849,7 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasPartitionsToAdd());
assertFalse(accumulator.hasIncomplete());
assertTrue(abortResult.isSuccessful());
- abortResult.await();
+ abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
// ensure we can now start a new transaction
@@ -1989,7 +2001,7 @@ public class TransactionManagerTest {
prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
runUntil(transactionManager::hasProducerId);
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
transactionManager.beginTransaction();
// Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is
handled differently here, we substitute.
@@ -2021,7 +2033,7 @@ public class TransactionManagerTest {
prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
runUntil(transactionManager::hasProducerId);
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
}
@Test
@@ -2044,7 +2056,9 @@ public class TransactionManagerTest {
runUntil(transactionManager::hasError);
- assertThrows(ProducerFencedException.class, result::await);
+ assertThrows(ProducerFencedException.class, () ->
result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ TEST_TIMEOUT_MSG)
+ );
assertThrows(ProducerFencedException.class, () ->
transactionManager.beginTransaction());
assertThrows(ProducerFencedException.class, () ->
transactionManager.beginCommit());
@@ -2139,7 +2153,9 @@ public class TransactionManagerTest {
runUntil(commitResult::isCompleted);
runUntil(responseFuture::isDone);
- assertThrows(KafkaException.class, commitResult::await);
+ assertThrows(KafkaException.class, () ->
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ TEST_TIMEOUT_MSG
+ ));
assertFalse(commitResult.isSuccessful());
assertTrue(commitResult.isAcked());
@@ -2201,7 +2217,9 @@ public class TransactionManagerTest {
runUntil(commitResult::isCompleted); // commit should be cancelled
with exception without being sent.
- assertThrows(KafkaException.class, commitResult::await);
+ assertThrows(KafkaException.class, () ->
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ TEST_TIMEOUT_MSG
+ ));
TestUtils.assertFutureThrows(OutOfOrderSequenceException.class,
responseFuture);
// Commit is not allowed, so let's abort and try again.
@@ -2859,10 +2877,11 @@ public class TransactionManagerTest {
runUntil(responseFuture::isDone);
// make sure the produce was expired.
- assertInstanceOf(
+ var timeoutEx = assertInstanceOf(
TimeoutException.class,
assertThrows(ExecutionException.class,
responseFuture::get).getCause(),
"Expected to get a TimeoutException since the queued ProducerBatch
should have been expired");
+ assertTrue(timeoutEx.getMessage().contains(SENDER_TIMEOUT_MSG));
assertTrue(transactionManager.hasAbortableError());
}
@@ -2907,15 +2926,17 @@ public class TransactionManagerTest {
runUntil(secondBatchResponse::isDone);
// make sure the produce was expired.
- assertInstanceOf(
+ var timeoutEx1 = assertInstanceOf(
TimeoutException.class,
assertThrows(ExecutionException.class,
firstBatchResponse::get).getCause(),
"Expected to get a TimeoutException since the queued ProducerBatch
should have been expired");
+ assertTrue(timeoutEx1.getMessage().contains(SENDER_TIMEOUT_MSG));
// make sure the produce was expired.
- assertInstanceOf(
+ var timeoutEx2 = assertInstanceOf(
TimeoutException.class,
assertThrows(ExecutionException.class,
secondBatchResponse::get).getCause(),
"Expected to get a TimeoutException since the queued ProducerBatch
should have been expired");
+ assertTrue(timeoutEx2.getMessage().contains(SENDER_TIMEOUT_MSG));
assertTrue(transactionManager.hasAbortableError());
}
@@ -2952,13 +2973,17 @@ public class TransactionManagerTest {
runUntil(responseFuture::isDone); // We should try to flush the
produce, but expire it instead without sending anything.
// make sure the produce was expired.
- assertInstanceOf(
+ var timeoutEx1 = assertInstanceOf(
TimeoutException.class,
assertThrows(ExecutionException.class,
responseFuture::get).getCause(),
"Expected to get a TimeoutException since the queued ProducerBatch
should have been expired");
+ assertTrue(timeoutEx1.getMessage().contains(SENDER_TIMEOUT_MSG));
+
runUntil(commitResult::isCompleted); // the commit shouldn't be
completed without being sent since the produce request failed.
assertFalse(commitResult.isSuccessful()); // the commit shouldn't
succeed since the produce request failed.
- assertInstanceOf(TimeoutException.class,
assertThrows(TransactionAbortableException.class,
commitResult::await).getCause());
+ var timeoutEx2 = assertInstanceOf(TimeoutException.class,
assertThrows(TransactionAbortableException.class,
+ () -> commitResult.await(Long.MAX_VALUE,
TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG)).getCause());
+ assertTrue(timeoutEx2.getMessage().contains(SENDER_TIMEOUT_MSG));
assertTrue(transactionManager.hasAbortableError());
assertTrue(transactionManager.hasOngoingTransaction());
@@ -3024,10 +3049,11 @@ public class TransactionManagerTest {
runUntil(responseFuture::isDone); // We should try to flush the
produce, but expire it instead without sending anything.
// make sure the produce was expired.
- assertInstanceOf(
+ var timeoutEx = assertInstanceOf(
TimeoutException.class,
assertThrows(ExecutionException.class,
responseFuture::get).getCause(),
"Expected to get a TimeoutException since the queued ProducerBatch
should have been expired");
+ assertTrue(timeoutEx.getMessage().contains(SENDER_TIMEOUT_MSG));
runUntil(commitResult::isCompleted);
assertFalse(commitResult.isSuccessful()); // the commit should have
been dropped.
@@ -3309,7 +3335,7 @@ public class TransactionManagerTest {
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT,
producerId, epoch);
runUntil(abortResult::isCompleted);
assertTrue(abortResult.isSuccessful());
- abortResult.await();
+ abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
assertTrue(transactionManager.isReady()); // make sure we are ready
for a transaction now.
transactionManager.beginTransaction();
@@ -3377,7 +3403,7 @@ public class TransactionManagerTest {
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT,
producerId, epoch);
runUntil(abortResult::isCompleted);
assertTrue(abortResult.isSuccessful());
- abortResult.await();
+ abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
assertTrue(transactionManager.isReady()); // make sure we are ready
for a transaction now.
transactionManager.beginTransaction();
@@ -3425,7 +3451,7 @@ public class TransactionManagerTest {
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
- abortResult.await();
+ abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
assertTrue(transactionManager.isReady()); // make sure we are ready
for a transaction now.
transactionManager.beginTransaction();
@@ -3472,7 +3498,7 @@ public class TransactionManagerTest {
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
- abortResult.await();
+ abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
assertTrue(transactionManager.isReady()); // make sure we are ready
for a transaction now.
transactionManager.beginTransaction();
@@ -3531,7 +3557,7 @@ public class TransactionManagerTest {
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
- abortResult.await();
+ abortResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
assertTrue(transactionManager.isReady()); // make sure we are ready
for a transaction now.
transactionManager.beginTransaction();
@@ -3863,7 +3889,7 @@ public class TransactionManagerTest {
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT,
producerId, epoch);
runUntil(() -> !transactionManager.hasOngoingTransaction());
runUntil(retryResult::isCompleted);
- retryResult.await();
+ retryResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
TEST_TIMEOUT_MSG);
runUntil(retryResult::isAcked);
assertFalse(transactionManager.hasOngoingTransaction());
}
@@ -3879,7 +3905,9 @@ public class TransactionManagerTest {
runUntil(transactionManager::hasError);
assertTrue(initPidResult.isCompleted());
assertFalse(initPidResult.isSuccessful());
- assertThrows(TransactionAbortableException.class,
initPidResult::await);
+ assertThrows(TransactionAbortableException.class, () ->
initPidResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ TEST_TIMEOUT_MSG
+ ));
assertAbortableError(TransactionAbortableException.class);
}
@@ -3939,7 +3967,9 @@ public class TransactionManagerTest {
runUntil(commitResult::isCompleted);
runUntil(responseFuture::isDone);
- assertThrows(KafkaException.class, commitResult::await);
+ assertThrows(KafkaException.class, () ->
commitResult.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ TEST_TIMEOUT_MSG
+ ));
assertFalse(commitResult.isSuccessful());
assertTrue(commitResult.isAcked());
@@ -4012,7 +4042,7 @@ public class TransactionManagerTest {
prepareEndTxnResponse(Errors.NONE, firstTransactionResult, producerId,
epoch, producerId, epoch, true);
runUntil(() -> !client.hasPendingResponses());
assertFalse(result.isCompleted());
- assertThrows(TimeoutException.class, () ->
result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS));
+ assertThrows(TimeoutException.class, () ->
result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG));
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() -> !client.hasPendingResponses());
@@ -4102,8 +4132,8 @@ public class TransactionManagerTest {
runUntil(transactionManager::hasProducerId);
transactionManager.maybeUpdateTransactionV2Enabled(true);
-
- result.await();
+
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
assertTrue(result.isSuccessful());
// Verify transaction manager transitioned to PREPARED_TRANSACTION
state
@@ -4143,8 +4173,8 @@ public class TransactionManagerTest {
runUntil(transactionManager::hasProducerId);
transactionManager.maybeUpdateTransactionV2Enabled(true);
-
- result.await();
+
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
assertTrue(result.isSuccessful());
// Verify transaction manager transitioned to READY state (not
PREPARED_TRANSACTION)
@@ -4490,7 +4520,7 @@ public class TransactionManagerTest {
runUntil(transactionManager::hasProducerId);
transactionManager.maybeUpdateTransactionV2Enabled(true);
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
assertTrue(result.isSuccessful());
assertTrue(result.isAcked());
}
@@ -4532,7 +4562,7 @@ public class TransactionManagerTest {
runUntil(transactionManager::hasProducerId);
transactionManager.maybeUpdateTransactionV2Enabled(true);
- result.await();
+ result.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, TEST_TIMEOUT_MSG);
assertTrue(result.isSuccessful());
assertTrue(result.isAcked());
}
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 078d006e37a..c27e1259f86 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -620,6 +620,15 @@ public class TestUtils {
assertEquals(expectedMessage, receivedException.getMessage());
}
+ public static <T extends Throwable> void
assertFutureThrowsWithMessageContaining(
+ Class<T> expectedCauseClassApiException,
+ Future<?> future,
+ String expectedMessage
+ ) {
+ T receivedException =
assertFutureThrows(expectedCauseClassApiException, future);
+ assertTrue(receivedException.getMessage().contains(expectedMessage));
+ }
+
public static ApiKeys apiKeyFrom(NetworkReceive networkReceive) {
return
RequestHeader.parse(networkReceive.payload().duplicate()).apiKey();
}