This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a4ed6dd6ab8 Propagate snapshot load failure during IoTConsensus AddPeer (#17935)
a4ed6dd6ab8 is described below

commit a4ed6dd6ab8bbae1671ffdacf7dd9dca64c556e7
Author: Yongzao <[email protected]>
AuthorDate: Tue Jun 16 11:30:46 2026 +0800

    Propagate snapshot load failure during IoTConsensus AddPeer (#17935)
---
 .../statemachine/ConfigRegionStateMachine.java     |  10 +-
 .../persistence/executor/ConfigPlanExecutor.java   |   7 +-
 .../org/apache/iotdb/consensus/IStateMachine.java  |   5 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |  31 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |  12 +-
 .../ratis/ApplicationStateMachineProxy.java        |  13 +-
 .../simple/SimpleConsensusServerImpl.java          |   4 +-
 .../apache/iotdb/consensus/EmptyStateMachine.java  |   4 +-
 .../iot/AddPeerSnapshotLoadFailureTest.java        | 354 +++++++++++++++++++++
 .../iotdb/consensus/iot/util/TestStateMachine.java |   4 +-
 .../apache/iotdb/consensus/ratis/TestUtils.java    |   4 +-
 .../consensus/simple/SimpleConsensusTest.java      |   4 +-
 .../dataregion/DataRegionStateMachine.java         |   6 +-
 .../schemaregion/SchemaRegionStateMachine.java     |  25 +-
 .../schemaengine/schemaregion/ISchemaRegion.java   |  10 +-
 .../schemaregion/impl/SchemaRegionMemoryImpl.java  |   6 +-
 .../schemaregion/impl/SchemaRegionPBTreeImpl.java  |   6 +-
 .../schemaRegion/SchemaRegionManagementTest.java   |  28 +-
 18 files changed, 493 insertions(+), 40 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index b3029e66028..ad0a82bcf56 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -246,9 +246,14 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
   }
 
   @Override
-  public void loadSnapshot(final File latestSnapshotRootDir) {
+  public boolean loadSnapshot(final File latestSnapshotRootDir) {
+    // The boolean result must reflect whether the ConfigRegion state-machine data was loaded, so
+    // callers (e.g. the AddPeer flow) can detect a real failure. The pipe-listener recomputation
+    // below is best-effort post-processing: a failure there is logged but must NOT be reported as a
+    // snapshot-load failure, otherwise it would (e.g.) abort ConfigNode (re)initialization on what
+    // is actually a healthy data load.
+    final boolean loadSucceeded = executor.loadSnapshot(latestSnapshotRootDir);
     try {
-      executor.loadSnapshot(latestSnapshotRootDir);
       // We recompute the snapshot for pipe listener when loading snapshot
       // to recover the newest snapshot in cache
       PipeConfigNodeAgent.runtime()
@@ -261,6 +266,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
             e);
       }
     }
+    return loadSucceeded;
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b6d1bba67bd..ed23fda8a6f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -772,12 +772,12 @@ public class ConfigPlanExecutor {
     return result.get();
   }
 
-  public void loadSnapshot(final File latestSnapshotRootDir) {
+  public boolean loadSnapshot(final File latestSnapshotRootDir) {
     if (!latestSnapshotRootDir.exists()) {
       LOGGER.error(
           ConfigNodeMessages.SNAPSHOT_DIRECTORY_IS_NOT_EXIST_CAN_NOT_LOAD_SNAPSHOT_WITH,
           latestSnapshotRootDir.getAbsolutePath());
-      return;
+      return false;
     }
 
     final AtomicBoolean result = new AtomicBoolean(true);
@@ -805,6 +805,9 @@ public class ConfigPlanExecutor {
           ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR,
           latestSnapshotRootDir);
     }
+    // Propagate any snapshot-load failure so callers (e.g. the AddPeer flow) do not treat a
+    // partially or wholly failed load as success.
+    return result.get();
   }
 
   private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 3354c83699b..c7705d93896 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -112,8 +112,11 @@ public interface IStateMachine {
    * Load the latest snapshot from given dir.
    *
    * @param latestSnapshotRootDir dir where the latest snapshot sits
+   * @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise. Callers
+   *     (e.g. the IoTConsensus AddPeer flow) rely on this to avoid activating a new peer whose
+   *     snapshot failed to load, which would otherwise silently lose data.
    */
-  void loadSnapshot(File latestSnapshotRootDir);
+  boolean loadSnapshot(File latestSnapshotRootDir);
 
   /**
    * given a snapshot dir, ask statemachine to provide all snapshot files. By default, it will list
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index f3c7d4e50ae..017b646e606 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -511,14 +511,29 @@ public class IoTConsensusServerImpl {
     }
   }
 
-  public void loadSnapshot(String snapshotId) {
-    // TODO: (xingtanzjr) throw exception if the snapshot load failed
-    recvFolderManager
-        .getFolders()
-        .forEach(
-            dir -> {
-              stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId));
-            });
+  public boolean loadSnapshot(String snapshotId) {
+    // Snapshot fragments are spread across the receive folders by the FolderManager (a DataRegion,
+    // for example, uses one receive folder per local data dir), so a given snapshot only exists
+    // under the folders that actually received fragments. Load from those folders and skip the
+    // others; otherwise the state machine would fail on a folder that never received this snapshot
+    // and turn a healthy multi-data-dir transfer into a spurious failure.
+    //
+    // Note: an empty region produces a snapshot with zero fragments, so none of the receive folders
+    // contains it. That is a legitimate (no-op) load, not a failure, so an absent snapshot must not
+    // be reported as failure here.
+    for (String dir : recvFolderManager.getFolders()) {
+      File snapshotDir = getSnapshotPath(dir, snapshotId);
+      if (!snapshotDir.exists()) {
+        continue;
+      }
+      if (!stateMachine.loadSnapshot(snapshotDir)) {
+        // Stop at the first failure. The snapshot is already broken on this replica, and loading
+        // the remaining folders is both pointless and harmful: a load wipes the data dirs before
+        // relinking. Report the failure so the AddPeer coordinator does not activate this peer.
+        return false;
+      }
+    }
+    return true;
   }
 
   private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index c9a9901dbcf..82394b6a244 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -343,7 +343,17 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa
       status.setMessage(message);
       return new TTriggerSnapshotLoadRes(status);
     }
-    impl.loadSnapshot(req.snapshotId);
+    if (!impl.loadSnapshot(req.snapshotId)) {
+      String message =
+          String.format(
+              "Failed to load snapshot %s for consensus group %s", req.snapshotId, groupId);
+      LOGGER.error(message);
+      // Surface a region-migration-specific code (the snapshot load runs as part of the AddPeer
+      // flow) rather than a generic internal error, so the coordinator's failure is meaningful.
+      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TTriggerSnapshotLoadRes(status);
+    }
     KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
     return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 1134d8fd6f2..3e2451e65dd 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -98,7 +98,7 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
   }
 
   @Override
-  public void reinitialize() {
+  public void reinitialize() throws IOException {
     setLastAppliedTermIndex(null);
     loadSnapshot(snapshotStorage.findLatestSnapshotDir());
     if (getLifeCycleState() == LifeCycle.State.PAUSED) {
@@ -261,14 +261,21 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
     }
   }
 
-  private void loadSnapshot(File latestSnapshotDir) {
+  private void loadSnapshot(File latestSnapshotDir) throws IOException {
     snapshotStorage.updateSnapshotCache();
     if (latestSnapshotDir == null) {
       return;
     }
 
     // require the application statemachine to load the latest snapshot
-    applicationStateMachine.loadSnapshot(latestSnapshotDir);
+    if (!applicationStateMachine.loadSnapshot(latestSnapshotDir)) {
+      // The application state machine rejected this snapshot. Do not advance lastAppliedTermIndex:
+      // claiming the snapshot as applied would let Ratis proceed as if it were installed and run on
+      // incomplete data (silent data loss). Fail (re)initialization instead so the snapshot install
+      // is treated as failed and can be retried.
+      throw new IOException(
+          String.format("%s: failed to load snapshot from %s", this, latestSnapshotDir));
+    }
     TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir);
     updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex());
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
index 4ed4a7b41ce..b76bcdeaaeb 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
@@ -89,7 +89,7 @@ public class SimpleConsensusServerImpl implements IStateMachine {
   }
 
   @Override
-  public synchronized void loadSnapshot(File latestSnapshotRootDir) {
-    stateMachine.loadSnapshot(latestSnapshotRootDir);
+  public synchronized boolean loadSnapshot(File latestSnapshotRootDir) {
+    return stateMachine.loadSnapshot(latestSnapshotRootDir);
   }
 }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index aafa0be5bb5..997120b00f9 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -54,5 +54,7 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi
   }
 
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {}
+  public boolean loadSnapshot(File latestSnapshotRootDir) {
+    return true;
+  }
 }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java
new file mode 100644
index 00000000000..64c81a84b58
--- /dev/null
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.iot.util.TestEntry;
+import org.apache.iotdb.consensus.iot.util.TestStateMachine;
+
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Regression test for the snapshot-load-failure bug (a failed snapshot load on the AddPeer target
+ * was silently swallowed, so the new peer was activated and the migration falsely reported
+ * successful, losing data on the new replica).
+ *
+ * <p>It builds a real two-node IoTConsensus group, forces the target peer's {@link
+ * org.apache.iotdb.consensus.IStateMachine#loadSnapshot} to fail, and verifies that {@code
+ * addRemotePeer}:
+ *
+ * <ul>
+ *   <li>actually reaches the snapshot-load step (so the failure is the one under test, not an
+ *       earlier step),
+ *   <li>fails with a {@link ConsensusException} instead of silently succeeding,
+ *   <li>does not leave the target peer active with an incompletely-loaded snapshot.
+ * </ul>
+ * </ul>
+ */
+public class AddPeerSnapshotLoadFailureTest {
+
+  private final Logger logger = 
LoggerFactory.getLogger(AddPeerSnapshotLoadFailureTest.class);
+
+  private final ConsensusGroupId gid = new DataRegionId(1);
+
+  private final int basePort = 9200;
+
+  private final List<Peer> peers =
+      Arrays.asList(
+          new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 1)),
+          new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort)));
+
+  private final List<File> peersStorage =
+      Arrays.asList(
+          new File("target" + File.separator + "snapshot-load-fail-1"),
+          new File("target" + File.separator + "snapshot-load-fail-2"));
+
+  private final List<List<String>> peersRecvSnapshotDirs =
+      Arrays.asList(
+          Arrays.asList(
+              "target" + File.separator + "snapshot-load-fail-1-recv-1",
+              "target" + File.separator + "snapshot-load-fail-1-recv-2"),
+          Arrays.asList(
+              "target" + File.separator + "snapshot-load-fail-2-recv-1",
+              "target" + File.separator + "snapshot-load-fail-2-recv-2"));
+
+  private final List<IoTConsensus> servers = new ArrayList<>();
+  private final List<ControllableStateMachine> stateMachines = new 
ArrayList<>();
+
+  /**
+   * A {@link TestStateMachine} that takes a real (single-file) snapshot and 
whose snapshot load can
+   * be made to fail on demand. When not forced to fail, {@link #loadSnapshot} 
mirrors the real
+   * {@code SnapshotLoader} contract by reporting failure when the snapshot 
root does not exist —
+   * which is exactly what happens for the receive folders that never got a 
fragment of a snapshot
+   * in a multi-data-dir deployment.
+   */
+  private static class ControllableStateMachine extends TestStateMachine {
+    private volatile boolean failLoadSnapshot = false;
+    private volatile boolean loadSnapshotInvoked = false;
+    private volatile boolean emptySnapshot = false;
+
+    void setFailLoadSnapshot(boolean failLoadSnapshot) {
+      this.failLoadSnapshot = failLoadSnapshot;
+    }
+
+    void setEmptySnapshot(boolean emptySnapshot) {
+      this.emptySnapshot = emptySnapshot;
+    }
+
+    boolean isLoadSnapshotInvoked() {
+      return loadSnapshotInvoked;
+    }
+
+    @Override
+    public boolean loadSnapshot(File latestSnapshotRootDir) {
+      loadSnapshotInvoked = true;
+      if (failLoadSnapshot) {
+        return false;
+      }
+      // Mirror SnapshotLoader: a receive folder that never received a 
fragment of this snapshot has
+      // no snapshot root, so loading from it must report failure.
+      return latestSnapshotRootDir.exists();
+    }
+
+    @Override
+    public boolean takeSnapshot(File snapshotDir) {
+      if (emptySnapshot) {
+        // Mirror an empty region: the snapshot has no fragments to transmit. 
The receiver then
+        // materializes no snapshot under any receive folder, which must still 
load successfully.
+        return true;
+      }
+      // Write a real (single) snapshot file so the transfer actually moves 
data and the receiver
+      // materializes the snapshot under a subset of its receive folders.
+      try {
+        Files.write(
+            new File(snapshotDir, "snapshot.data").toPath(),
+            "snapshot".getBytes(StandardCharsets.UTF_8));
+        return true;
+      } catch (IOException e) {
+        return false;
+      }
+    }
+
+    // TestStateMachine does not implement clearSnapshot (the IStateMachine 
default throws). The
+    // AddPeer flow calls it in a finally block to clean up the local 
snapshot, so we provide a
+    // no-op here; otherwise that cleanup would mask the ConsensusException we 
are asserting on.
+    @Override
+    public boolean clearSnapshot() {
+      return true;
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    for (File file : peersStorage) {
+      file.mkdirs();
+      stateMachines.add(new ControllableStateMachine());
+    }
+    peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new 
File(dir).mkdirs()));
+    initServer();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    servers.parallelStream().forEach(IoTConsensus::stop);
+    servers.clear();
+    for (File file : peersStorage) {
+      FileUtils.deleteFully(file);
+    }
+    peersRecvSnapshotDirs.forEach(
+        innerList ->
+            innerList.forEach(
+                dir -> {
+                  try {
+                    FileUtils.deleteFully(new File(dir));
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                }));
+  }
+
+  private void initServer() throws IOException {
+    Assume.assumeTrue(checkPortAvailable());
+    try {
+      for (int i = 0; i < peers.size(); i++) {
+        int finalI = i;
+        servers.add(
+            (IoTConsensus)
+                ConsensusFactory.getConsensusImpl(
+                        ConsensusFactory.IOT_CONSENSUS,
+                        ConsensusConfig.newBuilder()
+                            .setThisNodeId(peers.get(i).getNodeId())
+                            .setThisNode(peers.get(i).getEndpoint())
+                            
.setStorageDir(peersStorage.get(i).getAbsolutePath())
+                            .setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i))
+                            
.setConsensusGroupType(TConsensusGroupType.DataRegion)
+                            .build(),
+                        groupId -> stateMachines.get(finalI))
+                    .orElseThrow(
+                        () ->
+                            new IllegalArgumentException(
+                                String.format(
+                                    ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                                    ConsensusFactory.IOT_CONSENSUS))));
+      }
+      for (int i = 0; i < peers.size(); i++) {
+        servers.get(i).start();
+      }
+    } catch (IOException e) {
+      if (e.getCause() instanceof StartupException) {
+        // just succeed when can not bind socket
+        logger.info("Can not start IoTConsensus because", e);
+        Assume.assumeTrue(false);
+      } else {
+        logger.error("Failed because", e);
+        Assert.fail("Failed because " + e.getMessage());
+      }
+    }
+  }
+
+  @Test
+  public void addRemotePeerMustFailWhenTargetSnapshotLoadFails() throws 
Exception {
+    // node 0 is the sole initial member; node 1 will be added as a new peer. 
Mirroring the real
+    // region-migration flow, the destination peer (node 1) is pre-created 
locally with the full
+    // target peer list (IoTConsensus, unlike Ratis, requires a non-empty peer 
list here).
+    servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
+    servers.get(1).createLocalPeer(gid, peers);
+
+    // Put some data into the group so the snapshot transfer is meaningful.
+    for (int i = 0; i < 10; i++) {
+      servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
+    }
+
+    // Force the target peer (node 1) to fail loading the transferred snapshot.
+    stateMachines.get(1).setFailLoadSnapshot(true);
+
+    // Before the fix, addRemotePeer swallowed the load failure and returned 
normally, leaving the
+    // target peer active with incomplete data. It must now surface the 
failure.
+    Assert.assertThrows(
+        ConsensusException.class, () -> servers.get(0).addRemotePeer(gid, 
peers.get(1)));
+
+    // The failure must be the snapshot load itself, i.e. the AddPeer flow 
actually reached the
+    // load step on the target rather than aborting earlier.
+    Assert.assertTrue(
+        "Target peer's loadSnapshot was never invoked; the failure came from an earlier step",
+        stateMachines.get(1).isLoadSnapshotInvoked());
+
+    // The target peer must not be left active with an incompletely-loaded 
snapshot.
+    Assert.assertFalse(
+        "Target peer was activated despite a failed snapshot load",
+        servers.get(1).getImpl(gid).isActive());
+  }
+
+  /**
+   * A target peer configures one receive folder per local data dir, and 
snapshot fragments are
+   * spread across those folders by the FolderManager, so a small snapshot 
only materializes under a
+   * subset of them. A healthy transfer must therefore still succeed even 
though some receive
+   * folders never received any fragment of the snapshot (regression for the 
multi-data-dir
+   * false-failure reported on the load-failure-propagation PR).
+   */
+  @Test
+  public void addRemotePeerSucceedsWhenSnapshotSpansSubsetOfRecvDirs() throws 
Exception {
+    servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
+    servers.get(1).createLocalPeer(gid, peers);
+
+    for (int i = 0; i < 10; i++) {
+      servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
+    }
+
+    // Do NOT force a load failure: this is a healthy transfer. The (small) 
snapshot lands in only a
+    // subset of the target's receive folders, so the others legitimately have 
no snapshot root.
+    // Before the fix, loadSnapshot() loaded from every receive folder and 
treated the missing ones
+    // as failures, turning this healthy multi-data-dir transfer into a 
spurious failure.
+    servers.get(0).addRemotePeer(gid, peers.get(1));
+
+    Assert.assertTrue(
+        "Target peer's loadSnapshot was never invoked",
+        stateMachines.get(1).isLoadSnapshotInvoked());
+    Assert.assertTrue(
+        "Target peer was not activated after a successful snapshot load",
+        servers.get(1).getImpl(gid).isActive());
+
+    // Sanity-check that the test actually exercised a partial spread: at 
least one of the target's
+    // receive folders must hold no snapshot at all, otherwise the 
skipped-folder path is untested.
+    long emptyRecvFolders =
+        peersRecvSnapshotDirs.get(1).stream()
+            .map(dir -> new File(dir, 
IoTConsensusServerImpl.SNAPSHOT_DIR_NAME))
+            .filter(recvFolder -> isEmptyOrMissing(recvFolder))
+            .count();
+    Assert.assertTrue(
+        "Expected at least one receive folder without the snapshot, but every folder had it",
+        emptyRecvFolders > 0);
+  }
+
+  private static boolean isEmptyOrMissing(File dir) {
+    String[] children = dir.list();
+    return children == null || children.length == 0;
+  }
+
+  /**
+   * An empty region produces a snapshot with zero fragments, so nothing is 
transmitted and the
+   * target materializes no snapshot under any receive folder. This must still 
be treated as a
+   * successful (no-op) load — otherwise migrating an empty region (e.g. while 
removing a DataNode)
+   * fails. Regression for the multi-data-dir AddPeer false-failure on empty 
regions.
+   */
+  @Test
+  public void addRemotePeerSucceedsWhenSnapshotIsEmpty() throws Exception {
+    servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
+    servers.get(1).createLocalPeer(gid, peers);
+
+    // No data is written, and the source takes an empty snapshot: zero 
fragments are transmitted.
+    stateMachines.get(0).setEmptySnapshot(true);
+
+    // Before the fix, loadSnapshot() reported failure when no receive folder 
contained the
+    // snapshot, so adding a peer for an empty region failed and DataNode 
removal timed out.
+    servers.get(0).addRemotePeer(gid, peers.get(1));
+
+    Assert.assertTrue(
+        "Target peer was not activated after an empty snapshot load",
+        servers.get(1).getImpl(gid).isActive());
+
+    // Confirm the test really exercised the empty-snapshot path: no receive 
folder holds a
+    // snapshot.
+    long nonEmptyRecvFolders =
+        peersRecvSnapshotDirs.get(1).stream()
+            .map(dir -> new File(dir, 
IoTConsensusServerImpl.SNAPSHOT_DIR_NAME))
+            .filter(recvFolder -> !isEmptyOrMissing(recvFolder))
+            .count();
+    Assert.assertEquals(
+        "Expected no receive folder to contain a snapshot for an empty region",
+        0,
+        nonEmptyRecvFolders);
+  }
+
+  private boolean checkPortAvailable() {
+    for (Peer peer : this.peers) {
+      try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) {
+        logger.info("check port {} success for node {}", 
peer.getEndpoint().port, peer.getNodeId());
+      } catch (IOException e) {
+        logger.error("check port {} failed for node {}", 
peer.getEndpoint().port, peer.getNodeId());
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
index a879a034784..9454dcb0c43 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
@@ -142,5 +142,7 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
   }
 
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {}
+  public boolean loadSnapshot(File latestSnapshotRootDir) {
+    return true;
+  }
 }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 5cac2173396..3217d1cc58e 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -177,13 +177,15 @@ public class TestUtils {
     }
 
     @Override
