This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1020258bdf8 PipeConsensus: fix concurrent problems regarding to 
initiate receiver and progressIndex & optimize receiver logs & fix npe (#12790)
1020258bdf8 is described below

commit 1020258bdf8e9a587317bb5ee526437507164cac
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jun 24 21:21:55 2024 -0500

    PipeConsensus: fix concurrent problems regarding to initiate receiver and 
progressIndex & optimize receiver logs & fix npe (#12790)
---
 .../pipeconsensus/PipeConsensusAsyncConnector.java | 12 ++--
 .../consensus/ProgressIndexDataNodeManager.java    | 20 +++---
 .../pipeconsensus/PipeConsensusReceiver.java       | 71 +++++++++++++---------
 .../pipeconsensus/PipeConsensusReceiverAgent.java  | 53 ++++++++++++----
 .../consensus/index/impl/RecoverProgressIndex.java |  5 +-
 5 files changed, 108 insertions(+), 53 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 526773bf81a..2262a3c8b84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -122,6 +122,12 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
       throws Exception {
     super.customize(parameters, configuration);
 
+    // initialize metric components
+    pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this);
+    PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
+        .addConsensusPipeConnector(this);
+    
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics);
+
     // Get consensusGroupId from parameters passed by PipeConsensusImpl
     consensusGroupId = parameters.getInt(CONNECTOR_CONSENSUS_GROUP_ID_KEY);
     // Get consensusPipeName from parameters passed by PipeConsensusImpl
@@ -147,12 +153,6 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
 
     // currently, tablet batch is false by default in PipeConsensus;
     isTabletBatchModeEnabled = false;
-
-    // initialize metric components
-    pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this);
-    PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
-        .addConsensusPipeConnector(this);
-    
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics);
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
index 371349560b4..2f50365f07b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
@@ -74,9 +74,13 @@ public class ProgressIndexDataNodeManager implements 
ProgressIndexManager {
                     
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
                         extractLocalSimpleProgressIndex(progressIndex));
               }
-              groupId2MaxProgressIndex
-                  .computeIfAbsent(dataRegionId, o -> 
MinimumProgressIndex.INSTANCE)
-                  
.updateToMinimumEqualOrIsAfterProgressIndex(maxProgressIndex);
+              // Copy into an effectively final variable so the lambda below can capture it
+              final ProgressIndex finalMaxProgressIndex = maxProgressIndex;
+              groupId2MaxProgressIndex.compute(
+                  dataRegionId,
+                  (key, value) ->
+                      (value == null ? MinimumProgressIndex.INSTANCE : value)
+                          
.updateToMinimumEqualOrIsAfterProgressIndex(finalMaxProgressIndex));
             });
 
     // TODO: update deletion progress index
@@ -115,10 +119,12 @@ public class ProgressIndexDataNodeManager implements 
ProgressIndexManager {
 
   @Override
   public ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId) {
-    return groupId2MaxProgressIndex
-        .computeIfAbsent(consensusGroupId, o -> MinimumProgressIndex.INSTANCE)
-        .updateToMinimumEqualOrIsAfterProgressIndex(
-            PipeDataNodeAgent.runtime().assignProgressIndexForPipeConsensus());
+    return groupId2MaxProgressIndex.compute(
+        consensusGroupId,
+        (key, value) ->
+            ((value == null ? MinimumProgressIndex.INSTANCE : value)
+                .updateToMinimumEqualOrIsAfterProgressIndex(
+                    
PipeDataNodeAgent.runtime().assignProgressIndexForPipeConsensus())));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index d6b106705a1..7728143887a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -867,41 +867,56 @@ public class PipeConsensusReceiver {
     }
 
     // initiate receiverFileDirWithIdSuffix
+    String receiverFileBaseDir;
     try {
-      final String receiverFileBaseDir = getReceiverFileBaseDir();
-      if (Objects.isNull(receiverFileBaseDir)) {
-        LOGGER.warn(
-            "PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver 
file base directory, because your folderManager is null. May because the disk 
is full.",
-            consensusPipeName.toString());
-        throw new DiskSpaceInsufficientException(receiverBaseDirsName);
-      }
-      // Create a new receiver file dir
-      final File newReceiverDir = new File(receiverFileBaseDir, 
consensusPipeName.toString());
-      if (newReceiverDir.exists()) {
-        FileUtils.deleteDirectory(newReceiverDir);
-        LOGGER.info(
-            "PipeConsensus-PipeName-{}: Origin receiver file dir {} was 
deleted.",
-            consensusPipeName,
-            newReceiverDir.getPath());
-      }
-      if (!newReceiverDir.mkdirs()) {
-        LOGGER.warn(
-            "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. 
May because authority or dir already exists etc.",
-            consensusPipeName,
-            newReceiverDir.getPath());
-        throw new IOException(
-            String.format(
-                "PipeConsensus-PipeName-%s: Failed to create receiver file dir 
%s. May because authority or dir already exists etc.",
-                consensusPipeName, newReceiverDir.getPath()));
-      }
-      receiverFileDirWithIdSuffix.set(newReceiverDir);
-
+      receiverFileBaseDir = getReceiverFileBaseDir();
     } catch (Exception e) {
       LOGGER.warn(
           "Failed to init pipeConsensus receiver file folder manager because 
all disks of folders are full.",
           e);
       throw e;
     }
+
+    if (Objects.isNull(receiverFileBaseDir)) {
+      LOGGER.warn(
+          "PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver 
file base directory, because your folderManager is null. May because the disk 
is full.",
+          consensusPipeName.toString());
+      throw new DiskSpaceInsufficientException(receiverBaseDirsName);
+    }
+    // Create a new receiver file dir
+    final File newReceiverDir = new File(receiverFileBaseDir, 
consensusPipeName.toString());
+    // Check that systemDir still exists, in case the system is exiting concurrently while the
+    // receiver is trying to create new dirs.
+    final File systemDir = new 
File(IoTDBDescriptor.getInstance().getConfig().getSystemDir());
+    if (!systemDir.exists()) {
+      LOGGER.warn(
+          "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. 
Because parent system dir have been deleted due to system concurrently exit.",
+          consensusPipeName,
+          newReceiverDir.getPath());
+      throw new IOException(
+          String.format(
+              "PipeConsensus-PipeName-%s: Failed to create receiver file dir 
%s. Because parent system dir have been deleted due to system concurrently 
exit.",
+              consensusPipeName, newReceiverDir.getPath()));
+    }
+    // Remove the existing dir
+    if (newReceiverDir.exists()) {
+      FileUtils.deleteDirectory(newReceiverDir);
+      LOGGER.info(
+          "PipeConsensus-PipeName-{}: Origin receiver file dir {} was 
deleted.",
+          consensusPipeName,
+          newReceiverDir.getPath());
+    }
+    if (!newReceiverDir.mkdirs()) {
+      LOGGER.warn(
+          "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. 
May because authority or dir already exists etc.",
+          consensusPipeName,
+          newReceiverDir.getPath());
+      throw new IOException(
+          String.format(
+              "PipeConsensus-PipeName-%s: Failed to create receiver file dir 
%s. May because authority or dir already exists etc.",
+              consensusPipeName, newReceiverDir.getPath()));
+    }
+    receiverFileDirWithIdSuffix.set(newReceiverDir);
   }
 
   public PipeConsensusRequestVersion getVersion() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index d4c7f37e697..ff3307c5f56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
