congbobo184 commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r784461740



##########
File path: 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -2324,13 +2324,6 @@
     private String transactionPendingAckStoreProviderClassName =
             
"org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider";
 
-    @FieldContext(
-            category = CATEGORY_TRANSACTION,
-            doc = "Number of threads to use for pulsar transaction replay 
PendingAckStore or TransactionBuffer."
-                    + "Default is 5"
-    )
-    private int numTransactionReplayThreadPoolSize = 
Runtime.getRuntime().availableProcessors();

Review comment:
       Why was this deleted?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -105,7 +105,7 @@ public TopicTransactionBuffer(PersistentTopic topic) {
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
         this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
-        
this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor()
+        
this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor()

Review comment:
       
this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
                 completeHandleFuture();
             }
         });
+
+        internalPinnedExecutor = persistentSubscription
+                .getTopic()
+                .getBrokerService()
+                .getPulsar()
+                .getTransactionExecutorProvider()
+                .getExecutor();

Review comment:
       .getExecutor(this)

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -610,6 +570,37 @@ private void addAbortTxnRequest(TxnID txnId, Consumer 
consumer, long lowWaterMar
         return abortFuture;
     }
 
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer,
+                                                         long lowWaterMark, 
boolean isInCacheRequest) {
+        CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                if (state == State.Initializing) {
+                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
abortFuture);
+                    return;
+                } else if (state == State.None) {
+                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
abortFuture);
+                    initPendingAckStore();
+                    return;
+                } else if (checkIfReady()) {

Review comment:
       We don't need to call checkIfReady() again here, because the change to the
ready state runs on the same thread as abortTxn. The same applies to commitTxn,
individual ack, and cumulative ack.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
##########
@@ -130,7 +130,6 @@ private ServiceConfiguration getConf() {
         conf.setManagedLedgerNumSchedulerThreads(1);
         conf.setManagedLedgerNumWorkerThreads(1);
         conf.setWebSocketNumIoThreads(1);
-        conf.setNumTransactionReplayThreadPoolSize(1);

Review comment:
       Please don't delete this.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
                 completeHandleFuture();
             }
         });
+
+        internalPinnedExecutor = persistentSubscription
+                .getTopic()
+                .getBrokerService()
+                .getPulsar()
+                .getTransactionExecutorProvider()
+                .getExecutor();
     }
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            (ScheduledExecutorService) 
persistentSubscription.getTopic().getBrokerService()
+                                    
.getPulsar().getTransactionExecutorProvider().getExecutor(this));

Review comment:
       Use the internalPinnedExecutor field here instead of looking up the executor again.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
                 completeHandleFuture();
             }
         });
+
+        internalPinnedExecutor = persistentSubscription
+                .getTopic()
+                .getBrokerService()
+                .getPulsar()
+                .getTransactionExecutorProvider()
+                .getExecutor();
     }
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            (ScheduledExecutorService) 
persistentSubscription.getTopic().getBrokerService()
+                                    
.getPulsar().getTransactionExecutorProvider().getExecutor(this));
+                }).exceptionally(e -> {
+                    acceptQueue.clear();
+                    changeToErrorState();
+                    log.error("PendingAckHandleImpl init fail! TopicName : {}, 
SubName: {}", topicName, subName, e);
+                    return null;
+                });
             }
         }
     }
 
     private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
                                                         
List<MutablePair<PositionImpl, Integer>> positions,
                                                         
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
+        acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, 
positions, completableFuture));
     }
 
-    @Override
-    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                
List<MutablePair<PositionImpl, Integer>> positions,
-                                                                boolean 
isInCacheRequest) {
-
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
-                switch (state) {
-                    case Initializing:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
-                    case None:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        initPendingAckStore();
-                        return completableFuture;
-                    case Error:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
-                    case Close:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
-                    default:
-                        break;
-                }
-            }
-        }
-
+    public CompletableFuture<Void> internalIndividualAcknowledgeMessage(TxnID 
txnID,

Review comment:
       This method doesn't need to return a future.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -394,56 +375,46 @@ private void addCumulativeAcknowledgeMessageRequest(TxnID 
txnID,
         return completableFuture;
     }
 
-    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> 
properties, long lowWaterMark,
-                                    CompletableFuture<Void> completableFuture) 
{
-        acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
-    }
-
     @Override
