This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e9b951232418188013fcc9671e6d5e67f6ed8e3f
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Wed Jan 26 15:42:29 2022 +0800

    [Transaction] PendingAckHandleImpl handle isInCacheRequest (#13481)
    
    ### Motivation
    When the PendingAck handle is not Ready, incoming requests are put into a 
queue first. When the PendingAck recovery completes, new requests and the 
requests already in the queue are processed at the same time, which may cause 
acks to be handled out of order.
    ### Modification
    Use a single thread to handle requests. After the recovery completes, 
the replay callback is executed on that same thread, so it is mutually 
exclusive with the processing of new requests.
    
    (cherry picked from commit 97f0030ea4651c01227072b3a7522dc9ff198623)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  17 +-
 .../service/persistent/PersistentSubscription.java |   8 +-
 .../buffer/impl/TopicTransactionBuffer.java        |   5 +-
 .../transaction/pendingack/PendingAckHandle.java   |  15 +-
 .../pendingack/impl/MLPendingAckReplyCallBack.java |   6 +-
 .../pendingack/impl/PendingAckHandleDisabled.java  |  12 +-
 .../pendingack/impl/PendingAckHandleImpl.java      | 331 ++++++++++-----------
 .../persistent/PersistentSubscriptionTest.java     |   5 +-
 .../pendingack/PendingAckPersistentTest.java       |   1 +
 9 files changed, 185 insertions(+), 215 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 28ccbb1..79aba81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -258,7 +258,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private PulsarResources pulsarResources;
 
     private TransactionPendingAckStoreProvider 
transactionPendingAckStoreProvider;
-    private final ScheduledExecutorService transactionReplayExecutor;
+    private final ExecutorProvider transactionExecutorProvider;
 
     public enum State {
         Init, Started, Closing, Closed
@@ -314,11 +314,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 new DefaultThreadFactory("zk-cache-callback"));
 
         if (config.isTransactionCoordinatorEnabled()) {
-            this.transactionReplayExecutor = Executors.newScheduledThreadPool(
-                    config.getNumTransactionReplayThreadPoolSize(),
-                    new DefaultThreadFactory("transaction-replay"));
+            this.transactionExecutorProvider = new 
ExecutorProvider(this.getConfiguration()
+                    .getNumTransactionReplayThreadPoolSize(), 
"pulsar-transaction-executor");
         } else {
-            this.transactionReplayExecutor = null;
+            this.transactionExecutorProvider = null;
         }
 
         this.ioEventLoopGroup = 
EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), 
config.isEnableBusyWait(),
@@ -494,8 +493,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 configurationMetadataStore.close();
             }
 
-            if (transactionReplayExecutor != null) {
-                transactionReplayExecutor.shutdown();
+            if (transactionExecutorProvider != null) {
+                transactionExecutorProvider.shutdownNow();
             }
 
             ioEventLoopGroup.shutdownGracefully();
@@ -1246,8 +1245,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return cacheExecutor;
     }
 
-    public ScheduledExecutorService getTransactionReplayExecutor() {
-        return transactionReplayExecutor;
+    public ExecutorProvider getTransactionExecutorProvider() {
+        return transactionExecutorProvider;
     }
 
     public ScheduledExecutorService getLoadManagerExecutor() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8d75ea7..cd9f462 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -499,11 +499,11 @@ public class PersistentSubscription implements 
Subscription {
     public CompletableFuture<Void> transactionIndividualAcknowledge(
             TxnID txnId,
             List<MutablePair<PositionImpl, Integer>> positions) {
-        return pendingAckHandle.individualAcknowledgeMessage(txnId, positions, 
false);
+        return pendingAckHandle.individualAcknowledgeMessage(txnId, positions);
     }
 
     public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID 
txnId, List<PositionImpl> positions) {
-        return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions, 
false);
+        return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions);
     }
 
     private final MarkDeleteCallback markDeleteCallback = new 
