This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e9b951232418188013fcc9671e6d5e67f6ed8e3f Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Wed Jan 26 15:42:29 2022 +0800 [Transaction] PendingAckHandleImpl handle isInCacheRequest (#13481) ### Motivation When PendingAck is not Ready, requests will be put into the queue first. When the PendingAck recovery is completed, the new request and the request in the queue are processed at the same time, which may cause the ack to be out of order. ### Modification Use a single thread to handle requests. After the recovery is completed, the callback is processed in a single thread, which is mutually exclusive with the processing of new requests. (cherry picked from commit 97f0030ea4651c01227072b3a7522dc9ff198623) --- .../org/apache/pulsar/broker/PulsarService.java | 17 +- .../service/persistent/PersistentSubscription.java | 8 +- .../buffer/impl/TopicTransactionBuffer.java | 5 +- .../transaction/pendingack/PendingAckHandle.java | 15 +- .../pendingack/impl/MLPendingAckReplyCallBack.java | 6 +- .../pendingack/impl/PendingAckHandleDisabled.java | 12 +- .../pendingack/impl/PendingAckHandleImpl.java | 331 ++++++++++----------- .../persistent/PersistentSubscriptionTest.java | 5 +- .../pendingack/PendingAckPersistentTest.java | 1 + 9 files changed, 185 insertions(+), 215 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 28ccbb1..79aba81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -258,7 +258,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private PulsarResources pulsarResources; private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider; - private final ScheduledExecutorService transactionReplayExecutor; + private final ExecutorProvider transactionExecutorProvider; public enum State { Init, Started, Closing, Closed @@ -314,11 +314,10 @@ public class PulsarService implements AutoCloseable, ShutdownService { new DefaultThreadFactory("zk-cache-callback")); if (config.isTransactionCoordinatorEnabled()) { - this.transactionReplayExecutor = Executors.newScheduledThreadPool( - config.getNumTransactionReplayThreadPoolSize(), - new DefaultThreadFactory("transaction-replay")); + this.transactionExecutorProvider = new ExecutorProvider(this.getConfiguration() + .getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-executor"); } else { - this.transactionReplayExecutor = null; + this.transactionExecutorProvider = null; } this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(), @@ -494,8 +493,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { configurationMetadataStore.close(); } - if (transactionReplayExecutor != null) { - transactionReplayExecutor.shutdown(); + if (transactionExecutorProvider != null) { + transactionExecutorProvider.shutdownNow(); } ioEventLoopGroup.shutdownGracefully(); @@ -1246,8 +1245,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { return cacheExecutor; } - public ScheduledExecutorService getTransactionReplayExecutor() { - return transactionReplayExecutor; + public ExecutorProvider getTransactionExecutorProvider() { + return transactionExecutorProvider; } public ScheduledExecutorService getLoadManagerExecutor() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 8d75ea7..cd9f462 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -499,11 +499,11 @@ public class PersistentSubscription implements Subscription { public CompletableFuture<Void> transactionIndividualAcknowledge( TxnID txnId, List<MutablePair<PositionImpl, Integer>> positions) { - return pendingAckHandle.individualAcknowledgeMessage(txnId, positions, false); + return pendingAckHandle.individualAcknowledgeMessage(txnId, positions); } public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnId, List<PositionImpl> positions) { - return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions, false); + return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions); } private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() { @@ -1173,14 +1173,14 @@ public class PersistentSubscription implements Subscription { public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark) { TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits); if (TxnAction.COMMIT.getValue() == txnAction) { - return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark, false); + return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark); } else if (TxnAction.ABORT.getValue() == txnAction) { Consumer redeliverConsumer = null; if (getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) { redeliverConsumer = ((PersistentDispatcherSingleActiveConsumer) getDispatcher()).getActiveConsumer(); } - return pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark, false); + return pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark); } else { return FutureUtil.failedFuture(new NotAllowedException("Unsupported txnAction " + txnAction)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 9a324fb..12818b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -104,7 +104,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar() .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis(); this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); - this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor() + this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this) .execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() { @Override public void recoverComplete() { @@ -607,7 +607,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen closeCursor(managedCursor); callBack.recoverComplete(); - }, topic.getBrokerService().getPulsar().getTransactionReplayExecutor()).exceptionally(e -> { + }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)) + .exceptionally(e -> { callBack.recoverExceptionally(new Exception(e)); log.error("[{}]Transaction buffer new snapshot reader fail!", topic.getName(), e); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java index dc64cbe..620db5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java @@ -50,15 +50,13 @@ public interface PendingAckHandle { * * @param txnID {@link TxnID} TransactionID of an ongoing transaction trying to sck message. * @param positions {@link MutablePair} the pair of positions and these batch size. - * @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not. * @return the future of this operation. * @throws TransactionConflictException if the ack with transaction is conflict with pending ack. * @throws NotAllowedException if Use this method incorrectly eg. not use * PositionImpl or cumulative ack with a list of positions. */ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, - Integer>> positions, boolean isInCacheRequest); - + Integer>> positions); /** * Acknowledge message(s) for an ongoing transaction. * <p> @@ -76,14 +74,12 @@ public interface PendingAckHandle { * * @param txnID {@link TxnID} TransactionID of an ongoing transaction trying to sck message. * @param positions {@link MutablePair} the pair of positions and these batch size. - * @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not. * @return the future of this operation. * @throws TransactionConflictException if the ack with transaction is conflict with pending ack. * @throws NotAllowedException if Use this method incorrectly eg. not use * PositionImpl or cumulative ack with a list of positions. */ - CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions, - boolean isInCacheRequest); + CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions); /** * Commit a transaction. @@ -92,11 +88,9 @@ public interface PendingAckHandle { * @param properties Additional user-defined properties that can be * associated with a particular cursor position. * @param lowWaterMark the low water mark of this transaction - * @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not. * @return the future of this operation. */ - CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, - long lowWaterMark, boolean isInCacheRequest); + CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark); /** * Abort a transaction. @@ -104,10 +98,9 @@ public interface PendingAckHandle { * @param txnId {@link TxnID} to identify the transaction. * @param consumer {@link Consumer} which aborting transaction. * @param lowWaterMark the low water mark of this transaction - * @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not. * @return the future of this operation. */ - CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark, boolean isInCacheRequest); + CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark); /** * Sync the position ack set, in order to clean up the cache of this position for pending ack handle. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java index b98e64e..728e7f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java @@ -44,7 +44,7 @@ public class MLPendingAckReplyCallBack implements PendingAckReplyCallBack { @Override public void replayComplete() { - synchronized (pendingAckHandle) { + pendingAckHandle.getInternalPinnedExecutor().execute(() -> { log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!", pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); @@ -56,8 +56,8 @@ public class MLPendingAckReplyCallBack implements PendingAckReplyCallBack { log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!", pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); } - } - pendingAckHandle.handleCacheRequest(); + pendingAckHandle.handleCacheRequest(); + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java index 634655e..1190727 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java @@ -42,26 +42,22 @@ public class PendingAckHandleDisabled implements PendingAckHandle { @Override public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, - List<MutablePair<PositionImpl, Integer>> positions, - boolean isInCacheRequest) { + List<MutablePair<PositionImpl, Integer>> positions) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } @Override - public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions, - boolean isInCacheRequest) { + public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } @Override - public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark, - boolean isInCacheRequest) { + public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } @Override - public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark, - boolean isInCacheRequest) { + public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index ba74d78..1c0b10f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -29,7 +29,10 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; @@ -42,7 +45,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedExcepti import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; @@ -111,13 +113,23 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>(); + @Getter + private final ExecutorService internalPinnedExecutor; + + public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { super(State.None); this.topicName = persistentSubscription.getTopicName(); this.subName = persistentSubscription.getName(); this.persistentSubscription = persistentSubscription; - - this.pendingAckStoreProvider = ((PersistentTopic) this.persistentSubscription.getTopic()) + internalPinnedExecutor = persistentSubscription + .getTopic() + .getBrokerService() + .getPulsar() + .getTransactionExecutorProvider() + .getExecutor(this); + + this.pendingAckStoreProvider = this.persistentSubscription.getTopic() .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider(); pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> { if (init) { @@ -130,22 +142,19 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi private void initPendingAckStore() { if (changeToInitializingState()) { - synchronized (PendingAckHandleImpl.this) { - if (!checkIfClose()) { - this.pendingAckStoreFuture = - pendingAckStoreProvider.newPendingAckStore(persistentSubscription); - this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { - pendingAckStore.replayAsync(this, - ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService() - .getPulsar().getTransactionReplayExecutor()); - }).exceptionally(e -> { - acceptQueue.clear(); - changeToErrorState(); - exceptionHandleFuture(e.getCause()); - log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); - return null; - }); - } + if (!checkIfClose()) { + this.pendingAckStoreFuture = + pendingAckStoreProvider.newPendingAckStore(persistentSubscription); + this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { + pendingAckStore.replayAsync(this, + (ScheduledExecutorService) internalPinnedExecutor); + }).exceptionally(e -> { + acceptQueue.clear(); + changeToErrorState(); + log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); + exceptionHandleFuture(e.getCause()); + return null; + }); } } } @@ -153,50 +162,20 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi private void addIndividualAcknowledgeMessageRequest(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions, CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); + acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, positions, completableFuture)); } - @Override - public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, - List<MutablePair<PositionImpl, Integer>> positions, - boolean isInCacheRequest) { - - if (!checkIfReady()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - synchronized (PendingAckHandleImpl.this) { - switch (state) { - case Initializing: - addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); - return completableFuture; - case None: - addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); - initPendingAckStore(); - return completableFuture; - case Error: - completableFuture.completeExceptionally( - new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - return completableFuture; - case Close: - completableFuture.completeExceptionally( - new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - return completableFuture; - default: - break; - } - } - } - + public void internalIndividualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions, + CompletableFuture<Void> completableFuture) { if (txnID == null) { - return FutureUtil.failedFuture(new NotAllowedException("TransactionID can not be null.")); + completableFuture.completeExceptionally(new NotAllowedException("Positions can not be null.")); + return; + } if (positions == null) { - return FutureUtil.failedFuture(new NotAllowedException("Positions can not be null.")); + completableFuture.completeExceptionally(new NotAllowedException("Positions can not be null.")); + return; } - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); this.pendingAckStoreFuture.thenAccept(pendingAckStore -> pendingAckStore.appendIndividualAck(txnID, positions).thenAccept(v -> { @@ -289,67 +268,67 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi completableFuture.completeExceptionally(e); return null; }); - return completableFuture; - } - - private void addCumulativeAcknowledgeMessageRequest(TxnID txnID, - List<PositionImpl> positions, - CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); } @Override - public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, - List<PositionImpl> positions, - boolean isInCacheRequest) { - if (!checkIfReady()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - synchronized (PendingAckHandleImpl.this) { + public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, + List<MutablePair<PositionImpl, Integer>> positions) { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { switch (state) { case Initializing: - addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); - return completableFuture; + addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); + return; case None: - addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); initPendingAckStore(); - return completableFuture; + return; case Error: completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - return completableFuture; + return; case Close: completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - return completableFuture; + return; default: break; - } } - } + internalIndividualAcknowledgeMessage(txnID, positions, completableFuture); + }); + return completableFuture; + } + + private void addCumulativeAcknowledgeMessageRequest(TxnID txnID, + List<PositionImpl> positions, + CompletableFuture<Void> completableFuture) { + acceptQueue.add(() -> internalCumulativeAcknowledgeMessage(txnID, positions, completableFuture)); + } + public void internalCumulativeAcknowledgeMessage(TxnID txnID, + List<PositionImpl> positions, + CompletableFuture<Void> completableFuture) { if (txnID == null) { - return FutureUtil.failedFuture(new NotAllowedException("TransactionID can not be null.")); + completableFuture.completeExceptionally(new NotAllowedException("TransactionID can not be null.")); + return; } if (positions == null) { - return FutureUtil.failedFuture(new NotAllowedException("Positions can not be null.")); + completableFuture.completeExceptionally(new NotAllowedException("Positions can not be null.")); + return; } if (positions.size() != 1) { String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnID + " invalid cumulative ack received with multiple message ids."; log.error(errorMsg); - return FutureUtil.failedFuture(new NotAllowedException(errorMsg)); + completableFuture.completeExceptionally(new NotAllowedException(errorMsg)); + return; } PositionImpl position = positions.get(0); - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - this.pendingAckStoreFuture.thenAccept(pendingAckStore -> pendingAckStore.appendCumulativeAck(txnID, position).thenAccept(v -> { if (log.isDebugEnabled()) { @@ -392,59 +371,46 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi completableFuture.completeExceptionally(e); return null; }); - return completableFuture; - } - - private void addCommitTxnRequest(TxnID txnId, Map<String, Long> properties, long lowWaterMark, - CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); } @Override - public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, - long lowWaterMark, boolean isInCacheRequest) { - if (!checkIfReady()) { - synchronized (PendingAckHandleImpl.this) { - if (state == State.Initializing) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); - return completableFuture; - } else if (state == State.None) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); - initPendingAckStore(); - return completableFuture; - } else if (checkIfReady()) { - - } else { - if (state == State.Error) { - return FutureUtil.failedFuture( + public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions) { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + switch (state) { + case Initializing: + addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + return; + case None: + addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + initPendingAckStore(); + return; + case Error: + completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - } else { - return FutureUtil.failedFuture( + return; + case Close: + completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - } - + return; + default: + break; } } - } + internalCumulativeAcknowledgeMessage(txnID, positions, completableFuture); + }); - if (!acceptQueue.isEmpty() && !isInCacheRequest) { - synchronized (PendingAckHandleImpl.this) { - if (!acceptQueue.isEmpty()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); - return completableFuture; - } - } - } + return completableFuture; + } - CompletableFuture<Void> commitFuture = new CompletableFuture<>(); + private void addCommitTxnRequest(TxnID txnId, Map<String, Long> properties, long lowWaterMark, + CompletableFuture<Void> completableFuture) { + acceptQueue.add(() -> internalCommitTxn(txnId, properties, lowWaterMark, completableFuture)); + } + private void internalCommitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark, + CompletableFuture<Void> commitFuture) { // It's valid to create transaction then commit without doing any operation, which will cause // pendingAckMessagesMap to be null. if (this.cumulativeAckOfTransaction != null) { @@ -500,58 +466,44 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi return null; }); } - return commitFuture; - } - - private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMark, - CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); } @Override - public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, - long lowWaterMark, boolean isInCacheRequest) { - if (!checkIfReady()) { - synchronized (PendingAckHandleImpl.this) { - if (state == State.Initializing) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addAbortTxnRequest(txnId, consumer, lowWaterMark, completableFuture); - return completableFuture; - } else if (state == State.None) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addAbortTxnRequest(txnId, consumer, lowWaterMark, completableFuture); - initPendingAckStore(); - return completableFuture; - } else if (checkIfReady()) { - - } else { - if (state == State.Error) { - return FutureUtil.failedFuture( - new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - } else { - return FutureUtil.failedFuture( - new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - } + public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark) { + CompletableFuture<Void> commitFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + switch (state) { + case Initializing: + addCommitTxnRequest(txnID, properties, lowWaterMark, commitFuture); + return; + case None: + addCommitTxnRequest(txnID, properties, lowWaterMark, commitFuture); + initPendingAckStore(); + return; + case Error: + if (state == State.Error) { + commitFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); + } else { + commitFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); + } + return; } } - } - + internalCommitTxn(txnID, properties, lowWaterMark, commitFuture); + }); + return commitFuture; + } - if (!acceptQueue.isEmpty() && !isInCacheRequest) { - synchronized (PendingAckHandleImpl.this) { - if (!acceptQueue.isEmpty()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addAbortTxnRequest(txnId, consumer, lowWaterMark, completableFuture); - return completableFuture; - } - } - } + private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMark, + CompletableFuture<Void> completableFuture) { + acceptQueue.add(() -> internalAbortTxn(txnId, consumer, lowWaterMark, completableFuture)); + } - CompletableFuture<Void> abortFuture = new CompletableFuture<>(); + public CompletableFuture<Void> internalAbortTxn(TxnID txnId, Consumer consumer, + long lowWaterMark, CompletableFuture<Void> abortFuture) { if (this.cumulativeAckOfTransaction != null) { pendingAckStoreFuture.thenAccept(pendingAckStore -> pendingAckStore.appendAbortMark(txnId, AckType.Cumulative).thenAccept(v -> { @@ -611,6 +563,35 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi return abortFuture; } + @Override + public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) { + CompletableFuture<Void> abortFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + switch (state) { + case Initializing: + addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture); + return; + case None: + addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture); + initPendingAckStore(); + return; + default: + if (state == State.Error) { + abortFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); + } else { + abortFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); + } + return; + } + } + internalAbortTxn(txnId, consumer, lowWaterMark, abortFuture); + }); + return abortFuture; + } + private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { if (individualAckOfTransaction != null && !individualAckOfTransaction.isEmpty()) { TxnID firstTxn = individualAckOfTransaction.firstKey(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index a76c637..b72fe76 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -56,7 +56,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; @@ -264,7 +263,7 @@ public class PersistentSubscriptionTest { } @Test - public void testCanAcknowledgeAndAbortForTransaction() throws BrokerServiceException, InterruptedException { + public void testCanAcknowledgeAndAbortForTransaction() throws Exception { List<MutablePair<PositionImpl, Integer>> positionsPair = new ArrayList<>(); positionsPair.add(new MutablePair<>(new PositionImpl(2, 1), 0)); positionsPair.add(new MutablePair<>(new PositionImpl(2, 3), 0)); @@ -293,7 +292,7 @@ public class PersistentSubscriptionTest { positions.add(new PositionImpl(1, 100)); // Cumulative ack for txn1 - persistentSubscription.transactionCumulativeAcknowledge(txnID1, positions); + persistentSubscription.transactionCumulativeAcknowledge(txnID1, positions).get(); positions.clear(); positions.add(new PositionImpl(2, 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 97f8f51d3..affaf45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -60,6 +60,7 @@ import org.testng.annotations.Test; * Test for consuming transaction messages. */ @Slf4j +@Test(groups = "broker") public class PendingAckPersistentTest extends TransactionTestBase { private static final String PENDING_ACK_REPLAY_TOPIC = NAMESPACE1 + "/pending-ack-replay";