This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1020258bdf8 PipeConsensus: fix concurrency problems in receiver initialization and
progressIndex & optimize receiver logs & fix NPE (#12790)
1020258bdf8 is described below
commit 1020258bdf8e9a587317bb5ee526437507164cac
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jun 24 21:21:55 2024 -0500
PipeConsensus: fix concurrency problems in receiver initialization and
progressIndex & optimize receiver logs & fix NPE (#12790)
---
.../pipeconsensus/PipeConsensusAsyncConnector.java | 12 ++--
.../consensus/ProgressIndexDataNodeManager.java | 20 +++---
.../pipeconsensus/PipeConsensusReceiver.java | 71 +++++++++++++---------
.../pipeconsensus/PipeConsensusReceiverAgent.java | 53 ++++++++++++----
.../consensus/index/impl/RecoverProgressIndex.java | 5 +-
5 files changed, 108 insertions(+), 53 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 526773bf81a..2262a3c8b84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -122,6 +122,12 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
throws Exception {
super.customize(parameters, configuration);
+ // initialize metric components
+ pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this);
+ PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
+ .addConsensusPipeConnector(this);
+
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics);
+
// Get consensusGroupId from parameters passed by PipeConsensusImpl
consensusGroupId = parameters.getInt(CONNECTOR_CONSENSUS_GROUP_ID_KEY);
// Get consensusPipeName from parameters passed by PipeConsensusImpl
@@ -147,12 +153,6 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
// currently, tablet batch is false by default in PipeConsensus;
isTabletBatchModeEnabled = false;
-
- // initialize metric components
- pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this);
- PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
- .addConsensusPipeConnector(this);
-
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics);
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
index 371349560b4..2f50365f07b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java
@@ -74,9 +74,13 @@ public class ProgressIndexDataNodeManager implements
ProgressIndexManager {
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
extractLocalSimpleProgressIndex(progressIndex));
}
- groupId2MaxProgressIndex
- .computeIfAbsent(dataRegionId, o ->
MinimumProgressIndex.INSTANCE)
-
.updateToMinimumEqualOrIsAfterProgressIndex(maxProgressIndex);
+ // Renew a variable to pass the examination of compiler
+ final ProgressIndex finalMaxProgressIndex = maxProgressIndex;
+ groupId2MaxProgressIndex.compute(
+ dataRegionId,
+ (key, value) ->
+ (value == null ? MinimumProgressIndex.INSTANCE : value)
+
.updateToMinimumEqualOrIsAfterProgressIndex(finalMaxProgressIndex));
});
// TODO: update deletion progress index
@@ -115,10 +119,12 @@ public class ProgressIndexDataNodeManager implements
ProgressIndexManager {
@Override
public ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId) {
- return groupId2MaxProgressIndex
- .computeIfAbsent(consensusGroupId, o -> MinimumProgressIndex.INSTANCE)
- .updateToMinimumEqualOrIsAfterProgressIndex(
- PipeDataNodeAgent.runtime().assignProgressIndexForPipeConsensus());
+ return groupId2MaxProgressIndex.compute(
+ consensusGroupId,
+ (key, value) ->
+ ((value == null ? MinimumProgressIndex.INSTANCE : value)
+ .updateToMinimumEqualOrIsAfterProgressIndex(
+
PipeDataNodeAgent.runtime().assignProgressIndexForPipeConsensus())));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index d6b106705a1..7728143887a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -867,41 +867,56 @@ public class PipeConsensusReceiver {
}
// initiate receiverFileDirWithIdSuffix
+ String receiverFileBaseDir;
try {
- final String receiverFileBaseDir = getReceiverFileBaseDir();
- if (Objects.isNull(receiverFileBaseDir)) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver
file base directory, because your folderManager is null. May because the disk
is full.",
- consensusPipeName.toString());
- throw new DiskSpaceInsufficientException(receiverBaseDirsName);
- }
- // Create a new receiver file dir
- final File newReceiverDir = new File(receiverFileBaseDir,
consensusPipeName.toString());
- if (newReceiverDir.exists()) {
- FileUtils.deleteDirectory(newReceiverDir);
- LOGGER.info(
- "PipeConsensus-PipeName-{}: Origin receiver file dir {} was
deleted.",
- consensusPipeName,
- newReceiverDir.getPath());
- }
- if (!newReceiverDir.mkdirs()) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.
May because authority or dir already exists etc.",
- consensusPipeName,
- newReceiverDir.getPath());
- throw new IOException(
- String.format(
- "PipeConsensus-PipeName-%s: Failed to create receiver file dir
%s. May because authority or dir already exists etc.",
- consensusPipeName, newReceiverDir.getPath()));
- }
- receiverFileDirWithIdSuffix.set(newReceiverDir);
-
+ receiverFileBaseDir = getReceiverFileBaseDir();
} catch (Exception e) {
LOGGER.warn(
"Failed to init pipeConsensus receiver file folder manager because
all disks of folders are full.",
e);
throw e;
}
+
+ if (Objects.isNull(receiverFileBaseDir)) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver
file base directory, because your folderManager is null. May because the disk
is full.",
+ consensusPipeName.toString());
+ throw new DiskSpaceInsufficientException(receiverBaseDirsName);
+ }
+ // Create a new receiver file dir
+ final File newReceiverDir = new File(receiverFileBaseDir,
consensusPipeName.toString());
+ // Check whether systemDir exists in case of system concurrently exit when
receiver try to make
+ // new dirs.
+ final File systemDir = new
File(IoTDBDescriptor.getInstance().getConfig().getSystemDir());
+ if (!systemDir.exists()) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.
Because parent system dir have been deleted due to system concurrently exit.",
+ consensusPipeName,
+ newReceiverDir.getPath());
+ throw new IOException(
+ String.format(
+ "PipeConsensus-PipeName-%s: Failed to create receiver file dir
%s. Because parent system dir have been deleted due to system concurrently
exit.",
+ consensusPipeName, newReceiverDir.getPath()));
+ }
+ // Remove exists dir
+ if (newReceiverDir.exists()) {
+ FileUtils.deleteDirectory(newReceiverDir);
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: Origin receiver file dir {} was
deleted.",
+ consensusPipeName,
+ newReceiverDir.getPath());
+ }
+ if (!newReceiverDir.mkdirs()) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.
May because authority or dir already exists etc.",
+ consensusPipeName,
+ newReceiverDir.getPath());
+ throw new IOException(
+ String.format(
+ "PipeConsensus-PipeName-%s: Failed to create receiver file dir
%s. May because authority or dir already exists etc.",
+ consensusPipeName, newReceiverDir.getPath()));
+ }
+ receiverFileDirWithIdSuffix.set(newReceiverDir);
}
public PipeConsensusRequestVersion getVersion() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index d4c7f37e697..ff3307c5f56 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
@@ -51,6 +52,8 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
TriFunction<PipeConsensus, ConsensusGroupId, ConsensusPipeName,
PipeConsensusReceiver>>
RECEIVER_CONSTRUCTORS = new HashMap<>();
+ private static final long WAIT_INITIALIZE_RECEIVER_INTERVAL_IN_MS = 100;
+
private final int thisNodeId =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
/**
@@ -108,12 +111,18 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
// 2. Route to given consensusPipeTask's receiver
ConsensusPipeName consensusPipeName =
new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
+ AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
AtomicReference<PipeConsensusReceiver> receiverReference =
consensusPipe2ReceiverMap.computeIfAbsent(
- consensusPipeName, key -> new AtomicReference<>(null));
+ consensusPipeName,
+ key -> {
+ isFirstGetReceiver.set(true);
+ return new AtomicReference<>(null);
+ });
if (receiverReference.get() == null) {
- return internalSetAndGetReceiver(consensusGroupId, consensusPipeName,
reqVersion);
+ return internalSetAndGetReceiver(
+ consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
}
final byte receiverThreadLocalVersion =
receiverReference.get().getVersion().getVersion();
@@ -124,13 +133,17 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
receiverThreadLocalVersion,
reqVersion);
receiverReference.set(null);
- return internalSetAndGetReceiver(consensusGroupId, consensusPipeName,
reqVersion);
+ return internalSetAndGetReceiver(
+ consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
}
return receiverReference.get();
}
private PipeConsensusReceiver internalSetAndGetReceiver(
- ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName,
byte reqVersion) {
+ ConsensusGroupId consensusGroupId,
+ ConsensusPipeName consensusPipeName,
+ byte reqVersion,
+ AtomicBoolean isFirstGetReceiver) {
// 1. Route to given consensusGroup's receiver map
Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>
consensusPipe2ReciverMap =
replicaReceiverMap.get(consensusGroupId);
@@ -138,18 +151,36 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
AtomicReference<PipeConsensusReceiver> receiverReference =
consensusPipe2ReciverMap.get(consensusPipeName);
- if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
- receiverReference.set(
- RECEIVER_CONSTRUCTORS
- .get(reqVersion)
- .apply(pipeConsensus, consensusGroupId, consensusPipeName));
+ if (isFirstGetReceiver.get()) {
+ if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
+ receiverReference.set(
+ RECEIVER_CONSTRUCTORS
+ .get(reqVersion)
+ .apply(pipeConsensus, consensusGroupId, consensusPipeName));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unsupported pipeConsensus request version %d",
reqVersion));
+ }
} else {
- throw new UnsupportedOperationException(
- String.format("Unsupported pipeConsensus request version %d",
reqVersion));
+ waitUntilReceiverGetInitiated(receiverReference);
}
return receiverReference.get();
}
+ private void waitUntilReceiverGetInitiated(
+ AtomicReference<PipeConsensusReceiver> receiverReference) {
+ try {
+ while (receiverReference.get() == null) {
+ Thread.sleep(WAIT_INITIALIZE_RECEIVER_INTERVAL_IN_MS);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn(
+ "PipeConsensusReceiver thread is interrupted when waiting for
receiver get initiated, may because system exit.",
+ e);
+ }
+ }
+
/** Release receiver of given pipeConsensusTask */
@Override
public final void handleDropPipeConsensusTask(ConsensusPipeName pipeName) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index a67a7a9ec32..5a567fd4981 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -52,7 +53,9 @@ public class RecoverProgressIndex extends ProgressIndex {
}
public Map<Integer, SimpleProgressIndex> getDataNodeId2LocalIndex() {
- return dataNodeId2LocalIndex;
+ return ImmutableMap.<Integer, SimpleProgressIndex>builder()
+ .putAll(dataNodeId2LocalIndex)
+ .build();
}
@Override