-    public void loadSnapshot(File latestSnapshotRootDir) {
+    public boolean loadSnapshot(File latestSnapshotRootDir) {
       File snapshot =
           new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "snapshot");
       try (Scanner scanner = new Scanner(snapshot)) {
         integer.set(Integer.parseInt(scanner.next()));
+        return true;
       } catch (FileNotFoundException e) {
         logger.error("cannot find snapshot file {}", snapshot);
+        return false;
       }
     }
 
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index d50ab992f6f..ff062fb63a8 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -123,7 +123,9 @@ public class SimpleConsensusTest {
     }
 
     @Override
-    public void loadSnapshot(File latestSnapshotRootDir) {}
+    public boolean loadSnapshot(File latestSnapshotRootDir) {
+      return true;
+    }
   }
 
   @Before
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 2de1ec9fdc0..62bb498afa1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -124,7 +124,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {
+  public boolean loadSnapshot(File latestSnapshotRootDir) {
     String databaseName = region.getDatabaseName();
     String dataRegionIdString = region.getDataRegionIdString();
     DataRegionId regionId = new 
DataRegionId(Integer.parseInt(dataRegionIdString));
@@ -141,14 +141,16 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
                           .loadSnapshotForStateMachine());
       if (newRegion == null) {
         logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, 
latestSnapshotRootDir);
-        return;
+        return false;
       }
       this.region = newRegion;
       ChunkCache.getInstance().clear();
       TimeSeriesMetadataCache.getInstance().clear();
       BloomFilterCache.getInstance().clear();