@@ -51,6 +52,8 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
           TriFunction<PipeConsensus, ConsensusGroupId, ConsensusPipeName, 
PipeConsensusReceiver>>
       RECEIVER_CONSTRUCTORS = new HashMap<>();
 
+  private static final long WAIT_INITIALIZE_RECEIVER_INTERVAL_IN_MS = 100;
+
   private final int thisNodeId = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
 
   /**
@@ -108,12 +111,18 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
     // 2. Route to given consensusPipeTask's receiver
     ConsensusPipeName consensusPipeName =
         new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
+    AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
     AtomicReference<PipeConsensusReceiver> receiverReference =
         consensusPipe2ReceiverMap.computeIfAbsent(
-            consensusPipeName, key -> new AtomicReference<>(null));
+            consensusPipeName,
+            key -> {
+              isFirstGetReceiver.set(true);
+              return new AtomicReference<>(null);
+            });
 
     if (receiverReference.get() == null) {
-      return internalSetAndGetReceiver(consensusGroupId, consensusPipeName, 
reqVersion);
+      return internalSetAndGetReceiver(
+          consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
     }
 
     final byte receiverThreadLocalVersion = 
receiverReference.get().getVersion().getVersion();
@@ -124,13 +133,17 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
           receiverThreadLocalVersion,
           reqVersion);
       receiverReference.set(null);
-      return internalSetAndGetReceiver(consensusGroupId, consensusPipeName, 
reqVersion);
+      return internalSetAndGetReceiver(
+          consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
     }
     return receiverReference.get();
   }
 
   private PipeConsensusReceiver internalSetAndGetReceiver(
-      ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName, 
byte reqVersion) {
+      ConsensusGroupId consensusGroupId,
+      ConsensusPipeName consensusPipeName,
+      byte reqVersion,
+      AtomicBoolean isFirstGetReceiver) {
     // 1. Route to given consensusGroup's receiver map
     Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> 
consensusPipe2ReciverMap =
         replicaReceiverMap.get(consensusGroupId);
@@ -138,18 +151,36 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
     AtomicReference<PipeConsensusReceiver> receiverReference =
         consensusPipe2ReciverMap.get(consensusPipeName);
 
-    if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
-      receiverReference.set(
-          RECEIVER_CONSTRUCTORS
-              .get(reqVersion)
-              .apply(pipeConsensus, consensusGroupId, consensusPipeName));
+    if (isFirstGetReceiver.get()) {
+      if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
+        receiverReference.set(
+            RECEIVER_CONSTRUCTORS
+                .get(reqVersion)
+                .apply(pipeConsensus, consensusGroupId, consensusPipeName));
+      } else {
+        throw new UnsupportedOperationException(
+            String.format("Unsupported pipeConsensus request version %d", 
reqVersion));
+      }
     } else {
-      throw new UnsupportedOperationException(
-          String.format("Unsupported pipeConsensus request version %d", 
reqVersion));
+      waitUntilReceiverGetInitiated(receiverReference);
     }
     return receiverReference.get();
   }
 
+  private void waitUntilReceiverGetInitiated(
+      AtomicReference<PipeConsensusReceiver> receiverReference) {
+    try {
+      while (receiverReference.get() == null) {
+        Thread.sleep(WAIT_INITIALIZE_RECEIVER_INTERVAL_IN_MS);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn(
+          "PipeConsensusReceiver thread is interrupted when waiting for 
receiver get initiated, may because system exit.",
+          e);
+    }
+  }
+
   /** Release receiver of given pipeConsensusTask */
   @Override
   public final void handleDropPipeConsensusTask(ConsensusPipeName pipeName) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index a67a7a9ec32..5a567fd4981 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -52,7 +53,9 @@ public class RecoverProgressIndex extends ProgressIndex {
   }
 
   public Map<Integer, SimpleProgressIndex> getDataNodeId2LocalIndex() {
-    return dataNodeId2LocalIndex;
+    return ImmutableMap.<Integer, SimpleProgressIndex>builder()
+        .putAll(dataNodeId2LocalIndex)
+        .build();
   }
 
   @Override

Reply via email to