-    public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, 
Map<String, Long> properties,
-                                                          long lowWaterMark, 
boolean isInCacheRequest) {
-        if (!checkIfReady()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (state == State.Initializing) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                } else if (state == State.None) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    initPendingAckStore();
-                    return completableFuture;
-                } else if (checkIfReady()) {
-
-                } else {
-                    if (state == State.Error) {
-                        return FutureUtil.failedFuture(
+    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
+                                                                
List<PositionImpl> positions,
+                                                                boolean 
isInCacheRequest) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                switch (state) {
+                    case Initializing:
+                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        return;
+                    case None:
+                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        initPendingAckStore();
+                        return;
+                    case Error:
+                        completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                    } else {
-                        return FutureUtil.failedFuture(
+                        return;
+                    case Close:
+                        completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                    }
-
+                        return;
+                    default:
+                        break;
                 }
             }
-        }
+            internalCumulativeAcknowledgeMessage(txnID, positions, 
completableFuture);
+        });
 
-        if (!acceptQueue.isEmpty() && !isInCacheRequest) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!acceptQueue.isEmpty()) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                }
-            }
-        }
+        return completableFuture;
+    }
 
-        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> 
properties, long lowWaterMark,
+                                    CompletableFuture<Void> completableFuture) 
{
+        acceptQueue.add(() -> internalCommitTxn(txnId, properties, 
lowWaterMark, completableFuture));
+    }
 
+    private CompletableFuture<Void> internalCommitTxn(TxnID txnID, Map<String, 
Long> properties, long lowWaterMark,

Review comment:
       This method doesn't need to return a future.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
                 completeHandleFuture();
             }
         });
+
+        internalPinnedExecutor = persistentSubscription
+                .getTopic()
+                .getBrokerService()
+                .getPulsar()
+                .getTransactionExecutorProvider()
+                .getExecutor();
     }
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            (ScheduledExecutorService) 
persistentSubscription.getTopic().getBrokerService()
+                                    
.getPulsar().getTransactionExecutorProvider().getExecutor(this));
+                }).exceptionally(e -> {
+                    acceptQueue.clear();
+                    changeToErrorState();
+                    log.error("PendingAckHandleImpl init fail! TopicName : {}, 
SubName: {}", topicName, subName, e);
+                    return null;
+                });
             }
         }
     }
 
     private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
                                                         
List<MutablePair<PositionImpl, Integer>> positions,
                                                         
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
+        acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, 
positions, completableFuture));
     }
 
-    @Override
-    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                
List<MutablePair<PositionImpl, Integer>> positions,
-                                                                boolean 
isInCacheRequest) {
-
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
-                switch (state) {
-                    case Initializing:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
-                    case None:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        initPendingAckStore();
-                        return completableFuture;
-                    case Error:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
-                    case Close:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
-                    default:
-                        break;
-                }
-            }
-        }
-
+    public CompletableFuture<Void> internalIndividualAcknowledgeMessage(TxnID 
txnID,
+                                                     
List<MutablePair<PositionImpl, Integer>>
+                                                                               
  positions,

Review comment:
       Code style: please fix the line wrapping/indentation of this parameter declaration.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -291,46 +273,47 @@ private void addIndividualAcknowledgeMessageRequest(TxnID 
txnID,
         return completableFuture;
     }
 
-    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
-                                                        List<PositionImpl> 
positions,
-                                                        
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
-    }
-
     @Override
-    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-                                                                
List<PositionImpl> positions,
+    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+                                                                
List<MutablePair<PositionImpl, Integer>> positions,
                                                                 boolean 
isInCacheRequest) {
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
                 switch (state) {
                     case Initializing:
-                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
+                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        return;
                     case None:
-                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
                         initPendingAckStore();
-                        return completableFuture;
+                        return;
                     case Error:
                         completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
+                        return;
                     case Close:
                         completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
+                        return;
                     default:
                         break;
-
                 }
             }
-        }
+            internalIndividualAcknowledgeMessage(txnID, positions, 
completableFuture);
+        });
+        return completableFuture;
+    }
 
+    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
+                                                        List<PositionImpl> 
positions,
+                                                        
CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> internalCumulativeAcknowledgeMessage(txnID, 
positions, completableFuture));
+    }
+
+    public CompletableFuture<Void> internalCumulativeAcknowledgeMessage(TxnID 
txnID,

Review comment:
       This method doesn't need to return a future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to