+      return true;
     } catch (Exception e) {
       logger.error(DataNodeMiscMessages.EXCEPTION_REPLACING_DATA_REGION, e);
+      return false;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
index edab5862d11..2f5ae61e85e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
@@ -136,14 +136,23 @@ public class SchemaRegionStateMachine extends 
BaseStateMachine {
   }
 
   @Override
-  public void loadSnapshot(final File latestSnapshotRootDir) {
-    schemaRegion.loadSnapshot(latestSnapshotRootDir);
-    PipeDataNodeAgent.runtime()
-        .schemaListener(schemaRegion.getSchemaRegionId())
-        .loadSnapshot(latestSnapshotRootDir);
-    // We recompute the snapshot for pipe listener when loading snapshot
-    // to recover the newest snapshot in cache
-    listen2Snapshot4PipeListener(false);
+  public boolean loadSnapshot(final File latestSnapshotRootDir) {
+    try {
+      // The boolean result must reflect whether the schema-region data was actually loaded, so
+      // callers (e.g. the AddPeer flow and the Ratis snapshot-install path) can detect a real
+      // failure instead of treating a fallback-to-empty load as success.
+      final boolean loadSucceeded = 
schemaRegion.loadSnapshot(latestSnapshotRootDir);
+      PipeDataNodeAgent.runtime()
+          .schemaListener(schemaRegion.getSchemaRegionId())
+          .loadSnapshot(latestSnapshotRootDir);
+      // We recompute the snapshot for pipe listener when loading snapshot
+      // to recover the newest snapshot in cache
+      listen2Snapshot4PipeListener(false);
+      return loadSucceeded;
+    } catch (Exception e) {
+      logger.error("Failed to load snapshot from {}", latestSnapshotRootDir, 
e);
+      return false;
+    }
   }
 
   public void listen2Snapshot4PipeListener(final boolean isTmp) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
index 8d87435e108..540d5d4274c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
@@ -115,7 +115,15 @@ public interface ISchemaRegion {
 
   boolean createSnapshot(final File snapshotDir);
 
-  void loadSnapshot(final File latestSnapshotRootDir);
+  /**
+   * Load the latest schema snapshot from the given dir.
+   *
+   * @return {@code true} if the snapshot was loaded successfully, {@code false} if loading failed
+   *     (in which case the region falls back to an empty/re-initialized state). Callers rely on
+   *     this to honor the {@link org.apache.iotdb.consensus.IStateMachine#loadSnapshot}
+   *     success/failure contract (e.g. the AddPeer flow and Ratis snapshot install).
+   */
+  boolean loadSnapshot(final File latestSnapshotRootDir);
 
   // endregion
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 0394a81680f..1f2408cca12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -556,7 +556,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
 
   // currently, this method is only used for cluster-ratis mode
   @Override
-  public void loadSnapshot(final File latestSnapshotRootDir) {
+  public boolean loadSnapshot(final File latestSnapshotRootDir) {
     clear();
 
     logger.info(DataNodeSchemaMessages.START_LOADING_SNAPSHOT, schemaRegionId);
@@ -651,6 +651,7 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
           schemaRegionId,
           System.currentTimeMillis() - startTime);
       logger.info(DataNodeSchemaMessages.SUCCESSFULLY_LOAD_SNAPSHOT, 
schemaRegionId);
+      return true;
     } catch (final Exception e) {
       logger.error(
           DataNodeSchemaMessages.FAILED_TO_LOAD_SNAPSHOT, schemaRegionId, 
e.getMessage(), e);
@@ -662,6 +663,9 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
         logger.error(
             DataNodeSchemaMessages.ERROR_DURING_INIT_SCHEMA_REGION, 
schemaRegionId, exception);
       }
+      // The snapshot was not loaded (the region fell back to an empty re-initialized state). Report
+      // the failure so callers honoring the loadSnapshot success/failure contract can react.
+      return false;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index df5ad41cff6..8d87ef3f061 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -534,7 +534,7 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
 
   // currently, this method is only used for cluster-ratis mode
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {
+  public boolean loadSnapshot(File latestSnapshotRootDir) {
     clear();
 
     logger.info(DataNodeSchemaMessages.START_LOADING_SNAPSHOT, schemaRegionId);
@@ -579,6 +579,7 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
           schemaRegionId,
           System.currentTimeMillis() - startTime);
       logger.info(DataNodeSchemaMessages.SUCCESSFULLY_LOAD_SNAPSHOT, 
schemaRegionId);
+      return true;
     } catch (IOException | MetadataException e) {
       logger.error(
           DataNodeSchemaMessages.FAILED_TO_LOAD_SNAPSHOT, schemaRegionId, 
e.getMessage(), e);
@@ -592,6 +593,9 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
             schemaRegionId,
             metadataException);
       }
+      // The snapshot was not loaded (the region fell back to an empty re-initialized state). Report
+      // the failure so callers honoring the loadSnapshot success/failure contract can react.
+      return false;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
index 38f56b66dd9..0c0b3a5b953 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
@@ -96,7 +96,7 @@ public class SchemaRegionManagementTest extends 
AbstractSchemaRegionTest {
       snapshotDir.mkdir();
       schemaRegion.createSnapshot(snapshotDir);
 
-      schemaRegion.loadSnapshot(snapshotDir);
+      Assert.assertTrue(schemaRegion.loadSnapshot(snapshotDir));
 
       List<ITimeSeriesSchemaInfo> result =
           SchemaRegionTestUtil.showTimeseries(
@@ -112,7 +112,7 @@ public class SchemaRegionManagementTest extends 
AbstractSchemaRegionTest {
       simulateRestart();
 
       ISchemaRegion newSchemaRegion = getSchemaRegion("root.sg", 0);
-      newSchemaRegion.loadSnapshot(snapshotDir);
+      Assert.assertTrue(newSchemaRegion.loadSnapshot(snapshotDir));
       result =
           SchemaRegionTestUtil.showTimeseries(
               newSchemaRegion, new PartialPath("root.sg.**"), false, 
"tag-key", "tag-value");
@@ -171,7 +171,7 @@ public class SchemaRegionManagementTest extends 
AbstractSchemaRegionTest {
       snapshotDir.mkdir();
       schemaRegion.createSnapshot(snapshotDir);
 
-      schemaRegion.loadSnapshot(snapshotDir);
+      Assert.assertTrue(schemaRegion.loadSnapshot(snapshotDir));
 
       List<ITimeSeriesSchemaInfo> result =
           SchemaRegionTestUtil.showTimeseries(
@@ -182,7 +182,7 @@ public class SchemaRegionManagementTest extends 
AbstractSchemaRegionTest {
       simulateRestart();
 
       ISchemaRegion newSchemaRegion = getSchemaRegion("root.sg", 0);
-      newSchemaRegion.loadSnapshot(snapshotDir);
+      Assert.assertTrue(newSchemaRegion.loadSnapshot(snapshotDir));
       result =
           SchemaRegionTestUtil.showTimeseries(
               newSchemaRegion, new PartialPath("root.sg.**"), false, 
"tag-key", "tag-value");
@@ -193,6 +193,26 @@ public class SchemaRegionManagementTest extends 
AbstractSchemaRegionTest {
     }
   }
 
+  @Test
+  public void testLoadSnapshotReportsFailureWhenSnapshotIsMissing() throws 
Exception {
+    String schemaRegionConsensusProtocolClass = 
config.getSchemaRegionConsensusProtocolClass();
+    
config.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+    try {
+      ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
+
+      // Loading from a directory that does not contain a snapshot must report failure rather than
+      // silently falling back to an empty region and reporting success. Callers (the AddPeer flow
+      // and the Ratis snapshot-install path) rely on this success/failure contract.
+      File missingSnapshotDir =
+          new File(config.getSchemaDir() + File.separator + 
"non-existent-snapshot");
+      Assert.assertFalse(missingSnapshotDir.exists());
+
+      Assert.assertFalse(schemaRegion.loadSnapshot(missingSnapshotDir));
+    } finally {
+      
config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
+    }
+  }
+
   @Test
   @Ignore
   public void testSnapshotPerformance() throws Exception {


Reply via email to