This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a4ed6dd6ab8 Propagate snapshot load failure during IoTConsensus AddPeer (#17935)
a4ed6dd6ab8 is described below
commit a4ed6dd6ab8bbae1671ffdacf7dd9dca64c556e7
Author: Yongzao <[email protected]>
AuthorDate: Tue Jun 16 11:30:46 2026 +0800
Propagate snapshot load failure during IoTConsensus AddPeer (#17935)
---
.../statemachine/ConfigRegionStateMachine.java | 10 +-
.../persistence/executor/ConfigPlanExecutor.java | 7 +-
.../org/apache/iotdb/consensus/IStateMachine.java | 5 +-
.../consensus/iot/IoTConsensusServerImpl.java | 31 +-
.../service/IoTConsensusRPCServiceProcessor.java | 12 +-
.../ratis/ApplicationStateMachineProxy.java | 13 +-
.../simple/SimpleConsensusServerImpl.java | 4 +-
.../apache/iotdb/consensus/EmptyStateMachine.java | 4 +-
.../iot/AddPeerSnapshotLoadFailureTest.java | 354 +++++++++++++++++++++
.../iotdb/consensus/iot/util/TestStateMachine.java | 4 +-
.../apache/iotdb/consensus/ratis/TestUtils.java | 4 +-
.../consensus/simple/SimpleConsensusTest.java | 4 +-
.../dataregion/DataRegionStateMachine.java | 6 +-
.../schemaregion/SchemaRegionStateMachine.java | 25 +-
.../schemaengine/schemaregion/ISchemaRegion.java | 10 +-
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 6 +-
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 6 +-
.../schemaRegion/SchemaRegionManagementTest.java | 28 +-
18 files changed, 493 insertions(+), 40 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index b3029e66028..ad0a82bcf56 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -246,9 +246,14 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
}
@Override
- public void loadSnapshot(final File latestSnapshotRootDir) {
+ public boolean loadSnapshot(final File latestSnapshotRootDir) {
+ // The boolean result must reflect whether the ConfigRegion state-machine
data was loaded, so
+ // callers (e.g. the AddPeer flow) can detect a real failure. The
pipe-listener recomputation
+ // below is best-effort post-processing: a failure there is logged but
must NOT be reported as a
+ // snapshot-load failure, otherwise it would (e.g.) abort ConfigNode
(re)initialization on what
+ // is actually a healthy data load.
+ final boolean loadSucceeded = executor.loadSnapshot(latestSnapshotRootDir);
try {
- executor.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
PipeConfigNodeAgent.runtime()
@@ -261,6 +266,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
e);
}
}
+ return loadSucceeded;
}
@Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b6d1bba67bd..ed23fda8a6f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -772,12 +772,12 @@ public class ConfigPlanExecutor {
return result.get();
}
- public void loadSnapshot(final File latestSnapshotRootDir) {
+ public boolean loadSnapshot(final File latestSnapshotRootDir) {
if (!latestSnapshotRootDir.exists()) {
LOGGER.error(
ConfigNodeMessages.SNAPSHOT_DIRECTORY_IS_NOT_EXIST_CAN_NOT_LOAD_SNAPSHOT_WITH,
latestSnapshotRootDir.getAbsolutePath());
- return;
+ return false;
}
final AtomicBoolean result = new AtomicBoolean(true);
@@ -805,6 +805,9 @@ public class ConfigPlanExecutor {
ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR,
latestSnapshotRootDir);
}
+ // Propagate any snapshot-load failure so callers (e.g. the AddPeer flow)
do not treat a
+ // partially or wholly failed load as success.
+ return result.get();
}
private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 3354c83699b..c7705d93896 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -112,8 +112,11 @@ public interface IStateMachine {
* Load the latest snapshot from given dir.
*
* @param latestSnapshotRootDir dir where the latest snapshot sits
+ * @return {@code true} if the snapshot was loaded successfully, {@code
false} otherwise. Callers
+ * (e.g. the IoTConsensus AddPeer flow) rely on this to avoid activating
a new peer whose
+ * snapshot failed to load, which would otherwise silently lose data.
*/
- void loadSnapshot(File latestSnapshotRootDir);
+ boolean loadSnapshot(File latestSnapshotRootDir);
/**
* given a snapshot dir, ask statemachine to provide all snapshot files. By
default, it will list
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index f3c7d4e50ae..017b646e606 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -511,14 +511,29 @@ public class IoTConsensusServerImpl {
}
}
- public void loadSnapshot(String snapshotId) {
- // TODO: (xingtanzjr) throw exception if the snapshot load failed
- recvFolderManager
- .getFolders()
- .forEach(
- dir -> {
- stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId));
- });
+ public boolean loadSnapshot(String snapshotId) {
+ // Snapshot fragments are spread across the receive folders by the
FolderManager (a DataRegion,
+ // for example, uses one receive folder per local data dir), so a given
snapshot only exists
+ // under the folders that actually received fragments. Load from those
folders and skip the
+ // others; otherwise the state machine would fail on a folder that never
received this snapshot
+ // and turn a healthy multi-data-dir transfer into a spurious failure.
+ //
+ // Note: an empty region produces a snapshot with zero fragments, so none
of the receive folders
+ // contains it. That is a legitimate (no-op) load, not a failure, so an
absent snapshot must not
+ // be reported as failure here.
+ for (String dir : recvFolderManager.getFolders()) {
+ File snapshotDir = getSnapshotPath(dir, snapshotId);
+ if (!snapshotDir.exists()) {
+ continue;
+ }
+ if (!stateMachine.loadSnapshot(snapshotDir)) {
+ // Stop at the first failure. The snapshot is already broken on this
replica, and loading
+ // the remaining folders is both pointless and harmful: a load wipes
the data dirs before
+ // relinking. Report the failure so the AddPeer coordinator does not
activate this peer.
+ return false;
+ }
+ }
+ return true;
}
private File getSnapshotPath(String curStorageDir, String
snapshotRelativePath) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index c9a9901dbcf..82394b6a244 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -343,7 +343,17 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa
status.setMessage(message);
return new TTriggerSnapshotLoadRes(status);
}
- impl.loadSnapshot(req.snapshotId);
+ if (!impl.loadSnapshot(req.snapshotId)) {
+ String message =
+ String.format(
+ "Failed to load snapshot %s for consensus group %s",
req.snapshotId, groupId);
+ LOGGER.error(message);
+ // Surface a region-migration-specific code (the snapshot load runs as
part of the AddPeer
+ // flow) rather than a generic internal error, so the coordinator's
failure is meaningful.
+ TSStatus status = new
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(message);
+ return new TTriggerSnapshotLoadRes(status);
+ }
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
return new TTriggerSnapshotLoadRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 1134d8fd6f2..3e2451e65dd 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -98,7 +98,7 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
}
@Override
- public void reinitialize() {
+ public void reinitialize() throws IOException {
setLastAppliedTermIndex(null);
loadSnapshot(snapshotStorage.findLatestSnapshotDir());
if (getLifeCycleState() == LifeCycle.State.PAUSED) {
@@ -261,14 +261,21 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
}
}
- private void loadSnapshot(File latestSnapshotDir) {
+ private void loadSnapshot(File latestSnapshotDir) throws IOException {
snapshotStorage.updateSnapshotCache();
if (latestSnapshotDir == null) {
return;
}
// require the application statemachine to load the latest snapshot
- applicationStateMachine.loadSnapshot(latestSnapshotDir);
+ if (!applicationStateMachine.loadSnapshot(latestSnapshotDir)) {
+ // The application state machine rejected this snapshot. Do not advance
lastAppliedTermIndex:
+ // claiming the snapshot as applied would let Ratis proceed as if it
were installed and run on
+ // incomplete data (silent data loss). Fail (re)initialization instead
so the snapshot install
+ // is treated as failed and can be retried.
+ throw new IOException(
+ String.format("%s: failed to load snapshot from %s", this,
latestSnapshotDir));
+ }
TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir);
updateLastAppliedTermIndex(snapshotTermIndex.getTerm(),
snapshotTermIndex.getIndex());
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
index 4ed4a7b41ce..b76bcdeaaeb 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
@@ -89,7 +89,7 @@ public class SimpleConsensusServerImpl implements IStateMachine {
}
@Override
- public synchronized void loadSnapshot(File latestSnapshotRootDir) {
- stateMachine.loadSnapshot(latestSnapshotRootDir);
+ public synchronized boolean loadSnapshot(File latestSnapshotRootDir) {
+ return stateMachine.loadSnapshot(latestSnapshotRootDir);
}
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index aafa0be5bb5..997120b00f9 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -54,5 +54,7 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {}
+ public boolean loadSnapshot(File latestSnapshotRootDir) {
+ return true;
+ }
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java
new file mode 100644
index 00000000000..64c81a84b58
--- /dev/null
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.iot.util.TestEntry;
+import org.apache.iotdb.consensus.iot.util.TestStateMachine;
+
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Regression test for the snapshot-load-failure bug (a failed snapshot load
on the AddPeer target
+ * was silently swallowed, so the new peer was activated and the migration
falsely reported
+ * successful, losing data on the new replica).
+ *
+ * <p>It builds a real two-node IoTConsensus group, forces the target peer's
{@link
+ * org.apache.iotdb.consensus.IStateMachine#loadSnapshot} to fail, and
verifies that {@code
+ * addRemotePeer}:
+ *
+ * <ul>
+ * <li>actually reaches the snapshot-load step (so the failure is the one
under test, not an
+ * earlier step),
+ * <li>fails with a {@link ConsensusException} instead of silently
succeeding,
+ * <li>does not leave the target peer active with an incompletely-loaded
snapshot.
+ * </ul>
+ */
+public class AddPeerSnapshotLoadFailureTest {
+
+ private final Logger logger =
LoggerFactory.getLogger(AddPeerSnapshotLoadFailureTest.class);
+
+ private final ConsensusGroupId gid = new DataRegionId(1);
+
+ private final int basePort = 9200;
+
+ private final List<Peer> peers =
+ Arrays.asList(
+ new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 1)),
+ new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort)));
+
+ private final List<File> peersStorage =
+ Arrays.asList(
+ new File("target" + File.separator + "snapshot-load-fail-1"),
+ new File("target" + File.separator + "snapshot-load-fail-2"));
+
+ private final List<List<String>> peersRecvSnapshotDirs =
+ Arrays.asList(
+ Arrays.asList(
+ "target" + File.separator + "snapshot-load-fail-1-recv-1",
+ "target" + File.separator + "snapshot-load-fail-1-recv-2"),
+ Arrays.asList(
+ "target" + File.separator + "snapshot-load-fail-2-recv-1",
+ "target" + File.separator + "snapshot-load-fail-2-recv-2"));
+
+ private final List<IoTConsensus> servers = new ArrayList<>();
+ private final List<ControllableStateMachine> stateMachines = new
ArrayList<>();
+
+ /**
+ * A {@link TestStateMachine} that takes a real (single-file) snapshot and
whose snapshot load can
+ * be made to fail on demand. When not forced to fail, {@link #loadSnapshot}
mirrors the real
+ * {@code SnapshotLoader} contract by reporting failure when the snapshot
root does not exist —
+ * which is exactly what happens for the receive folders that never got a
fragment of a snapshot
+ * in a multi-data-dir deployment.
+ */
+ private static class ControllableStateMachine extends TestStateMachine {
+ private volatile boolean failLoadSnapshot = false;
+ private volatile boolean loadSnapshotInvoked = false;
+ private volatile boolean emptySnapshot = false;
+
+ void setFailLoadSnapshot(boolean failLoadSnapshot) {
+ this.failLoadSnapshot = failLoadSnapshot;
+ }
+
+ void setEmptySnapshot(boolean emptySnapshot) {
+ this.emptySnapshot = emptySnapshot;
+ }
+
+ boolean isLoadSnapshotInvoked() {
+ return loadSnapshotInvoked;
+ }
+
+ @Override
+ public boolean loadSnapshot(File latestSnapshotRootDir) {
+ loadSnapshotInvoked = true;
+ if (failLoadSnapshot) {
+ return false;
+ }
+ // Mirror SnapshotLoader: a receive folder that never received a
fragment of this snapshot has
+ // no snapshot root, so loading from it must report failure.
+ return latestSnapshotRootDir.exists();
+ }
+
+ @Override
+ public boolean takeSnapshot(File snapshotDir) {
+ if (emptySnapshot) {
+ // Mirror an empty region: the snapshot has no fragments to transmit.
The receiver then
+ // materializes no snapshot under any receive folder, which must still
load successfully.
+ return true;
+ }
+ // Write a real (single) snapshot file so the transfer actually moves
data and the receiver
+ // materializes the snapshot under a subset of its receive folders.
+ try {
+ Files.write(
+ new File(snapshotDir, "snapshot.data").toPath(),
+ "snapshot".getBytes(StandardCharsets.UTF_8));
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ // TestStateMachine does not implement clearSnapshot (the IStateMachine
default throws). The
+ // AddPeer flow calls it in a finally block to clean up the local
snapshot, so we provide a
+ // no-op here; otherwise that cleanup would mask the ConsensusException we
are asserting on.
+ @Override
+ public boolean clearSnapshot() {
+ return true;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ for (File file : peersStorage) {
+ file.mkdirs();
+ stateMachines.add(new ControllableStateMachine());
+ }
+ peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new
File(dir).mkdirs()));
+ initServer();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ servers.parallelStream().forEach(IoTConsensus::stop);
+ servers.clear();
+ for (File file : peersStorage) {
+ FileUtils.deleteFully(file);
+ }
+ peersRecvSnapshotDirs.forEach(
+ innerList ->
+ innerList.forEach(
+ dir -> {
+ try {
+ FileUtils.deleteFully(new File(dir));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+
+ private void initServer() throws IOException {
+ Assume.assumeTrue(checkPortAvailable());
+ try {
+ for (int i = 0; i < peers.size(); i++) {
+ int finalI = i;
+ servers.add(
+ (IoTConsensus)
+ ConsensusFactory.getConsensusImpl(
+ ConsensusFactory.IOT_CONSENSUS,
+ ConsensusConfig.newBuilder()
+ .setThisNodeId(peers.get(i).getNodeId())
+ .setThisNode(peers.get(i).getEndpoint())
+
.setStorageDir(peersStorage.get(i).getAbsolutePath())
+ .setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i))
+
.setConsensusGroupType(TConsensusGroupType.DataRegion)
+ .build(),
+ groupId -> stateMachines.get(finalI))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ ConsensusFactory.IOT_CONSENSUS))));
+ }
+ for (int i = 0; i < peers.size(); i++) {
+ servers.get(i).start();
+ }
+ } catch (IOException e) {
+ if (e.getCause() instanceof StartupException) {
+ // just succeed when can not bind socket
+ logger.info("Can not start IoTConsensus because", e);
+ Assume.assumeTrue(false);
+ } else {
+ logger.error("Failed because", e);
+ Assert.fail("Failed because " + e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void addRemotePeerMustFailWhenTargetSnapshotLoadFails() throws
Exception {
+ // node 0 is the sole initial member; node 1 will be added as a new peer.
Mirroring the real
+ // region-migration flow, the destination peer (node 1) is pre-created
locally with the full
+ // target peer list (IoTConsensus, unlike Ratis, requires a non-empty peer
list here).
+ servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
+ servers.get(1).createLocalPeer(gid, peers);
+
+ // Put some data into the group so the snapshot transfer is meaningful.
+ for (int i = 0; i < 10; i++) {
+ servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
+ }
+
+ // Force the target peer (node 1) to fail loading the transferred snapshot.
+ stateMachines.get(1).setFailLoadSnapshot(true);
+
+ // Before the fix, addRemotePeer swallowed the load failure and returned
normally, leaving the
+ // target peer active with incomplete data. It must now surface the
failure.
+ Assert.assertThrows(
+ ConsensusException.class, () -> servers.get(0).addRemotePeer(gid,
peers.get(1)));
+
+ // The failure must be the snapshot load itself, i.e. the AddPeer flow
actually reached the
+ // load step on the target rather than aborting earlier.
+ Assert.assertTrue(
+ "Target peer's loadSnapshot was never invoked; the failure came from
an earlier step",
+ stateMachines.get(1).isLoadSnapshotInvoked());
+
+ // The target peer must not be left active with an incompletely-loaded
snapshot.
+ Assert.assertFalse(
+ "Target peer was activated despite a failed snapshot load",
+ servers.get(1).getImpl(gid).isActive());
+ }
+
+ /**
+ * A target peer configures one receive folder per local data dir, and
snapshot fragments are
+ * spread across those folders by the FolderManager, so a small snapshot
only materializes under a
+ * subset of them. A healthy transfer must therefore still succeed even
though some receive
+ * folders never received any fragment of the snapshot (regression for the
multi-data-dir
+ * false-failure reported on the load-failure-propagation PR).
+ */
+ @Test
+ public void addRemotePeerSucceedsWhenSnapshotSpansSubsetOfRecvDirs() throws
Exception {
+ servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
+ servers.get(1).createLocalPeer(gid, peers);
+
+ for (int i = 0; i < 10; i++) {
+ servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
+ }
+
+ // Do NOT force a load failure: this is a healthy transfer. The (small)
snapshot lands in only a
+ // subset of the target's receive folders, so the others legitimately have
no snapshot root.
+ // Before the fix, loadSnapshot() loaded from every receive folder and
treated the missing ones
+ // as failures, turning this healthy multi-data-dir transfer into a
spurious failure.
+ servers.get(0).addRemotePeer(gid, peers.get(1));
+
+ Assert.assertTrue(
+ "Target peer's loadSnapshot was never invoked",
+ stateMachines.get(1).isLoadSnapshotInvoked());
+ Assert.assertTrue(
+ "Target peer was not activated after a successful snapshot load",
+ servers.get(1).getImpl(gid).isActive());
+
+ // Sanity-check that the test actually exercised a partial spread: at
least one of the target's
+ // receive folders must hold no snapshot at all, otherwise the
skipped-folder path is untested.
+ long emptyRecvFolders =
+ peersRecvSnapshotDirs.get(1).stream()
+ .map(dir -> new File(dir,
IoTConsensusServerImpl.SNAPSHOT_DIR_NAME))
+ .filter(recvFolder -> isEmptyOrMissing(recvFolder))
+ .count();
+ Assert.assertTrue(
+ "Expected at least one receive folder without the snapshot, but every
folder had it",
+ emptyRecvFolders > 0);
+ }
+
+ private static boolean isEmptyOrMissing(File dir) {
+ String[] children = dir.list();
+ return children == null || children.length == 0;
+ }
+
+ /**
+ * An empty region produces a snapshot with zero fragments, so nothing is
transmitted and the
+ * target materializes no snapshot under any receive folder. This must still
be treated as a
+ * successful (no-op) load — otherwise migrating an empty region (e.g. while
removing a DataNode)
+ * fails. Regression for the multi-data-dir AddPeer false-failure on empty
regions.
+ */
+ @Test
+ public void addRemotePeerSucceedsWhenSnapshotIsEmpty() throws Exception {
+ servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
+ servers.get(1).createLocalPeer(gid, peers);
+
+ // No data is written, and the source takes an empty snapshot: zero
fragments are transmitted.
+ stateMachines.get(0).setEmptySnapshot(true);
+
+ // Before the fix, loadSnapshot() reported failure when no receive folder
contained the
+ // snapshot, so adding a peer for an empty region failed and DataNode
removal timed out.
+ servers.get(0).addRemotePeer(gid, peers.get(1));
+
+ Assert.assertTrue(
+ "Target peer was not activated after an empty snapshot load",
+ servers.get(1).getImpl(gid).isActive());
+
+ // Confirm the test really exercised the empty-snapshot path: no receive
folder holds a
+ // snapshot.
+ long nonEmptyRecvFolders =
+ peersRecvSnapshotDirs.get(1).stream()
+ .map(dir -> new File(dir,
IoTConsensusServerImpl.SNAPSHOT_DIR_NAME))
+ .filter(recvFolder -> !isEmptyOrMissing(recvFolder))
+ .count();
+ Assert.assertEquals(
+ "Expected no receive folder to contain a snapshot for an empty region",
+ 0,
+ nonEmptyRecvFolders);
+ }
+
+ private boolean checkPortAvailable() {
+ for (Peer peer : this.peers) {
+ try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) {
+ logger.info("check port {} success for node {}",
peer.getEndpoint().port, peer.getNodeId());
+ } catch (IOException e) {
+ logger.error("check port {} failed for node {}",
peer.getEndpoint().port, peer.getNodeId());
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
index a879a034784..9454dcb0c43 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
@@ -142,5 +142,7 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {}
+ public boolean loadSnapshot(File latestSnapshotRootDir) {
+ return true;
+ }
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 5cac2173396..3217d1cc58e 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -177,13 +177,15 @@ public class TestUtils {
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {
+ public boolean loadSnapshot(File latestSnapshotRootDir) {
File snapshot =
new File(latestSnapshotRootDir.getAbsolutePath() + File.separator +
"snapshot");
try (Scanner scanner = new Scanner(snapshot)) {
integer.set(Integer.parseInt(scanner.next()));
+ return true;
} catch (FileNotFoundException e) {
logger.error("cannot find snapshot file {}", snapshot);
+ return false;
}
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index d50ab992f6f..ff062fb63a8 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -123,7 +123,9 @@ public class SimpleConsensusTest {
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {}
+ public boolean loadSnapshot(File latestSnapshotRootDir) {
+ return true;
+ }
}
@Before
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 2de1ec9fdc0..62bb498afa1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -124,7 +124,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {
+ public boolean loadSnapshot(File latestSnapshotRootDir) {
String databaseName = region.getDatabaseName();
String dataRegionIdString = region.getDataRegionIdString();
DataRegionId regionId = new
DataRegionId(Integer.parseInt(dataRegionIdString));
@@ -141,14 +141,16 @@ public class DataRegionStateMachine extends BaseStateMachine {
.loadSnapshotForStateMachine());
if (newRegion == null) {
logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT,
latestSnapshotRootDir);
- return;
+ return false;
}
this.region = newRegion;
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
BloomFilterCache.getInstance().clear();
+ return true;
} catch (Exception e) {
logger.error(DataNodeMiscMessages.EXCEPTION_REPLACING_DATA_REGION, e);
+ return false;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
index edab5862d11..2f5ae61e85e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
@@ -136,14 +136,23 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
}
@Override
- public void loadSnapshot(final File latestSnapshotRootDir) {
- schemaRegion.loadSnapshot(latestSnapshotRootDir);
- PipeDataNodeAgent.runtime()
- .schemaListener(schemaRegion.getSchemaRegionId())
- .loadSnapshot(latestSnapshotRootDir);
- // We recompute the snapshot for pipe listener when loading snapshot
- // to recover the newest snapshot in cache
- listen2Snapshot4PipeListener(false);
+ public boolean loadSnapshot(final File latestSnapshotRootDir) {
+ try {
+ // The boolean result must reflect whether the schema-region data was
actually loaded, so
+ // callers (e.g. the AddPeer flow and the Ratis snapshot-install path)
can detect a real
+ // failure instead of treating a fallback-to-empty load as success.
+ final boolean loadSucceeded =
schemaRegion.loadSnapshot(latestSnapshotRootDir);
+ PipeDataNodeAgent.runtime()
+ .schemaListener(schemaRegion.getSchemaRegionId())
+ .loadSnapshot(latestSnapshotRootDir);
+ // We recompute the snapshot for pipe listener when loading snapshot
+ // to recover the newest snapshot in cache
+ listen2Snapshot4PipeListener(false);
+ return loadSucceeded;
+ } catch (Exception e) {
+ logger.error("Failed to load snapshot from {}", latestSnapshotRootDir,
e);
+ return false;
+ }
}
public void listen2Snapshot4PipeListener(final boolean isTmp) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
index 8d87435e108..540d5d4274c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
@@ -115,7 +115,15 @@ public interface ISchemaRegion {
boolean createSnapshot(final File snapshotDir);
- void loadSnapshot(final File latestSnapshotRootDir);
+ /**
+ * Load the latest schema snapshot from the given dir.
+ *
+ * @return {@code true} if the snapshot was loaded successfully, {@code
false} if loading failed
+ * (in which case the region falls back to an empty/re-initialized
state). Callers rely on
+ * this to honor the {@link
org.apache.iotdb.consensus.IStateMachine#loadSnapshot}
+ * success/failure contract (e.g. the AddPeer flow and Ratis snapshot
install).
+ */
+ boolean loadSnapshot(final File latestSnapshotRootDir);
// endregion
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 0394a81680f..1f2408cca12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -556,7 +556,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
// currently, this method is only used for cluster-ratis mode
@Override
- public void loadSnapshot(final File latestSnapshotRootDir) {
+ public boolean loadSnapshot(final File latestSnapshotRootDir) {
clear();
logger.info(DataNodeSchemaMessages.START_LOADING_SNAPSHOT, schemaRegionId);
@@ -651,6 +651,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
schemaRegionId,
System.currentTimeMillis() - startTime);
logger.info(DataNodeSchemaMessages.SUCCESSFULLY_LOAD_SNAPSHOT,
schemaRegionId);
+ return true;
} catch (final Exception e) {
logger.error(
DataNodeSchemaMessages.FAILED_TO_LOAD_SNAPSHOT, schemaRegionId,
e.getMessage(), e);
@@ -662,6 +663,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
logger.error(
DataNodeSchemaMessages.ERROR_DURING_INIT_SCHEMA_REGION,
schemaRegionId, exception);
}
+ // The snapshot was not loaded (the region fell back to an empty
re-initialized state). Report
+ // the failure so callers honoring the loadSnapshot success/failure
contract can react.
+ return false;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index df5ad41cff6..8d87ef3f061 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -534,7 +534,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion {
// currently, this method is only used for cluster-ratis mode
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {
+ public boolean loadSnapshot(File latestSnapshotRootDir) {
clear();
logger.info(DataNodeSchemaMessages.START_LOADING_SNAPSHOT, schemaRegionId);
@@ -579,6 +579,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion {
schemaRegionId,
System.currentTimeMillis() - startTime);
logger.info(DataNodeSchemaMessages.SUCCESSFULLY_LOAD_SNAPSHOT,
schemaRegionId);
+ return true;
} catch (IOException | MetadataException e) {
logger.error(
DataNodeSchemaMessages.FAILED_TO_LOAD_SNAPSHOT, schemaRegionId,
e.getMessage(), e);
@@ -592,6 +593,9 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion {
schemaRegionId,
metadataException);
}
+ // The snapshot was not loaded (the region fell back to an empty
re-initialized state). Report
+ // the failure so callers honoring the loadSnapshot success/failure
contract can react.
+ return false;
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
index 38f56b66dd9..0c0b3a5b953 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java
@@ -96,7 +96,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest {
snapshotDir.mkdir();
schemaRegion.createSnapshot(snapshotDir);
- schemaRegion.loadSnapshot(snapshotDir);
+ Assert.assertTrue(schemaRegion.loadSnapshot(snapshotDir));
List<ITimeSeriesSchemaInfo> result =
SchemaRegionTestUtil.showTimeseries(
@@ -112,7 +112,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest {
simulateRestart();
ISchemaRegion newSchemaRegion = getSchemaRegion("root.sg", 0);
- newSchemaRegion.loadSnapshot(snapshotDir);
+ Assert.assertTrue(newSchemaRegion.loadSnapshot(snapshotDir));
result =
SchemaRegionTestUtil.showTimeseries(
newSchemaRegion, new PartialPath("root.sg.**"), false,
"tag-key", "tag-value");
@@ -171,7 +171,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest {
snapshotDir.mkdir();
schemaRegion.createSnapshot(snapshotDir);
- schemaRegion.loadSnapshot(snapshotDir);
+ Assert.assertTrue(schemaRegion.loadSnapshot(snapshotDir));
List<ITimeSeriesSchemaInfo> result =
SchemaRegionTestUtil.showTimeseries(
@@ -182,7 +182,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest {
simulateRestart();
ISchemaRegion newSchemaRegion = getSchemaRegion("root.sg", 0);
- newSchemaRegion.loadSnapshot(snapshotDir);
+ Assert.assertTrue(newSchemaRegion.loadSnapshot(snapshotDir));
result =
SchemaRegionTestUtil.showTimeseries(
newSchemaRegion, new PartialPath("root.sg.**"), false,
"tag-key", "tag-value");
@@ -193,6 +193,26 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest {
}
}
+ @Test
+ public void testLoadSnapshotReportsFailureWhenSnapshotIsMissing() throws
Exception {
+ String schemaRegionConsensusProtocolClass =
config.getSchemaRegionConsensusProtocolClass();
+
config.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ try {
+ ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0);
+
+ // Loading from a directory that does not contain a snapshot must report
failure rather than
+ // silently falling back to an empty region and reporting success.
Callers (the AddPeer flow
+ // and the Ratis snapshot-install path) rely on this success/failure
contract.
+ File missingSnapshotDir =
+ new File(config.getSchemaDir() + File.separator +
"non-existent-snapshot");
+ Assert.assertFalse(missingSnapshotDir.exists());
+
+ Assert.assertFalse(schemaRegion.loadSnapshot(missingSnapshotDir));
+ } finally {
+
config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
+ }
+ }
+
@Test
@Ignore
public void testSnapshotPerformance() throws Exception {