This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_addpeer_ml_1_replic
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1326bf47b87da39b0b1b24c0b54d83716ac978bc
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Nov 7 16:50:18 2022 +0800

    fix the NPE when addPeer to a MultiLeader Group with 1 replic
---
 .../multileader/logdispatcher/LogDispatcher.java   | 24 ++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 427f2d3945..606731e6a7 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -81,16 +81,20 @@ public class LogDispatcher {
             .map(x -> new LogDispatcherThread(x, impl.getConfig(), 
DEFAULT_INITIAL_SYNC_INDEX))
             .collect(Collectors.toList());
     if (!threads.isEmpty()) {
-      // We use cached thread pool here because each LogDispatcherThread will 
occupy one thread.
-      // And every LogDispatcherThread won't release its thread in this pool 
because it won't stop
-      // unless LogDispatcher stop.
-      // Thus, the size of this threadPool will be the same as the count of 
LogDispatcherThread.
-      this.executorService =
-          IoTDBThreadPoolFactory.newCachedThreadPool(
-              "LogDispatcher-" + impl.getThisNode().getGroupId());
+      initLogSyncThreadPool();
     }
   }
 
+  private void initLogSyncThreadPool() {
+    // We use cached thread pool here because each LogDispatcherThread will 
occupy one thread.
+    // And every LogDispatcherThread won't release its thread in this pool 
because it won't stop
+    // unless LogDispatcher stop.
+    // Thus, the size of this threadPool will be the same as the count of 
LogDispatcherThread.
+    this.executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPool(
+            "LogDispatcher-" + impl.getThisNode().getGroupId());
+  }
+
   public synchronized void start() {
     if (!threads.isEmpty()) {
       threads.forEach(executorService::submit);
@@ -118,9 +122,13 @@ public class LogDispatcher {
     if (stopped) {
       return;
     }
-    //
     LogDispatcherThread thread = new LogDispatcherThread(peer, 
impl.getConfig(), initialSyncIndex);
     threads.add(thread);
+    // If the initial replica is 1, the executorService won't be initialized. 
And when adding
+    // dispatcher thread, the executorService should be initialized manually
+    if (this.executorService == null) {
+      initLogSyncThreadPool();
+    }
     executorService.submit(thread);
   }
 

Reply via email to