This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f855c5b47d2 IoTConsensusV2: Fix some remain concurrent risks for
stronger robustness. #15172
f855c5b47d2 is described below
commit f855c5b47d24d44fd43fe8520a5a6aadc086d315
Author: Peng Junzhi <[email protected]>
AuthorDate: Sun Mar 23 15:42:16 2025 +0800
IoTConsensusV2: Fix some remain concurrent risks for stronger robustness.
#15172
---
.../iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java | 3 +--
.../db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java | 4 ++--
.../pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java | 4 ++--
3 files changed, 5 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
index ce12e841cd9..ca6153d804a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
@@ -36,7 +36,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +44,7 @@ import java.util.stream.Collectors;
public class ReplicateProgressDataNodeManager implements
ReplicateProgressManager {
private static final int DATA_NODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
- private static final Map<String, AtomicLong> groupId2ReplicateIndex = new
HashMap<>();
+ private static final Map<String, AtomicLong> groupId2ReplicateIndex = new
ConcurrentHashMap<>();
private final Map<ConsensusGroupId, ProgressIndex> groupId2MaxProgressIndex;
private final Map<ConsensusPipeName, Long>
consensusPipe2pinnedCommitIndexForMigration;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index 6ff090475bc..d8dcfc83ab5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -39,10 +39,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -83,7 +83,7 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
// Total size of this batch.
private final AtomicInteger totalSize = new AtomicInteger(0);
// All deletions that will be handled in a single persist task
- private final List<DeletionResource> pendingDeletionsInOneTask = new
ArrayList<>();
+ private final List<DeletionResource> pendingDeletionsInOneTask = new
CopyOnWriteArrayList<>();
// whether close method is called
private volatile boolean isClosed = false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 9f683e11a1d..6d1a86eba34 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1331,8 +1331,8 @@ public class PipeConsensusReceiver {
private final PipeConsensusTsFileWriterPool tsFileWriterPool;
private final AtomicInteger WALEventCount = new AtomicInteger(0);
private final AtomicInteger tsFileEventCount = new AtomicInteger(0);
- private long onSyncedReplicateIndex = 0;
- private int connectorRebootTimes = 0;
+ private volatile long onSyncedReplicateIndex = 0;
+ private volatile int connectorRebootTimes = 0;
public RequestExecutor(
PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool
tsFileWriterPool) {