MarkDeleteCallback() {
@@ -1173,14 +1173,14 @@ public class PersistentSubscription implements 
Subscription {
     public CompletableFuture<Void> endTxn(long txnidMostBits, long 
txnidLeastBits, int txnAction, long lowWaterMark) {
         TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
         if (TxnAction.COMMIT.getValue() == txnAction) {
-            return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), 
lowWaterMark, false);
+            return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), 
lowWaterMark);
         } else if (TxnAction.ABORT.getValue() == txnAction) {
             Consumer redeliverConsumer = null;
             if (getDispatcher() instanceof 
PersistentDispatcherSingleActiveConsumer) {
                 redeliverConsumer = ((PersistentDispatcherSingleActiveConsumer)
                         getDispatcher()).getActiveConsumer();
             }
-            return pendingAckHandle.abortTxn(txnID, redeliverConsumer, 
lowWaterMark, false);
+            return pendingAckHandle.abortTxn(txnID, redeliverConsumer, 
lowWaterMark);
         } else {
             return FutureUtil.failedFuture(new 
NotAllowedException("Unsupported txnAction " + txnAction));
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 9a324fb..12818b7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -104,7 +104,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
         this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
-        
this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor()
+        
this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)
                 .execute(new TopicTransactionBufferRecover(new 
TopicTransactionBufferRecoverCallBack() {
                     @Override
                     public void recoverComplete() {
@@ -607,7 +607,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
                 closeCursor(managedCursor);
                 callBack.recoverComplete();
-            }, 
topic.getBrokerService().getPulsar().getTransactionReplayExecutor()).exceptionally(e
 -> {
+            }, 
topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this))
+                    .exceptionally(e -> {
                 callBack.recoverExceptionally(new Exception(e));
                 log.error("[{}]Transaction buffer new snapshot reader fail!", 
topic.getName(), e);
                 return null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index dc64cbe..620db5c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -50,15 +50,13 @@ public interface PendingAckHandle {
      *
      * @param txnID                  {@link TxnID} TransactionID of an ongoing 
transaction trying to sck message.
      * @param positions              {@link MutablePair} the pair of positions 
and these batch size.
-     * @param isInCacheRequest       {@link Boolean} the boolean of the 
request in cache whether or not.
      * @return the future of this operation.
      * @throws TransactionConflictException if the ack with transaction is 
conflict with pending ack.
      * @throws NotAllowedException if Use this method incorrectly eg. not use
      * PositionImpl or cumulative ack with a list of positions.
      */
     CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, 
List<MutablePair<PositionImpl,
-            Integer>> positions, boolean isInCacheRequest);
-
+            Integer>> positions);
     /**
      * Acknowledge message(s) for an ongoing transaction.
      * <p>
@@ -76,14 +74,12 @@ public interface PendingAckHandle {
      *
      * @param txnID                  {@link TxnID} TransactionID of an ongoing 
transaction trying to sck message.
      * @param positions              {@link MutablePair} the pair of positions 
and these batch size.
-     * @param isInCacheRequest       {@link Boolean} the boolean of the 
request in cache whether or not.
      * @return the future of this operation.
      * @throws TransactionConflictException if the ack with transaction is 
conflict with pending ack.
      * @throws NotAllowedException if Use this method incorrectly eg. not use
      * PositionImpl or cumulative ack with a list of positions.
      */
-    CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, 
List<PositionImpl> positions,
-                                                         boolean 
isInCacheRequest);
+    CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, 
List<PositionImpl> positions);
 
     /**
      * Commit a transaction.
@@ -92,11 +88,9 @@ public interface PendingAckHandle {
      * @param properties Additional user-defined properties that can be
      *                   associated with a particular cursor position.
      * @param lowWaterMark the low water mark of this transaction
-     * @param isInCacheRequest       {@link Boolean} the boolean of the 
request in cache whether or not.
      * @return the future of this operation.
      */
