This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f29ca21976d [fix][broker]Transactional messages can never be sent
successfully if concurrently taking transaction buffer snapshot (#24945)
f29ca21976d is described below
commit f29ca21976d63a92371785fbdbe712f9f5e54cf2
Author: fengyubiao <[email protected]>
AuthorDate: Tue Nov 11 17:23:45 2025 +0800
[fix][broker]Transactional messages can never be sent successfully if
concurrently taking transaction buffer snapshot (#24945)
---
.../buffer/impl/TopicTransactionBuffer.java | 194 ++++++++++++++++-----
.../buffer/impl/TopicTransactionBufferState.java | 21 ++-
.../broker/transaction/TransactionConsumeTest.java | 101 +++++++++++
.../buffer/TopicTransactionBufferTest.java | 18 +-
.../buffer/utils/TransactionBufferTestImpl.java | 15 ++
5 files changed, 285 insertions(+), 64 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 0c777afaa26..2df6e717981 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -92,9 +93,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final CompletableFuture<Void> transactionBufferFuture = new
CompletableFuture<>();
- private CompletableFuture<Position> publishFuture =
getTransactionBufferFuture()
- .thenApply(__ -> PositionFactory.EARLIEST);
-
/**
* The map is used to store the lowWaterMarks which key is TC ID and value
is lowWaterMark of the TC.
*/
@@ -108,6 +106,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final AbortedTxnProcessor.SnapshotType snapshotType;
private final MaxReadPositionCallBack maxReadPositionCallBack;
+ /** if the first snapshot is in progress, it will pending following
publishing tasks. **/
+ private final LinkedList<PendingAppendingTxnBufferTask>
pendingAppendingTxnBufferTasks = new LinkedList<>();
private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic
topic) {
return
topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
@@ -232,16 +232,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
return CompletableFuture.completedFuture(null);
}
- @VisibleForTesting
- public void setPublishFuture(CompletableFuture<Position> publishFuture) {
- this.publishFuture = publishFuture;
- }
-
- @VisibleForTesting
- public CompletableFuture<Position> getPublishFuture() {
- return publishFuture;
- }
-
@VisibleForTesting
public CompletableFuture<Void> getTransactionBufferFuture() {
return transactionBufferFuture;
@@ -267,47 +257,146 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
return this.txnCommittedCounter.sum();
}
+ private record PendingAppendingTxnBufferTask(TxnID txnId, long sequenceId,
ByteBuf buffer,
+ CompletableFuture<Position>
pendingPublishFuture) {
+
+ void fail(Throwable throwable) {
+ buffer.release();
+ pendingPublishFuture.completeExceptionally(throwable);
+ }
+ }
+
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long
sequenceId, ByteBuf buffer) {
- // Method `takeAbortedTxnsSnapshot` will be executed in the different
thread.
- // So we need to retain the buffer in this thread. It will be released
after message persistent.
- buffer.retain();
- CompletableFuture<Position> future =
getPublishFuture().thenCompose(ignore -> {
- if (checkIfNoSnapshot()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- // `publishFuture` will be completed after message persistent,
so there will not be two threads
- // writing snapshots at the same time.
-
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(()
-> {
- if (changeToReadyStateFromNoSnapshot()) {
- timer.newTimeout(TopicTransactionBuffer.this,
- takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
- completableFuture.complete(null);
- } else {
- log.error("[{}]Failed to change state of transaction
buffer to Ready from NoSnapshot",
- topic.getName());
- completableFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(
- "Transaction Buffer take first snapshot
failed, the current state is: " + getState()));
- }
- }).exceptionally(exception -> {
- log.error("Topic {} failed to take snapshot",
this.topic.getName());
- completableFuture.completeExceptionally(exception);
- return null;
- });
- return completableFuture.thenCompose(__ ->
internalAppendBufferToTxn(txnId, buffer));
- } else if (checkIfReady()) {
- return internalAppendBufferToTxn(txnId, buffer);
- } else {
- // `publishFuture` will be completed after transaction buffer
recover completely
- // during initializing, so this case should not happen.
+ synchronized (pendingAppendingTxnBufferTasks) {
+ // The first snapshot is in progress, the following publish tasks
will be pending.
+ if (!pendingAppendingTxnBufferTasks.isEmpty()) {
+ CompletableFuture<Position> res = new CompletableFuture<>();
+ buffer.retain();
+ pendingAppendingTxnBufferTasks.offer(new
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+ return res;
+ }
+
+ // `publishFuture` will be completed after transaction buffer
recover completely
+ // during initializing, so this case should not happen.
+ if (!checkIfReady() && !checkIfNoSnapshot() &&
!checkIfFirstSnapshotting() && !checkIfInitializing()) {
+ log.error("[{}] unexpected state: {} when try to take the
first transaction buffer snapshot",
+ topic.getName(), getState());
return FutureUtil.failedFuture(new
BrokerServiceException.ServiceUnitNotReadyException(
"Transaction Buffer recover failed, the current state
is: " + getState()));
}
- }).whenComplete(((position, throwable) -> buffer.release()));
- setPublishFuture(future);
- return future;
+
+ // The transaction buffer is ready to write.
+ if (checkIfReady()) {
+ return internalAppendBufferToTxn(txnId, buffer, sequenceId);
+ }
+
+ // Pending the current publishing and trigger new snapshot if
needed.
+ CompletableFuture<Position> res = new CompletableFuture<>();
+ buffer.retain();
+ pendingAppendingTxnBufferTasks.offer(new
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+
+ final java.util.function.Consumer<Throwable> failPendingTasks =
throwable -> {
+ synchronized (pendingAppendingTxnBufferTasks) {
+ PendingAppendingTxnBufferTask pendingTask = null;
+ while ((pendingTask =
pendingAppendingTxnBufferTasks.poll()) != null) {
+ pendingTask.fail(throwable);
+ }
+ }
+ };
+
+ final Runnable flushPendingTasks = () -> {
+ PendingAppendingTxnBufferTask pendingTask = null;
+ try {
+ synchronized (pendingAppendingTxnBufferTasks) {
+ while ((pendingTask =
pendingAppendingTxnBufferTasks.poll()) != null) {
+ final ByteBuf data = pendingTask.buffer;
+ final CompletableFuture<Position> pendingFuture =
+ pendingTask.pendingPublishFuture;
+ internalAppendBufferToTxn(pendingTask.txnId,
pendingTask.buffer,
+ pendingTask.sequenceId)
+ .whenComplete((positionAdded, ex3) -> {
+ data.release();
+ if (ex3 != null) {
+
pendingFuture.completeExceptionally(ex3);
+ return;
+ }
+ pendingFuture.complete(positionAdded);
+ });
+ }
+ }
+ } catch (Exception e) {
+ // If there are some error when adding entries or caching
entries, this log will be printed.
+ log.error("[{}] Failed to flush pending publishing
requests after taking the first"
+ + " snapshot.",
+ topic.getName(), e);
+ if (pendingTask != null) {
+ pendingTask.fail(e);
+ }
+ failPendingTasks.accept(e);
+ }
+ };
+
+ // Trigger the first snapshot.
+ transactionBufferFuture.whenComplete((ignore1, ex1) -> {
+ if (ex1 != null) {
+ log.error("[{}] Transaction buffer recover failed",
topic.getName(), ex1);
+ failPendingTasks.accept(ex1);
+ return;
+ }
+ if (changeToFirstSnapshotting()) {
+ log.info("[{}] Start to take the first snapshot",
topic.getName());
+ // Flush pending publishing after the first snapshot
finished.
+ takeFirstSnapshot().whenComplete((ignore2, ex2) -> {
+ if (ex2 != null) {
+ log.error("[{}] Failed to take the first snapshot,
flushing failed publishing requests",
+ topic.getName(), ex2);
+ failPendingTasks.accept(ex2);
+ return;
+ }
+ log.info("[{}] Finished to take the first snapshot,
flushing publishing {} requests",
+ topic.getName(),
pendingAppendingTxnBufferTasks.size());
+ flushPendingTasks.run();
+ });
+ } else if (checkIfReady()) {
+ log.info("[{}] No need to take the first snapshot,
flushing publishing {} requests",
+ topic.getName(),
pendingAppendingTxnBufferTasks.size());
+ flushPendingTasks.run();
+ } else {
+ log.error("[{}] Transaction buffer recover failed, current
state is {}", topic.getName(),
+ getState());
+ failPendingTasks.accept(new
BrokerServiceException.ServiceUnitNotReadyException(
+ "Transaction Buffer recover failed, the current
state is: " + getState()));
+ }
+ });
+ return res;
+ }
+ }
+
+ private CompletableFuture<Void> takeFirstSnapshot() {
+ CompletableFuture<Void> firstSnapshottingFuture = new
CompletableFuture<>();
+
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(()
-> {
+ if (changeToReadyStateFromNoSnapshot()) {
+ timer.newTimeout(TopicTransactionBuffer.this,
+ takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+ firstSnapshottingFuture.complete(null);
+ } else {
+ log.error("[{}]Failed to change state of transaction buffer to
Ready from NoSnapshot",
+ topic.getName());
+ firstSnapshottingFuture.completeExceptionally(new
BrokerServiceException
+ .ServiceUnitNotReadyException(
+ "Transaction Buffer take first snapshot failed, the
current state is: " + getState()));
+ }
+ }).exceptionally(exception -> {
+ log.error("Topic {} failed to take snapshot",
this.topic.getName());
+ firstSnapshottingFuture.completeExceptionally(exception);
+ return null;
+ });
+ return firstSnapshottingFuture;
}
- private CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId,
ByteBuf buffer) {
+ @VisibleForTesting
+ protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID
txnId, ByteBuf buffer, long seq) {
CompletableFuture<Position> completableFuture = new
CompletableFuture<>();
Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits());
if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) {
@@ -550,7 +639,16 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> closeAsync() {
- changeToCloseState();
+ synchronized (pendingAppendingTxnBufferTasks) {
+ if (!checkIfClosed()) {
+ PendingAppendingTxnBufferTask pendingTask = null;
+ Throwable t = new
BrokerServiceException.ServiceUnitNotReadyException("Topic is closed");
+ while ((pendingTask = pendingAppendingTxnBufferTasks.poll())
!= null) {
+ pendingTask.fail(t);
+ }
+ }
+ changeToCloseState();
+ }
return this.snapshotAbortedTxnProcessor.closeAsync();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
index 92ab1d07b69..9a8f2041bf4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
@@ -33,7 +33,8 @@ public abstract class TopicTransactionBufferState {
Initializing,
Ready,
Close,
- NoSnapshot
+ NoSnapshot,
+ FirstSnapshotting
}
private static final
AtomicReferenceFieldUpdater<TopicTransactionBufferState, State> STATE_UPDATER =
@@ -59,13 +60,25 @@ public abstract class TopicTransactionBufferState {
}
protected boolean changeToReadyStateFromNoSnapshot() {
- return STATE_UPDATER.compareAndSet(this, State.NoSnapshot,
State.Ready);
+ return STATE_UPDATER.compareAndSet(this, State.FirstSnapshotting,
State.Ready);
+ }
+
+ protected boolean changeToFirstSnapshotting() {
+ return STATE_UPDATER.compareAndSet(this, State.NoSnapshot,
State.FirstSnapshotting);
}
protected void changeToCloseState() {
STATE_UPDATER.set(this, State.Close);
}
+ public boolean checkIfInitializing() {
+ return STATE_UPDATER.get(this) == State.Initializing;
+ }
+
+ public boolean checkIfFirstSnapshotting() {
+ return STATE_UPDATER.get(this) == State.FirstSnapshotting;
+ }
+
public boolean checkIfReady() {
return STATE_UPDATER.get(this) == State.Ready;
}
@@ -74,6 +87,10 @@ public abstract class TopicTransactionBufferState {
return STATE_UPDATER.get(this) == State.NoSnapshot;
}
+ public boolean checkIfClosed() {
+ return STATE_UPDATER.get(this) == State.Close;
+ }
+
public State getState() {
return STATE_UPDATER.get(this);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index a7e2aac5174..16ce35214dc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
@@ -28,18 +31,23 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -62,6 +70,7 @@ import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
@@ -417,4 +426,96 @@ public class TransactionConsumeTest extends
TransactionTestBase {
Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName)
.getUnackedMessages(), 0);
}
+
+ @DataProvider
+ public Object[][] doCommitTxn() {
+ return new Object[][] {
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "doCommitTxn", timeOut = 60_000, invocationCount = 3)
+ public void testFirstTnxBufferSnapshotAndRecoveryConcurrently(boolean
doCommitTxn) throws Exception {
+ String topic =
BrokerTestUtil.newUniqueName("persistent://public/txn/tp");
+ // Create many clients and publish with transaction, which will
trigger transaction buffer snapshot
+ // concurrently.
+ int producerCount = 10;
+ List<PulsarClient> clientList = new ArrayList<>();
+ List<Producer<String>> producerList = new ArrayList<>();
+ List<CompletableFuture<MessageId>> sendResults = new ArrayList<>();
+ List<Transaction> pendingTnxList = new ArrayList<>();
+ for (int i = 0; i < producerCount; i++) {
+ clientList.add(PulsarClient.builder()
+ .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .enableTransaction(true)
+ .build());
+ }
+ for (int i = 0; i < producerCount; i++) {
+
producerList.add(clientList.get(i).newProducer(Schema.STRING).topic(topic).create());
+ }
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionName("s1").subscribe();
+ for (int i = 0; i < producerCount; i++) {
+ Transaction transaction = clientList.get(i).newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS)
+ .build().get();
+ pendingTnxList.add(transaction);
+ final int index = i;
+ Producer<String> producer = producerList.get(i);
+ new Thread(() -> {
+ sendResults.add(producer.newMessage(transaction).value(index +
"").sendAsync());
+ }).start();
+ }
+
+ // Verify that the transaction buffer snapshot succeed.
+ AtomicReference<TopicTransactionBuffer> topicTransactionBuffer = new
AtomicReference<>();
+ for (PulsarService pulsar : pulsarServiceList) {
+ if (pulsar.getBrokerService().getTopics().containsKey(topic)) {
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopic(topic, false).get().get();
+ topicTransactionBuffer.set((TopicTransactionBuffer)
persistentTopic.getTransactionBuffer());
+ break;
+ }
+ }
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(topicTransactionBuffer.get());
+ assertEquals(topicTransactionBuffer.get().getState().toString(),
"Ready");
+
assertTrue(topicTransactionBuffer.get().getTransactionBufferFuture().isDone());
+
assertFalse(topicTransactionBuffer.get().getTransactionBufferFuture().isCompletedExceptionally());
+ });
+
+ // Verify that all messages are sent successfully.
+ for (int i = 0; i < producerCount; i++) {
+ sendResults.get(i).get();
+ if (doCommitTxn) {
+ pendingTnxList.get(i).commit();
+ } else {
+ pendingTnxList.get(i).abort();
+ }
+ }
+ Set<String> msgReceived = new HashSet<>();
+ while (true) {
+ Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ msgReceived.add(msg.getValue());
+ }
+ if (doCommitTxn) {
+ for (int i = 0; i < producerCount; i++) {
+ assertTrue(msgReceived.contains(i + ""));
+ }
+ } else {
+ assertTrue(msgReceived.isEmpty());
+ }
+
+ // cleanup.
+ consumer.close();
+ for (int i = 0; i < producerCount; i++) {
+ producerList.get(i).close();
+ clientList.get(i).close();
+ }
+ admin.topics().delete(topic, false);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 5a54b37a637..d76a5a88dbd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.RandomUtils;
@@ -69,7 +68,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -513,14 +511,6 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
.withTransactionTimeout(5, TimeUnit.HOURS)
.build().get();
- // 2. Set a new future in transaction buffer as
`transactionBufferFuture` to simulate whether the
- // transaction buffer recover completely.
- TransactionBufferTestImpl topicTransactionBuffer =
(TransactionBufferTestImpl) persistentTopic
- .getTransactionBuffer();
- CompletableFuture<Position> completableFuture = new
CompletableFuture<>();
- CompletableFuture<Position> originalFuture =
topicTransactionBuffer.getPublishFuture();
- topicTransactionBuffer.setPublishFuture(completableFuture);
-
topicTransactionBuffer.setState(TopicTransactionBufferState.State.Ready);
// Register this topic to the transaction in advance to avoid the
sending request pending here.
((TransactionImpl) transaction).registerProducedTopic(topic).get(5,
TimeUnit.SECONDS);
// 3. Test the messages sent before transaction buffer ready is in
order.
@@ -528,7 +518,6 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
producer.newMessage(transaction).value(i).sendAsync();
}
// 4. Test the messages sent after transaction buffer ready is in
order.
- completableFuture.complete(originalFuture.get());
for (int i = 50; i < 100; i++) {
producer.newMessage(transaction).value(i).sendAsync();
}
@@ -569,16 +558,17 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
.get(5, TimeUnit.SECONDS);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(byteBuf2.refCnt(), 1));
// 2.3 Test sending message failed.
- topicTransactionBuffer.setPublishFuture(FutureUtil.failedFuture(new
Exception("fail")));
+ topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(true);
ByteBuf byteBuf3 = Unpooled.buffer();
try {
topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L,
byteBuf1)
.get(5, TimeUnit.SECONDS);
- fail();
+ fail("this appending should fail because we injected an error");
} catch (Exception e) {
- assertEquals(e.getCause().getMessage(), "fail");
+ assertEquals(e.getCause().getMessage(), "failed because an
injected error for test");
}
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(byteBuf3.refCnt(), 1));
+
topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(false);
// 3. release resource
byteBuf1.release();
byteBuf2.release();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
index b1168d08501..f1a003ff194 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
@@ -18,14 +18,21 @@
*/
package org.apache.pulsar.broker.transaction.buffer.utils;
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.CompletableFuture;
import lombok.Setter;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.client.api.transaction.TxnID;
public class TransactionBufferTestImpl extends TopicTransactionBuffer {
@Setter
public State state = null;
+ @Setter
+ private boolean followingInternalAppendBufferToTxnFail;
+
public TransactionBufferTestImpl(PersistentTopic topic) {
super(topic);
}
@@ -34,4 +41,12 @@ public class TransactionBufferTestImpl extends
TopicTransactionBuffer {
public State getState() {
return state == null ? super.getState() : state;
}
+
+ @Override
+ protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID
txnId, ByteBuf buffer, long seq) {
+ if (followingInternalAppendBufferToTxnFail) {
+ return CompletableFuture.failedFuture(new RuntimeException("failed
because an injected error for test"));
+ }
+ return super.internalAppendBufferToTxn(txnId, buffer, seq);
+ }
}