This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 08c046e151e Fix IoTConsensus multi-folder snapshot load for DataRegion (#17974)
08c046e151e is described below

commit 08c046e151e350890f4a15338b094d897d8e13c7
Author: Hongzhi Gao <[email protected]>
AuthorDate: Thu Jun 18 09:00:35 2026 +0800

    Fix IoTConsensus multi-folder snapshot load for DataRegion (#17974)
    
    Collect receive folders that actually hold snapshot fragments and load them
    in a single call. Loading per folder wipes data dirs before relinking, so
    repeated loads only kept the last folder's data. Add SnapshotLoader multi-dir
    support and regression tests.
---
 .../org/apache/iotdb/consensus/IStateMachine.java  | 23 +++++
 .../consensus/iot/IoTConsensusServerImpl.java      | 27 +++---
 .../dataregion/DataRegionStateMachine.java         | 44 +++++++---
 .../dataregion/snapshot/SnapshotLoader.java        | 47 +++++++++++
 .../dataregion/snapshot/IoTDBSnapshotTest.java     | 98 ++++++++++++++++++++++
 5 files changed, 216 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index c7705d93896..c879c3bbb9b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -118,6 +118,29 @@ public interface IStateMachine {
    */
   boolean loadSnapshot(File latestSnapshotRootDir);
 
+  /**
+   * Load the latest snapshot whose fragments are spread across several root dirs. A single snapshot
+   * may be received into more than one folder (e.g. IoTConsensus spreads a snapshot's fragments
+   * across one receive folder per local data dir), so the whole snapshot must be loaded in one call
+   * rather than once per folder: a state machine whose load wipes its data dirs before relinking
+   * (such as the DataRegion one) would otherwise have each per-folder load erase the fragments
+   * linked by the previous folders, leaving only the last folder's data.
+   *
+   * <p>The default implementation simply loads each dir in turn, which is correct only for state
+   * machines whose per-dir load is independent; a state machine whose load is destructive across
+   * dirs must override this method.
+   *
+   * @param latestSnapshotRootDirs the dirs that actually hold fragments of the snapshot
+   * @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise.
+   */
+  default boolean loadSnapshot(List<File> latestSnapshotRootDirs) {
+    boolean success = true;
+    for (File dir : latestSnapshotRootDirs) {
+      success = loadSnapshot(dir) && success;
+    }
+    return success;
+  }
+
   /**
    * given a snapshot dir, ask statemachine to provide all snapshot files. By 
default, it will list
    * all files recursively under latestSnapshotDir
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 017b646e606..5b818db6301 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -514,26 +514,29 @@ public class IoTConsensusServerImpl {
   public boolean loadSnapshot(String snapshotId) {
     // Snapshot fragments are spread across the receive folders by the FolderManager (a DataRegion,
     // for example, uses one receive folder per local data dir), so a given snapshot only exists
-    // under the folders that actually received fragments. Load from those folders and skip the
-    // others; otherwise the state machine would fail on a folder that never received this snapshot
-    // and turn a healthy multi-data-dir transfer into a spurious failure.
+    // under the folders that actually received fragments. Collect exactly those folders and hand
+    // them to the state machine in a single load call.
+    //
+    // It must be a single call rather than one call per folder: the state machine's load is
+    // destructive (a DataRegion load wipes the data dirs before relinking), so loading folders one
+    // at a time would make each load erase the fragments linked by the previous folders, leaving
+    // only the last folder's data. The state machine instead clears the data dirs once and relinks
+    // every folder's fragments together.
     //
     // Note: an empty region produces a snapshot with zero fragments, so none of the receive folders
     // contains it. That is a legitimate (no-op) load, not a failure, so an absent snapshot must not
     // be reported as failure here.
+    List<File> snapshotDirs = new ArrayList<>();
     for (String dir : recvFolderManager.getFolders()) {
       File snapshotDir = getSnapshotPath(dir, snapshotId);
-      if (!snapshotDir.exists()) {
-        continue;
-      }
-      if (!stateMachine.loadSnapshot(snapshotDir)) {
-        // Stop at the first failure. The snapshot is already broken on this 
replica, and loading
-        // the remaining folders is both pointless and harmful: a load wipes 
the data dirs before
-        // relinking. Report the failure so the AddPeer coordinator does not 
activate this peer.
-        return false;
+      if (snapshotDir.exists()) {
+        snapshotDirs.add(snapshotDir);
       }
     }
-    return true;
+    if (snapshotDirs.isEmpty()) {
+      return true;
+    }
+    return stateMachine.loadSnapshot(snapshotDirs);
   }
 
   private File getSnapshotPath(String curStorageDir, String 
snapshotRelativePath) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 62bb498afa1..b99304055a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -52,6 +52,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Supplier;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -125,22 +126,43 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
 
   @Override
   public boolean loadSnapshot(File latestSnapshotRootDir) {
-    String databaseName = region.getDatabaseName();
+    final String databaseName = region.getDatabaseName();
+    final String dataRegionIdString = region.getDataRegionIdString();
+    return loadSnapshot(
+        () ->
+            new SnapshotLoader(
+                    latestSnapshotRootDir.getAbsolutePath(), databaseName, 
dataRegionIdString)
+                .loadSnapshotForStateMachine(),
+        latestSnapshotRootDir);
+  }
+
+  @Override
+  public boolean loadSnapshot(List<File> latestSnapshotRootDirs) {
+    final String databaseName = region.getDatabaseName();
+    final String dataRegionIdString = region.getDataRegionIdString();
+    // A single snapshot is spread across several receive folders, and loading 
wipes the data dirs
+    // before relinking. It must therefore be loaded in one shot (clear once, 
relink every folder)
+    // rather than once per folder, otherwise each per-folder load would erase 
the previous folders'
+    // fragments and leave only the last one's data.
+    final List<String> snapshotRootPaths = new ArrayList<>();
+    for (File dir : latestSnapshotRootDirs) {
+      snapshotRootPaths.add(dir.getAbsolutePath());
+    }
+    return loadSnapshot(
+        () ->
+            new SnapshotLoader(snapshotRootPaths, databaseName, 
dataRegionIdString)
+                .loadSnapshotForStateMachine(),
+        latestSnapshotRootDirs);
+  }
+
+  private boolean loadSnapshot(Supplier<DataRegion> snapshotLoader, Object 
snapshotRootForLog) {
     String dataRegionIdString = region.getDataRegionIdString();
     DataRegionId regionId = new 
DataRegionId(Integer.parseInt(dataRegionIdString));
     try {
       DataRegion newRegion =
-          StorageEngine.getInstance()
-              .setDataRegionForSnapshotLoad(
-                  regionId,
-                  () ->
-                      new SnapshotLoader(
-                              latestSnapshotRootDir.getAbsolutePath(),
-                              databaseName,
-                              dataRegionIdString)
-                          .loadSnapshotForStateMachine());
+          StorageEngine.getInstance().setDataRegionForSnapshotLoad(regionId, 
snapshotLoader);
       if (newRegion == null) {
-        logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, 
latestSnapshotRootDir);
+        logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, 
snapshotRootForLog);
         return false;
       }
       this.region = newRegion;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
index 053cb8a8b86..17136cd24fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
@@ -44,6 +44,7 @@ import java.nio.file.Path;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,11 +55,26 @@ public class SnapshotLoader {
   private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class);
   private String storageGroupName;
   private String snapshotPath;
+  private List<String> snapshotPaths;
   private String dataRegionId;
   private SnapshotLogAnalyzer logAnalyzer;
 
   public SnapshotLoader(String snapshotPath, String storageGroupName, String 
dataRegionId) {
     this.snapshotPath = snapshotPath;
+    this.snapshotPaths = Collections.singletonList(snapshotPath);
+    this.storageGroupName = storageGroupName;
+    this.dataRegionId = dataRegionId;
+  }
+
+  /**
+   * A snapshot received by IoTConsensus is spread across several receive folders (one per local
+   * data dir), so loading it means relinking the fragments from all of them. The data dirs must be
+   * cleared exactly once, before relinking from any folder; see {@link
+   * #loadSnapshotFromMultipleDirs()}.
+   */
+  public SnapshotLoader(List<String> snapshotPaths, String storageGroupName, 
String dataRegionId) {
+    this.snapshotPaths = snapshotPaths;
+    this.snapshotPath = snapshotPaths.isEmpty() ? null : snapshotPaths.get(0);
     this.storageGroupName = storageGroupName;
     this.dataRegionId = dataRegionId;
   }
@@ -100,6 +116,10 @@ public class SnapshotLoader {
    * @return
    */
   public DataRegion loadSnapshotForStateMachine() {
+    if (snapshotPaths.size() > 1) {
+      return loadSnapshotFromMultipleDirs();
+    }
+
     LOGGER.info(
         StorageEngineMessages.LOADING_SNAPSHOT_FOR, storageGroupName, 
dataRegionId, snapshotPath);
 
@@ -112,6 +132,33 @@ public class SnapshotLoader {
     }
   }
 
+  /**
+   * Load a snapshot whose fragments are spread across several dirs (the IoTConsensus receive
+   * folders). The snapshot log is not transferred during an IoTConsensus snapshot, so every
+   * received fragment dir takes the without-log path. Crucially, the data dirs are cleared exactly
+   * once before relinking from all dirs: clearing per-dir (as one load call per dir would) erases
+   * the fragments linked by the previous dirs and leaves only the last dir's data. Because each dir
+   * contributes a disjoint set of files, the relink order does not affect the result.
+   */
+  private DataRegion loadSnapshotFromMultipleDirs() {
+    LOGGER.info(
+        StorageEngineMessages.LOADING_SNAPSHOT_FOR, storageGroupName, 
dataRegionId, snapshotPaths);
+    try {
+      deleteAllFilesInDataDirs();
+      LOGGER.info(StorageEngineMessages.REMOVE_ALL_DATA_FILES_IN_ORIGINAL_DIR);
+      for (String path : snapshotPaths) {
+        File snapshotDir = new File(path);
+        createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
+        loadCompressionRatio(snapshotDir);
+      }
+      return loadSnapshot();
+    } catch (IOException | DiskSpaceInsufficientException e) {
+      LOGGER.error(
+          StorageEngineMessages.EXCEPTION_LOADING_SNAPSHOT_FOR, 
storageGroupName, dataRegionId, e);
+      return null;
+    }
+  }
+
   private DataRegion loadSnapshotWithoutLog() {
     try {
       try {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
index 29fe4f13586..d042d2a2ae8 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -245,6 +245,104 @@ public class IoTDBSnapshotTest {
     }
   }
 
+  /**
+   * Regression test for the multi-receive-folder snapshot load. IoTConsensus spreads a single
+   * snapshot's fragments across one receive folder per local data dir, so loading must clear the
+   * data dirs once and relink the fragments from every folder. Before the fix each folder was
+   * loaded with its own clear-then-relink, so every folder but the last had its just-linked
+   * fragments wiped by the next folder's clear, losing all but the last folder's data.
+   */
+  @Test
+  public void testLoadSnapshotSpreadAcrossReceiveFolders()
+      throws IOException, WriteProcessException {
+    loadSnapshotSpreadAcrossReceiveFolders(false);
+  }
+
+  /**
+   * The fragments of one snapshot are disjoint across the receive folders, so the order in which
+   * the folders are relinked must not change the loaded data. This loads the same spread snapshot
+   * with the receive folders presented in the opposite order and expects the identical result.
+   */
+  @Test
+  public void testLoadSnapshotFromReceiveFoldersIsOrderIndependent()
+      throws IOException, WriteProcessException {
+    loadSnapshotSpreadAcrossReceiveFolders(true);
+  }
+
+  private void loadSnapshotSpreadAcrossReceiveFolders(boolean reversedOrder)
+      throws IOException, WriteProcessException {
+    String[][] originDataDirs = 
IoTDBDescriptor.getInstance().getConfig().getTierDataDirs();
+    IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
+    TierManager.getInstance().resetFolders();
+    String recvBase0 = "target" + File.separator + "recv-snapshot-0";
+    String recvBase1 = "target" + File.separator + "recv-snapshot-1";
+    // Each receive folder holds the snapshot under a "snapshot" subdir, 
exactly as the IoTConsensus
+    // receiver materializes it, and carries no snapshot log (the log is never 
transferred).
+    File recvFolder0 = new File(recvBase0, SNAPSHOT_DIR_NAME);
+    File recvFolder1 = new File(recvBase1, SNAPSHOT_DIR_NAME);
+    try {
+      Assert.assertTrue(recvFolder0.mkdirs());
+      Assert.assertTrue(recvFolder1.mkdirs());
+
+      // Spread the fragments across the two receive folders: even-indexed 
files in the first,
+      // odd-indexed files in the second, so neither folder holds the whole 
snapshot.
+      int fileNum = 6;
+      for (int i = 0; i < fileNum; i++) {
+        writeSnapshotFragment((i % 2 == 0 ? recvFolder0 : 
recvFolder1).getAbsolutePath(), i);
+      }
+
+      List<String> snapshotDirs =
+          reversedOrder
+              ? Arrays.asList(recvFolder1.getAbsolutePath(), 
recvFolder0.getAbsolutePath())
+              : Arrays.asList(recvFolder0.getAbsolutePath(), 
recvFolder1.getAbsolutePath());
+
+      DataRegion dataRegion =
+          new SnapshotLoader(snapshotDirs, testSgName, 
"0").loadSnapshotForStateMachine();
+
+      Assert.assertNotNull(dataRegion);
+      // Every fragment from every receive folder must be present, regardless 
of relink order.
+      assertEquals(fileNum, 
dataRegion.getTsFileManager().getTsFileList(true).size());
+    } finally {
+      FileUtils.recursivelyDeleteFolder(recvBase0);
+      FileUtils.recursivelyDeleteFolder(recvBase1);
+      
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs);
+      TierManager.getInstance().resetFolders();
+    }
+  }
+
+  /**
+   * Materialize a single snapshot fragment (a TsFile and its resource) under {@code
+   * <recvSnapshotDir>/sequence/<sg>/0/0/}, i.e. the layout the without-log loader expects from a
+   * received snapshot.
+   */
+  private void writeSnapshotFragment(String recvSnapshotDir, int i)
+      throws IOException, WriteProcessException {
+    String filePath =
+        recvSnapshotDir
+            + File.separator
+            + "sequence"
+            + File.separator
+            + testSgName
+            + File.separator
+            + "0"
+            + File.separator
+            + "0"
+            + File.separator
+            + String.format("%d-%d-0-0.tsfile", i + 1, i + 1);
+    File newFile = new File(filePath);
+    Assert.assertTrue(newFile.getParentFile().exists() || 
newFile.getParentFile().mkdirs());
+    TsFileGeneratorUtils.generateMixTsFile(filePath, 5, 5, 10, i * 100, (i + 
1) * 100, 10, 10);
+    TsFileResource resource = new TsFileResource(new File(filePath));
+    resource.updateStartTime(
+        IDeviceID.Factory.DEFAULT_FACTORY.create(testSgName + PATH_SEPARATOR + 
"d" + i), i * 100);
+    resource.updateEndTime(
+        IDeviceID.Factory.DEFAULT_FACTORY.create(testSgName + PATH_SEPARATOR + 
"d" + i),
+        (i + 1) * 100);
+    resource.updatePlanIndexes(i);
+    resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+    resource.serialize();
+  }
+
   @Ignore("Need manual execution to specify different disks")
   @Test
   public void testLoadSnapshotNoHardLink()

Reply via email to