-    CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> 
properties,
-                                      long lowWaterMark, boolean 
isInCacheRequest);
+    CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> 
properties, long lowWaterMark);
 
     /**
      * Abort a transaction.
@@ -104,10 +98,9 @@ public interface PendingAckHandle {
      * @param txnId  {@link TxnID} to identify the transaction.
      * @param consumer {@link Consumer} which aborting transaction.
      * @param lowWaterMark the low water mark of this transaction
-     * @param isInCacheRequest       {@link Boolean} the boolean of the 
request in cache whether or not.
      * @return the future of this operation.
      */
-    CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long 
lowWaterMark, boolean isInCacheRequest);
+    CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long 
lowWaterMark);
 
     /**
      * Sync the position ack set, in order to clean up the cache of this 
position for pending ack handle.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
index b98e64e..728e7f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
@@ -44,7 +44,7 @@ public class MLPendingAckReplyCallBack implements 
PendingAckReplyCallBack {
 
     @Override
     public void replayComplete() {
-        synchronized (pendingAckHandle) {
+        pendingAckHandle.getInternalPinnedExecutor().execute(() -> {
             log.info("Topic name : [{}], SubName : [{}] pending ack state 
reply success!",
                     pendingAckHandle.getTopicName(), 
pendingAckHandle.getSubName());
 
@@ -56,8 +56,8 @@ public class MLPendingAckReplyCallBack implements 
PendingAckReplyCallBack {
                 log.error("Topic name : [{}], SubName : [{}] pending ack state 
reply fail!",
                         pendingAckHandle.getTopicName(), 
pendingAckHandle.getSubName());
             }
-        }
-        pendingAckHandle.handleCacheRequest();
+            pendingAckHandle.handleCacheRequest();
+        });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 634655e..1190727 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -42,26 +42,22 @@ public class PendingAckHandleDisabled implements 
PendingAckHandle {
 
     @Override
     public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                
List<MutablePair<PositionImpl, Integer>> positions,
-                                                                boolean 
isInCacheRequest) {
+                                                                
List<MutablePair<PositionImpl, Integer>> positions) {
         return FutureUtil.failedFuture(new NotAllowedException("The 
transaction is disabled"));
     }
 
     @Override
-    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, 
List<PositionImpl> positions,
-                                                                boolean 
isInCacheRequest) {
+    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, 
List<PositionImpl> positions) {
         return FutureUtil.failedFuture(new NotAllowedException("The 
transaction is disabled"));
     }
 
     @Override
-    public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> 
properties, long lowWaterMark,
-                                             boolean isInCacheRequest) {
+    public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> 
properties, long lowWaterMark) {
         return FutureUtil.failedFuture(new NotAllowedException("The 
transaction is disabled"));
     }
 
     @Override
-    public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, 
long lowWaterMark,
-                                            boolean isInCacheRequest) {
+    public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, 
long lowWaterMark) {
         return FutureUtil.failedFuture(new NotAllowedException("The 
transaction is disabled"));
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index ba74d78..1c0b10f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -29,7 +29,10 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -42,7 +45,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedExcepti
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
@@ -111,13 +113,23 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
 
     private final BlockingQueue<Runnable> acceptQueue = new 
LinkedBlockingDeque<>();
 
+    @Getter
+    private final ExecutorService internalPinnedExecutor;
+
+
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) 
{
         super(State.None);
         this.topicName = persistentSubscription.getTopicName();
         this.subName = persistentSubscription.getName();
         this.persistentSubscription = persistentSubscription;
-
-        this.pendingAckStoreProvider = ((PersistentTopic) 
this.persistentSubscription.getTopic())
+        internalPinnedExecutor = persistentSubscription
+                .getTopic()
+                .getBrokerService()
+                .getPulsar()
+                .getTransactionExecutorProvider()
+                .getExecutor(this);
+
+        this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
                         
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
         
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init
 -> {
             if (init) {
@@ -130,22 +142,19 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        exceptionHandleFuture(e.getCause());
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            (ScheduledExecutorService) internalPinnedExecutor);
+                }).exceptionally(e -> {
+                    acceptQueue.clear();
+                    changeToErrorState();
+                    log.error("PendingAckHandleImpl init fail! TopicName : {}, 
SubName: {}", topicName, subName, e);
+                    exceptionHandleFuture(e.getCause());
+                    return null;
+                });
             }
         }
     }
@@ -153,50 +162,20 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
     private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
                                                         
List<MutablePair<PositionImpl, Integer>> positions,
                                                         
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
+        acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, 
positions, completableFuture));
     }
 
-    @Override
-    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                
List<MutablePair<PositionImpl, Integer>> positions,
-                                                                boolean 
isInCacheRequest) {
-
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
-                switch (state) {
-                    case Initializing:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
-                    case None:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        initPendingAckStore();
-                        return completableFuture;
-                    case Error:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
-                    case Close:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
-                    default:
-                        break;
-                }
-            }
-        }
-
+    public void internalIndividualAcknowledgeMessage(TxnID txnID, 
List<MutablePair<PositionImpl, Integer>> positions,
+                                                     CompletableFuture<Void> 
completableFuture) {
         if (txnID == null) {
-            return FutureUtil.failedFuture(new 
NotAllowedException("TransactionID can not be null."));
+            completableFuture.completeExceptionally(new 
NotAllowedException("Positions can not be null."));
+            return;
+
         }
         if (positions == null) {
-            return FutureUtil.failedFuture(new NotAllowedException("Positions 
can not be null."));
+            completableFuture.completeExceptionally(new 
NotAllowedException("Positions can not be null."));
+            return;
         }
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
 
         this.pendingAckStoreFuture.thenAccept(pendingAckStore ->
                 pendingAckStore.appendIndividualAck(txnID, 
positions).thenAccept(v -> {
@@ -289,67 +268,67 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
             completableFuture.completeExceptionally(e);
             return null;
         });
-        return completableFuture;
-    }
-
-    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
-                                                        List<PositionImpl> 
positions,
-                                                        
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
     }
 
     @Override
-    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-                                                                
List<PositionImpl> positions,
-                                                                boolean 
isInCacheRequest) {
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
+    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+                                                                
List<MutablePair<PositionImpl, Integer>> positions) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
                 switch (state) {
                     case Initializing:
-                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
+                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        return;
                     case None:
-                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
                         initPendingAckStore();
-                        return completableFuture;
+                        return;
                     case Error:
                         completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
+                        return;
                     case Close:
                         completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
+                        return;
                     default:
                         break;
-
                 }
             }
-        }
+            internalIndividualAcknowledgeMessage(txnID, positions, 
completableFuture);
+        });
+        return completableFuture;
+    }
+
+    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
+                                                        List<PositionImpl> 
positions,
+                                                        
CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> internalCumulativeAcknowledgeMessage(txnID, 
positions, completableFuture));
+    }
 
+    public void internalCumulativeAcknowledgeMessage(TxnID txnID,
+                                                     List<PositionImpl> 
positions,
+                                                     CompletableFuture<Void> 
completableFuture) {
         if (txnID == null) {
-            return FutureUtil.failedFuture(new 
NotAllowedException("TransactionID can not be null."));
+            completableFuture.completeExceptionally(new 
NotAllowedException("TransactionID can not be null."));
+            return;
         }
         if (positions == null) {
-            return FutureUtil.failedFuture(new NotAllowedException("Positions 
can not be null."));
+            completableFuture.completeExceptionally(new 
NotAllowedException("Positions can not be null."));
+            return;
         }
 
         if (positions.size() != 1) {
             String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnID
                     + " invalid cumulative ack received with multiple message 
ids.";
             log.error(errorMsg);
-            return FutureUtil.failedFuture(new NotAllowedException(errorMsg));
+            completableFuture.completeExceptionally(new 
NotAllowedException(errorMsg));
+            return;
         }
 
         PositionImpl position = positions.get(0);
 
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-
         this.pendingAckStoreFuture.thenAccept(pendingAckStore ->
                 pendingAckStore.appendCumulativeAck(txnID, 
position).thenAccept(v -> {
                     if (log.isDebugEnabled()) {
@@ -392,59 +371,46 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
             completableFuture.completeExceptionally(e);
             return null;
         });
-        return completableFuture;
-    }
-
-    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> 
properties, long lowWaterMark,
-                                    CompletableFuture<Void> completableFuture) 
{
-        acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
     }
 
     @Override
-    public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, 
Map<String, Long> properties,
-                                                          long lowWaterMark, 
boolean isInCacheRequest) {
-        if (!checkIfReady()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (state == State.Initializing) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                } else if (state == State.None) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    initPendingAckStore();
-                    return completableFuture;
-                } else if (checkIfReady()) {
-
-                } else {
-                    if (state == State.Error) {
-                        return FutureUtil.failedFuture(
+    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, 
List<PositionImpl> positions) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                switch (state) {
+                    case Initializing:
+                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        return;
+                    case None:
+                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        initPendingAckStore();
+                        return;
+                    case Error:
+                        completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                    } else {
-                        return FutureUtil.failedFuture(
+                        return;
+                    case Close:
+                        completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                    }
-
+                        return;
+                    default:
+                        break;
                 }
             }
-        }
+            internalCumulativeAcknowledgeMessage(txnID, positions, 
completableFuture);
+        });
 
-        if (!acceptQueue.isEmpty() && !isInCacheRequest) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!acceptQueue.isEmpty()) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                }
-            }
-        }
+        return completableFuture;
+    }
 
-        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> 
properties, long lowWaterMark,
+                                    CompletableFuture<Void> completableFuture) 
{
+        acceptQueue.add(() -> internalCommitTxn(txnId, properties, 
lowWaterMark, completableFuture));
+    }
 
+    private void internalCommitTxn(TxnID txnID, Map<String, Long> properties, 
long lowWaterMark,
+                                   CompletableFuture<Void> commitFuture) {
         // It's valid to create transaction then commit without doing any 
operation, which will cause
         // pendingAckMessagesMap to be null.
         if (this.cumulativeAckOfTransaction != null) {
@@ -500,58 +466,44 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
                 return null;
             });
         }
-        return commitFuture;
-    }
-
-    private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long 
lowWaterMark,
-                                    CompletableFuture<Void> completableFuture) 
{
-        acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
     }
 
     @Override
-    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer 
consumer,
-                                                         long lowWaterMark, 
boolean isInCacheRequest) {
-        if (!checkIfReady()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (state == State.Initializing) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                } else if (state == State.None) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
completableFuture);
-                    initPendingAckStore();
-                    return completableFuture;
-                } else if (checkIfReady()) {
-
-                } else {
-                    if (state == State.Error) {
-                        return FutureUtil.failedFuture(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                    } else {
-                        return FutureUtil.failedFuture(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                    }
+    public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark) {
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        // All request handling is funnelled through internalPinnedExecutor; per the commit message this
+        // keeps new requests mutually exclusive with the replay of queued ones.
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                switch (state) {
+                    case Initializing:
+                        // Recovery is in progress: park the request until it finishes.
+                        addCommitTxnRequest(txnID, properties, lowWaterMark, commitFuture);
+                        return;
+                    case None:
+                        // Nothing started yet: park the request, then kick off initialization.
+                        addCommitTxnRequest(txnID, properties, lowWaterMark, commitFuture);
+                        initPendingAckStore();
+                        return;
+                    case Error:
+                        commitFuture.completeExceptionally(
+                                new ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
+                        return;
+                    case Close:
+                        // Previously unhandled: a closed handle fell through to internalCommitTxn.
+                        commitFuture.completeExceptionally(
+                                new ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
+                        return;
+                    default:
+                        // Any other state proceeds to the commit below, as the cumulative-ack path does.
+                        break;
                 }
             }
-        }
-
+            internalCommitTxn(txnID, properties, lowWaterMark, commitFuture);
+        });
+        return commitFuture;
+    }
 
-        if (!acceptQueue.isEmpty() && !isInCacheRequest) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!acceptQueue.isEmpty()) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                }
-            }
-        }
+    /**
+     * Defer an abort request by queueing it in {@code acceptQueue}.
+     *
+     * <p>The queued task calls {@code internalAbortTxn} with exactly the arguments given here; the value
+     * that call returns is not used by the task.
+     */
+    private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMark,
+                                    CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> {
+            internalAbortTxn(txnId, consumer, lowWaterMark, completableFuture);
+        });
+    }
 
