This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a5a99911434904f0cdfccaf50ad723a2cb9bbf44 Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Fri Feb 25 16:14:52 2022 +0800 [Transaction] Adopt Single_thread to handle TcClient connecting (#13969) ### Motivation The broker will only reconnect the same TC once at the same time, and other connection requests during the reconnection period will be processed together after the connection is completed. There may be concurrency problems between adding requests to the queue and clearing the queue. ### Modification Use a single thread to handle TcClient connections. (cherry picked from commit 29259e1b5c33856cd6bd9413e331f4592fd3007c) --- .../org/apache/pulsar/broker/PulsarService.java | 5 + .../broker/TransactionMetadataStoreService.java | 152 ++++++++++++--------- 2 files changed, 91 insertions(+), 66 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index adeec4c..7fcf0d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -403,6 +403,11 @@ public class PulsarService implements AutoCloseable, ShutdownService { brokerAdditionalServlets = null; } + if (this.transactionMetadataStoreService != null) { + this.transactionMetadataStoreService.close(); + this.transactionMetadataStoreService = null; + } + GracefulExecutorServicesShutdown executorServicesShutdown = GracefulExecutorServicesShutdown .initiate() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 9b60679..b3ced3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -24,6 +24,7 @@ import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI import com.google.common.annotations.VisibleForTesting; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; @@ -32,7 +33,10 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; @@ -84,9 +88,14 @@ public class TransactionMetadataStoreService { // one connect request open the transactionMetaStore the other request will add to the queue, when the open op // finished the request will be poll and complete the future private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests; + private final ExecutorService internalPinnedExecutor; private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L; + private final ThreadFactory threadFactory = + new DefaultThreadFactory("transaction-coordinator-thread-factory"); + + public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider, PulsarService pulsarService, TransactionBufferClient tbClient, HashedWheelTimer timer) { @@ -98,6 +107,7 @@ public class TransactionMetadataStoreService { this.transactionOpRetryTimer = timer; this.tcLoadSemaphores = new ConcurrentLongHashMap<>(); this.pendingConnectRequests = new ConcurrentLongHashMap<>(); + 
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); } @Deprecated @@ -152,80 +162,86 @@ public class TransactionMetadataStoreService { } public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) { - if (stores.get(tcId) != null) { - return CompletableFuture.completedFuture(null); - } else { - return pulsarService.getBrokerService().checkTopicNsOwnership(TopicName - .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()).thenCompose(v -> { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - final Semaphore tcLoadSemaphore = this.tcLoadSemaphores - .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1)); - Deque<CompletableFuture<Void>> deque = pendingConnectRequests - .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>()); - if (tcLoadSemaphore.tryAcquire()) { - // when tcLoadSemaphore.release(), this command will acquire semaphore, so we should jude the store - // exist again. - if (stores.get(tcId) != null) { - return CompletableFuture.completedFuture(null); - } - - openTransactionMetadataStore(tcId).thenAccept((store) -> { - stores.put(tcId, store); - LOG.info("Added new transaction meta store {}", tcId); - long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT; - while (true) { - // prevent thread in a busy loop. 
- if (System.currentTimeMillis() < endTime) { - CompletableFuture<Void> future = deque.poll(); - if (future != null) { - // complete queue request future - future.complete(null); - } else { - break; - } - } else { - deque.clear(); - break; - } + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (stores.get(tcId) != null) { + completableFuture.complete(null); + } else { + pulsarService.getBrokerService().checkTopicNsOwnership(TopicName + .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()) + .thenRun(() -> internalPinnedExecutor.execute(() -> { + final Semaphore tcLoadSemaphore = this.tcLoadSemaphores + .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1)); + Deque<CompletableFuture<Void>> deque = pendingConnectRequests + .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>()); + if (tcLoadSemaphore.tryAcquire()) { + // when tcLoadSemaphore.release(), this command will acquire semaphore, + // so we should jude the store exist again. + if (stores.get(tcId) != null) { + completableFuture.complete(null); } - completableFuture.complete(null); - tcLoadSemaphore.release(); - }).exceptionally(e -> { - completableFuture.completeExceptionally(e.getCause()); - // release before handle request queue, in order to client reconnect infinite loop - tcLoadSemaphore.release(); - long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT; - while (true) { - // prevent thread in a busy loop. 
- if (System.currentTimeMillis() < endTime) { - CompletableFuture<Void> future = deque.poll(); - if (future != null) { - // this means that this tc client connection connect fail - future.completeExceptionally(e); + openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> { + stores.put(tcId, store); + LOG.info("Added new transaction meta store {}", tcId); + long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT; + while (true) { + // prevent thread in a busy loop. + if (System.currentTimeMillis() < endTime) { + CompletableFuture<Void> future = deque.poll(); + if (future != null) { + // complete queue request future + future.complete(null); + } else { + break; + } } else { + deque.clear(); break; } - } else { - deque.clear(); - break; } + + completableFuture.complete(null); + tcLoadSemaphore.release(); + })).exceptionally(e -> { + internalPinnedExecutor.execute(() -> { + completableFuture.completeExceptionally(e.getCause()); + // release before handle request queue, + //in order to client reconnect infinite loop + tcLoadSemaphore.release(); + long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT; + while (true) { + // prevent thread in a busy loop. + if (System.currentTimeMillis() < endTime) { + CompletableFuture<Void> future = deque.poll(); + if (future != null) { + // this means that this tc client connection connect fail + future.completeExceptionally(e); + } else { + break; + } + } else { + deque.clear(); + break; + } + } + LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e); + }); + return null; + }); + } else { + // only one command can open transaction metadata store, + // other will be added to the deque, when the op of openTransactionMetadataStore finished + // then handle the requests witch in the queue + deque.add(completableFuture); + if (LOG.isDebugEnabled()) { + LOG.debug("Handle tc client connect added into pending queue! 
tcId : {}", tcId.toString()); } - LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e); - return null; - }); - } else { - // only one command can open transaction metadata store, - // other will be added to the deque, when the op of openTransactionMetadataStore finished - // then handle the requests witch in the queue - deque.add(completableFuture); - if (LOG.isDebugEnabled()) { - LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString()); } - } - return completableFuture; - }); - } + })); + } + }); + return completableFuture; } public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) { @@ -537,4 +553,8 @@ public class TransactionMetadataStoreService { public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() { return Collections.unmodifiableMap(stores); } + + public void close () { + this.internalPinnedExecutor.shutdown(); + } }