This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a5a99911434904f0cdfccaf50ad723a2cb9bbf44
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Fri Feb 25 16:14:52 2022 +0800

    [Transaction] Adopt Single_thread to handle TcClient connecting (#13969)
    
    ### Motivation
    
    The broker will only reconnect the same TC once at the same time, and other 
connection requests during the reconnection period will be processed together 
after the connection is completed.
    There may be concurrency problems in the queue for request addition and the 
clearing of the queue.
    
    ### Modification
    
    Use SingleThread to deal TcClient connecting.
    
    (cherry picked from commit 29259e1b5c33856cd6bd9413e331f4592fd3007c)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   5 +
 .../broker/TransactionMetadataStoreService.java    | 152 ++++++++++++---------
 2 files changed, 91 insertions(+), 66 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index adeec4c..7fcf0d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -403,6 +403,11 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 brokerAdditionalServlets = null;
             }
 
+            if (this.transactionMetadataStoreService != null) {
+                this.transactionMetadataStoreService.close();
+                this.transactionMetadataStoreService = null;
+            }
+
             GracefulExecutorServicesShutdown executorServicesShutdown =
                     GracefulExecutorServicesShutdown
                             .initiate()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 9b60679..b3ced3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -24,6 +24,7 @@ import static 
org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
@@ -32,7 +33,10 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -84,9 +88,14 @@ public class TransactionMetadataStoreService {
     // one connect request open the transactionMetaStore the other request 
will add to the queue, when the open op
     // finished the request will be poll and complete the future
     private final 
ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> 
pendingConnectRequests;
+    private final ExecutorService internalPinnedExecutor;
 
     private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
 
+    private final ThreadFactory threadFactory =
+            new DefaultThreadFactory("transaction-coordinator-thread-factory");
+
+
     public TransactionMetadataStoreService(TransactionMetadataStoreProvider 
transactionMetadataStoreProvider,
                                            PulsarService pulsarService, 
TransactionBufferClient tbClient,
                                            HashedWheelTimer timer) {
@@ -98,6 +107,7 @@ public class TransactionMetadataStoreService {
         this.transactionOpRetryTimer = timer;
         this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
         this.pendingConnectRequests = new ConcurrentLongHashMap<>();
+        this.internalPinnedExecutor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
     @Deprecated
@@ -152,80 +162,86 @@ public class TransactionMetadataStoreService {
     }
 
     public CompletableFuture<Void> 
handleTcClientConnect(TransactionCoordinatorID tcId) {
-        if (stores.get(tcId) != null) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return 
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
-                    .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) 
tcId.getId()).toString()).thenCompose(v -> {
-                        CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
-                        .computeIfAbsent(tcId.getId(), (id) -> new 
Semaphore(1));
-                Deque<CompletableFuture<Void>> deque = pendingConnectRequests
-                        .computeIfAbsent(tcId.getId(), (id) -> new 
ConcurrentLinkedDeque<>());
-                if (tcLoadSemaphore.tryAcquire()) {
-                    // when tcLoadSemaphore.release(), this command will 
acquire semaphore, so we should jude the store
-                    // exist again.
-                    if (stores.get(tcId) != null) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    openTransactionMetadataStore(tcId).thenAccept((store) -> {
-                        stores.put(tcId, store);
-                        LOG.info("Added new transaction meta store {}", tcId);
-                        long endTime = System.currentTimeMillis() + 
HANDLE_PENDING_CONNECT_TIME_OUT;
-                        while (true) {
-                            // prevent thread in a busy loop.
-                            if (System.currentTimeMillis() < endTime) {
-                                CompletableFuture<Void> future = deque.poll();
-                                if (future != null) {
-                                    // complete queue request future
-                                    future.complete(null);
-                                } else {
-                                    break;
-                                }
-                            } else {
-                                deque.clear();
-                                break;
-                            }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (stores.get(tcId) != null) {
+                completableFuture.complete(null);
+            } else {
+                
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+                        .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) 
tcId.getId()).toString())
+                        .thenRun(() -> internalPinnedExecutor.execute(() -> {
+                    final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+                            .computeIfAbsent(tcId.getId(), (id) -> new 
Semaphore(1));
+                    Deque<CompletableFuture<Void>> deque = 
pendingConnectRequests
+                            .computeIfAbsent(tcId.getId(), (id) -> new 
ConcurrentLinkedDeque<>());
+                    if (tcLoadSemaphore.tryAcquire()) {
+                        // when tcLoadSemaphore.release(), this command will 
acquire semaphore,
+                        // so we should jude the store exist again.
+                        if (stores.get(tcId) != null) {
+                            completableFuture.complete(null);
                         }
 
-                        completableFuture.complete(null);
-                        tcLoadSemaphore.release();
-                    }).exceptionally(e -> {
-                        completableFuture.completeExceptionally(e.getCause());
-                        // release before handle request queue, in order to 
client reconnect infinite loop
-                        tcLoadSemaphore.release();
-                        long endTime = System.currentTimeMillis() + 
HANDLE_PENDING_CONNECT_TIME_OUT;
-                        while (true) {
-                            // prevent thread in a busy loop.
-                            if (System.currentTimeMillis() < endTime) {
-                                CompletableFuture<Void> future = deque.poll();
-                                if (future != null) {
-                                    // this means that this tc client 
connection connect fail
-                                    future.completeExceptionally(e);
+                        openTransactionMetadataStore(tcId).thenAccept((store) 
-> internalPinnedExecutor.execute(() -> {
+                            stores.put(tcId, store);
+                            LOG.info("Added new transaction meta store {}", 
tcId);
+                            long endTime = System.currentTimeMillis() + 
HANDLE_PENDING_CONNECT_TIME_OUT;
+                            while (true) {
+                                // prevent thread in a busy loop.
+                                if (System.currentTimeMillis() < endTime) {
+                                    CompletableFuture<Void> future = 
deque.poll();
+                                    if (future != null) {
+                                        // complete queue request future
+                                        future.complete(null);
+                                    } else {
+                                        break;
+                                    }
                                 } else {
+                                    deque.clear();
                                     break;
                                 }
-                            } else {
-                                deque.clear();
-                                break;
                             }
+
+                            completableFuture.complete(null);
+                            tcLoadSemaphore.release();
+                        })).exceptionally(e -> {
+                            internalPinnedExecutor.execute(() -> {
+                                        
completableFuture.completeExceptionally(e.getCause());
+                                        // release before handle request queue,
+                                        //in order to client reconnect 
infinite loop
+                                        tcLoadSemaphore.release();
+                                        long endTime = 
System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+                                        while (true) {
+                                            // prevent thread in a busy loop.
+                                            if (System.currentTimeMillis() < 
endTime) {
+                                                CompletableFuture<Void> future 
= deque.poll();
+                                                if (future != null) {
+                                                    // this means that this tc 
client connection connect fail
+                                                    
future.completeExceptionally(e);
+                                                } else {
+                                                    break;
+                                                }
+                                            } else {
+                                                deque.clear();
+                                                break;
+                                            }
+                                        }
+                                        LOG.error("Add transaction metadata 
store with id {} error", tcId.getId(), e);
+                                    });
+                                    return null;
+                                });
+                    } else {
+                        // only one command can open transaction metadata 
store,
+                        // other will be added to the deque, when the op of 
openTransactionMetadataStore finished
+                        // then handle the requests witch in the queue
+                        deque.add(completableFuture);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Handle tc client connect added into 
pending queue! tcId : {}", tcId.toString());
                         }
-                        LOG.error("Add transaction metadata store with id {} 
error", tcId.getId(), e);
-                        return null;
-                    });
-                } else {
-                    // only one command can open transaction metadata store,
-                    // other will be added to the deque, when the op of 
openTransactionMetadataStore finished
-                    // then handle the requests witch in the queue
-                    deque.add(completableFuture);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Handle tc client connect added into pending 
queue! tcId : {}", tcId.toString());
                     }
-                }
-                return completableFuture;
-            });
-        }
+                }));
+            }
+        });
+        return completableFuture;
     }
 
     public CompletableFuture<TransactionMetadataStore> 
openTransactionMetadataStore(TransactionCoordinatorID tcId) {
@@ -537,4 +553,8 @@ public class TransactionMetadataStoreService {
     public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() 
{
         return Collections.unmodifiableMap(stores);
     }
+
+    public void close () {
+        this.internalPinnedExecutor.shutdown();
+    }
 }

Reply via email to