congbobo184 commented on a change in pull request #13481: URL: https://github.com/apache/pulsar/pull/13481#discussion_r784461740
########## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java ########## @@ -2324,13 +2324,6 @@ private String transactionPendingAckStoreProviderClassName = "org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider"; - @FieldContext( - category = CATEGORY_TRANSACTION, - doc = "Number of threads to use for pulsar transaction replay PendingAckStore or TransactionBuffer." - + "Default is 5" - ) - private int numTransactionReplayThreadPoolSize = Runtime.getRuntime().availableProcessors(); Review comment: why delete this? ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java ########## @@ -105,7 +105,7 @@ public TopicTransactionBuffer(PersistentTopic topic) { this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar() .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis(); this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); - this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor() + this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor() Review comment: this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this) ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { completeHandleFuture(); } }); + + internalPinnedExecutor = persistentSubscription + .getTopic() + .getBrokerService() + .getPulsar() + .getTransactionExecutorProvider() + .getExecutor(); Review comment: .getExecutor(this) ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -610,6 +570,37 @@ private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long 
lowWaterMar return abortFuture; } + @Override + public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, + long lowWaterMark, boolean isInCacheRequest) { + CompletableFuture<Void> abortFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + if (state == State.Initializing) { + addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture); + return; + } else if (state == State.None) { + addAbortTxnRequest(txnId, consumer, lowWaterMark, abortFuture); + initPendingAckStore(); + return; + } else if (checkIfReady()) { Review comment: We don't need this second checkIfReady() check, because the change to the Ready state runs on the same thread as abortTxn, just like commitTxn, individual ack, and cumulative ack. ########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java ########## @@ -130,7 +130,6 @@ private ServiceConfiguration getConf() { conf.setManagedLedgerNumSchedulerThreads(1); conf.setManagedLedgerNumWorkerThreads(1); conf.setWebSocketNumIoThreads(1); - conf.setNumTransactionReplayThreadPoolSize(1); Review comment: don't delete this ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { completeHandleFuture(); } }); + + internalPinnedExecutor = persistentSubscription + .getTopic() + .getBrokerService() + .getPulsar() + .getTransactionExecutorProvider() + .getExecutor(); } private void initPendingAckStore() { if (changeToInitializingState()) { - synchronized (PendingAckHandleImpl.this) { - if (!checkIfClose()) { - this.pendingAckStoreFuture = - pendingAckStoreProvider.newPendingAckStore(persistentSubscription); - this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { - pendingAckStore.replayAsync(this, - ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService() - 
.getPulsar().getTransactionReplayExecutor()); - }).exceptionally(e -> { - acceptQueue.clear(); - changeToErrorState(); - log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); - return null; - }); - } + if (!checkIfClose()) { + this.pendingAckStoreFuture = + pendingAckStoreProvider.newPendingAckStore(persistentSubscription); + this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { + pendingAckStore.replayAsync(this, + (ScheduledExecutorService) persistentSubscription.getTopic().getBrokerService() + .getPulsar().getTransactionExecutorProvider().getExecutor(this)); Review comment: use local variable internalPinnedExecutor ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { completeHandleFuture(); } }); + + internalPinnedExecutor = persistentSubscription + .getTopic() + .getBrokerService() + .getPulsar() + .getTransactionExecutorProvider() + .getExecutor(); } private void initPendingAckStore() { if (changeToInitializingState()) { - synchronized (PendingAckHandleImpl.this) { - if (!checkIfClose()) { - this.pendingAckStoreFuture = - pendingAckStoreProvider.newPendingAckStore(persistentSubscription); - this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { - pendingAckStore.replayAsync(this, - ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService() - .getPulsar().getTransactionReplayExecutor()); - }).exceptionally(e -> { - acceptQueue.clear(); - changeToErrorState(); - log.error("PendingAckHandleImpl init fail! 
TopicName : {}, SubName: {}", topicName, subName, e); - return null; - }); - } + if (!checkIfClose()) { + this.pendingAckStoreFuture = + pendingAckStoreProvider.newPendingAckStore(persistentSubscription); + this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { + pendingAckStore.replayAsync(this, + (ScheduledExecutorService) persistentSubscription.getTopic().getBrokerService() + .getPulsar().getTransactionExecutorProvider().getExecutor(this)); + }).exceptionally(e -> { + acceptQueue.clear(); + changeToErrorState(); + log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); + return null; + }); } } } private void addIndividualAcknowledgeMessageRequest(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions, CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); + acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, positions, completableFuture)); } - @Override - public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, - List<MutablePair<PositionImpl, Integer>> positions, - boolean isInCacheRequest) { - - if (!checkIfReady()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - synchronized (PendingAckHandleImpl.this) { - switch (state) { - case Initializing: - addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); - return completableFuture; - case None: - addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); - initPendingAckStore(); - return completableFuture; - case Error: - completableFuture.completeExceptionally( - new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - return completableFuture; - case Close: - completableFuture.completeExceptionally( - new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - return completableFuture; - default: - break; - } - } - } - + public CompletableFuture<Void> internalIndividualAcknowledgeMessage(TxnID txnID, Review comment: don't need return future ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -394,56 +375,46 @@ private void addCumulativeAcknowledgeMessageRequest(TxnID txnID, return completableFuture; } - private void addCommitTxnRequest(TxnID txnId, Map<String, Long> properties, long lowWaterMark, - CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); - } - @Override - public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, - long lowWaterMark, boolean isInCacheRequest) { - if (!checkIfReady()) { - synchronized (PendingAckHandleImpl.this) { - if (state == State.Initializing) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); - return completableFuture; - } else if (state == State.None) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); - initPendingAckStore(); - return completableFuture; - } else if (checkIfReady()) { - - } else { - if (state == State.Error) { - return FutureUtil.failedFuture( + public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, + List<PositionImpl> positions, + boolean isInCacheRequest) { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + switch (state) { + case Initializing: + 
addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + return; + case None: + addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + initPendingAckStore(); + return; + case Error: + completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - } else { - return FutureUtil.failedFuture( + return; + case Close: + completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - } - + return; + default: + break; } } - } + internalCumulativeAcknowledgeMessage(txnID, positions, completableFuture); + }); - if (!acceptQueue.isEmpty() && !isInCacheRequest) { - synchronized (PendingAckHandleImpl.this) { - if (!acceptQueue.isEmpty()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); - return completableFuture; - } - } - } + return completableFuture; + } - CompletableFuture<Void> commitFuture = new CompletableFuture<>(); + private void addCommitTxnRequest(TxnID txnId, Map<String, Long> properties, long lowWaterMark, + CompletableFuture<Void> completableFuture) { + acceptQueue.add(() -> internalCommitTxn(txnId, properties, lowWaterMark, completableFuture)); + } + private CompletableFuture<Void> internalCommitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark, Review comment: don't need return future ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -126,76 +133,51 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { completeHandleFuture(); } }); + + internalPinnedExecutor = persistentSubscription + .getTopic() + .getBrokerService() + .getPulsar() + .getTransactionExecutorProvider() + .getExecutor(); } private void initPendingAckStore() { if (changeToInitializingState()) { - synchronized 
(PendingAckHandleImpl.this) { - if (!checkIfClose()) { - this.pendingAckStoreFuture = - pendingAckStoreProvider.newPendingAckStore(persistentSubscription); - this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { - pendingAckStore.replayAsync(this, - ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService() - .getPulsar().getTransactionReplayExecutor()); - }).exceptionally(e -> { - acceptQueue.clear(); - changeToErrorState(); - log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); - return null; - }); - } + if (!checkIfClose()) { + this.pendingAckStoreFuture = + pendingAckStoreProvider.newPendingAckStore(persistentSubscription); + this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { + pendingAckStore.replayAsync(this, + (ScheduledExecutorService) persistentSubscription.getTopic().getBrokerService() + .getPulsar().getTransactionExecutorProvider().getExecutor(this)); + }).exceptionally(e -> { + acceptQueue.clear(); + changeToErrorState(); + log.error("PendingAckHandleImpl init fail! 
TopicName : {}, SubName: {}", topicName, subName, e); + return null; + }); } } } private void addIndividualAcknowledgeMessageRequest(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions, CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); + acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, positions, completableFuture)); } - @Override - public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, - List<MutablePair<PositionImpl, Integer>> positions, - boolean isInCacheRequest) { - - if (!checkIfReady()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - synchronized (PendingAckHandleImpl.this) { - switch (state) { - case Initializing: - addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); - return completableFuture; - case None: - addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); - initPendingAckStore(); - return completableFuture; - case Error: - completableFuture.completeExceptionally( - new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - return completableFuture; - case Close: - completableFuture.completeExceptionally( - new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - return completableFuture; - default: - break; - } - } - } - + public CompletableFuture<Void> internalIndividualAcknowledgeMessage(TxnID txnID, + List<MutablePair<PositionImpl, Integer>> + positions, Review comment: code style ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -291,46 +273,47 @@ private void addIndividualAcknowledgeMessageRequest(TxnID txnID, return completableFuture; } - private void 
addCumulativeAcknowledgeMessageRequest(TxnID txnID, - List<PositionImpl> positions, - CompletableFuture<Void> completableFuture) { - acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, true).thenAccept(v -> - completableFuture.complete(null)).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - })); - } - @Override - public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, - List<PositionImpl> positions, + public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, + List<MutablePair<PositionImpl, Integer>> positions, boolean isInCacheRequest) { - if (!checkIfReady()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - synchronized (PendingAckHandleImpl.this) { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { switch (state) { case Initializing: - addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); - return completableFuture; + addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); + return; case None: - addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); initPendingAckStore(); - return completableFuture; + return; case Error: completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); - return completableFuture; + return; case Close: completableFuture.completeExceptionally( new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); - return completableFuture; + return; default: break; - } } - } + internalIndividualAcknowledgeMessage(txnID, positions, completableFuture); + }); + return completableFuture; + } + private void addCumulativeAcknowledgeMessageRequest(TxnID txnID, + List<PositionImpl> positions, + CompletableFuture<Void> completableFuture) { + 
acceptQueue.add(() -> internalCumulativeAcknowledgeMessage(txnID, positions, completableFuture)); + } + + public CompletableFuture<Void> internalCumulativeAcknowledgeMessage(TxnID txnID, Review comment: This method doesn't need to return a future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org