-        CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+    public CompletableFuture<Void> internalAbortTxn(TxnID txnId, Consumer 
consumer,
+                                 long lowWaterMark, CompletableFuture<Void> 
abortFuture) {
         if (this.cumulativeAckOfTransaction != null) {
             pendingAckStoreFuture.thenAccept(pendingAckStore ->
                     pendingAckStore.appendAbortMark(txnId, 
AckType.Cumulative).thenAccept(v -> {
@@ -611,6 +563,35 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
         return abortFuture;
     }
 
+    /**
+     * Abort a transaction on this pending-ack handle.
+     *
+     * <p>The request is submitted to {@code internalPinnedExecutor}. When the handle is ready it is passed
+     * straight to {@code internalAbortTxn}; when the state is {@code Initializing} or {@code None} it is
+     * queued via {@code addAbortTxnRequest} (and {@code initPendingAckStore()} is invoked for {@code None});
+     * in any other state the returned future fails with a {@code ServiceUnitNotReadyException}.
+     *
+     * @param txnId the transaction to abort
+     * @param consumer forwarded unchanged to {@code internalAbortTxn}
+     * @param lowWaterMark forwarded unchanged to {@code internalAbortTxn}
+     * @return the future handed to {@code internalAbortTxn}, or failed directly when the handle is unusable
+     */
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) {
+        final CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            // Ready handle: nothing to defer, run the abort right away.
+            if (checkIfReady()) {
+                internalAbortTxn(txnId, consumer, lowWaterMark, abortFuture);
+                return;
+            }
+            switch (state) {
+                case Initializing:
+                    addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture);
+                    return;
+                case None:
+                    // Queue first, then start initialization.
+                    addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture);
+                    initPendingAckStore();
+                    return;
+                default:
+                    final String reason = state == State.Error
+                            ? "PendingAckHandle not replay error!"
+                            : "PendingAckHandle have been closed!";
+                    abortFuture.completeExceptionally(new ServiceUnitNotReadyException(reason));
+                    return;
+            }
+        });
+        return abortFuture;
+    }
+
     private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
         if (individualAckOfTransaction != null && 
!individualAckOfTransaction.isEmpty()) {
             TxnID firstTxn = individualAckOfTransaction.firstKey();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index a76c637..b72fe76 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -264,7 +263,7 @@ public class PersistentSubscriptionTest {
     }
 
     @Test
-    public void testCanAcknowledgeAndAbortForTransaction() throws 
BrokerServiceException, InterruptedException {
+    public void testCanAcknowledgeAndAbortForTransaction() throws Exception {
         List<MutablePair<PositionImpl, Integer>> positionsPair = new 
ArrayList<>();
         positionsPair.add(new MutablePair<>(new PositionImpl(2, 1), 0));
         positionsPair.add(new MutablePair<>(new PositionImpl(2, 3), 0));
@@ -293,7 +292,7 @@ public class PersistentSubscriptionTest {
         positions.add(new PositionImpl(1, 100));
 
         // Cumulative ack for txn1
-        persistentSubscription.transactionCumulativeAcknowledge(txnID1, 
positions);
+        persistentSubscription.transactionCumulativeAcknowledge(txnID1, 
positions).get();
 
         positions.clear();
         positions.add(new PositionImpl(2, 1));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 97f8f51d3..affaf45 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -60,6 +60,7 @@ import org.testng.annotations.Test;
  * Test for consuming transaction messages.
  */
 @Slf4j
+@Test(groups = "broker")
 public class PendingAckPersistentTest extends TransactionTestBase {
 
     private static final String PENDING_ACK_REPLAY_TOPIC = NAMESPACE1 + 
"/pending-ack-replay";

Reply via email to