This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1c692299ad0caff0bdad14138bed3a2c616d6081 Author: Yongzao <[email protected]> AuthorDate: Tue Jun 16 11:30:46 2026 +0800 Propagate snapshot load failure during IoTConsensus AddPeer (#17935) --- .../statemachine/ConfigRegionStateMachine.java | 10 +- .../persistence/executor/ConfigPlanExecutor.java | 7 +- .../org/apache/iotdb/consensus/IStateMachine.java | 5 +- .../consensus/iot/IoTConsensusServerImpl.java | 31 +- .../service/IoTConsensusRPCServiceProcessor.java | 12 +- .../ratis/ApplicationStateMachineProxy.java | 13 +- .../simple/SimpleConsensusServerImpl.java | 4 +- .../apache/iotdb/consensus/EmptyStateMachine.java | 4 +- .../iot/AddPeerSnapshotLoadFailureTest.java | 354 +++++++++++++++++++++ .../iotdb/consensus/iot/util/TestStateMachine.java | 4 +- .../apache/iotdb/consensus/ratis/TestUtils.java | 4 +- .../consensus/simple/SimpleConsensusTest.java | 4 +- .../dataregion/DataRegionStateMachine.java | 6 +- .../schemaregion/SchemaRegionStateMachine.java | 25 +- .../schemaengine/schemaregion/ISchemaRegion.java | 10 +- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 6 +- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 6 +- .../schemaRegion/SchemaRegionManagementTest.java | 28 +- 18 files changed, 493 insertions(+), 40 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index b3029e66028..ad0a82bcf56 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -246,9 +246,14 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev } @Override - public void loadSnapshot(final File latestSnapshotRootDir) { + public boolean loadSnapshot(final File latestSnapshotRootDir) { + // The boolean result must reflect whether the ConfigRegion state-machine data was loaded, so + // callers (e.g. the AddPeer flow) can detect a real failure. The pipe-listener recomputation + // below is best-effort post-processing: a failure there is logged but must NOT be reported as a + // snapshot-load failure, otherwise it would (e.g.) abort ConfigNode (re)initialization on what + // is actually a healthy data load. + final boolean loadSucceeded = executor.loadSnapshot(latestSnapshotRootDir); try { - executor.loadSnapshot(latestSnapshotRootDir); // We recompute the snapshot for pipe listener when loading snapshot // to recover the newest snapshot in cache PipeConfigNodeAgent.runtime() @@ -261,6 +266,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev e); } } + return loadSucceeded; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 1f2729bff39..e37c3ae9960 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -767,12 +767,12 @@ public class ConfigPlanExecutor { return result.get(); } - public void loadSnapshot(final File latestSnapshotRootDir) { + public boolean loadSnapshot(final File latestSnapshotRootDir) { if (!latestSnapshotRootDir.exists()) { LOGGER.error( ConfigNodeMessages.SNAPSHOT_DIRECTORY_IS_NOT_EXIST_CAN_NOT_LOAD_SNAPSHOT_WITH, latestSnapshotRootDir.getAbsolutePath()); - return; + return false; } final AtomicBoolean result = new AtomicBoolean(true); @@ -800,6 +800,9 @@ public class ConfigPlanExecutor { ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR, latestSnapshotRootDir); } + // Propagate any snapshot-load failure so callers (e.g. the AddPeer flow) do not treat a + // partially or wholly failed load as success. + return result.get(); } private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java index 3354c83699b..c7705d93896 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java @@ -112,8 +112,11 @@ public interface IStateMachine { * Load the latest snapshot from given dir. * * @param latestSnapshotRootDir dir where the latest snapshot sits + * @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise. Callers + * (e.g. the IoTConsensus AddPeer flow) rely on this to avoid activating a new peer whose + * snapshot failed to load, which would otherwise silently lose data. */ - void loadSnapshot(File latestSnapshotRootDir); + boolean loadSnapshot(File latestSnapshotRootDir); /** * given a snapshot dir, ask statemachine to provide all snapshot files. By default, it will list diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 33ff3a4c273..7f28d24cd53 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -447,14 +447,29 @@ public class IoTConsensusServerImpl { } } - public void loadSnapshot(String snapshotId) { - // TODO: (xingtanzjr) throw exception if the snapshot load failed - recvFolderManager - .getFolders() - .forEach( - dir -> { - stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId)); - }); + public boolean loadSnapshot(String snapshotId) { + // Snapshot fragments are spread across the receive folders by the FolderManager (a DataRegion, + // for example, uses one receive folder per local data dir), so a given snapshot only exists + // under the folders that actually received fragments. Load from those folders and skip the + // others; otherwise the state machine would fail on a folder that never received this snapshot + // and turn a healthy multi-data-dir transfer into a spurious failure. + // + // Note: an empty region produces a snapshot with zero fragments, so none of the receive folders + // contains it. That is a legitimate (no-op) load, not a failure, so an absent snapshot must not + // be reported as failure here. + for (String dir : recvFolderManager.getFolders()) { + File snapshotDir = getSnapshotPath(dir, snapshotId); + if (!snapshotDir.exists()) { + continue; + } + if (!stateMachine.loadSnapshot(snapshotDir)) { + // Stop at the first failure. The snapshot is already broken on this replica, and loading + // the remaining folders is both pointless and harmful: a load wipes the data dirs before + // relinking. Report the failure so the AddPeer coordinator does not activate this peer. + return false; + } + } + return true; } private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index c5babea464d..fe804e9d7d0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -312,7 +312,17 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa status.setMessage(message); return new TTriggerSnapshotLoadRes(status); } - impl.loadSnapshot(req.snapshotId); + if (!impl.loadSnapshot(req.snapshotId)) { + String message = + String.format( + "Failed to load snapshot %s for consensus group %s", req.snapshotId, groupId); + LOGGER.error(message); + // Surface a region-migration-specific code (the snapshot load runs as part of the AddPeer + // flow) rather than a generic internal error, so the coordinator's failure is meaningful. + TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + status.setMessage(message); + return new TTriggerSnapshotLoadRes(status); + } KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION); return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index 1134d8fd6f2..3e2451e65dd 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -98,7 +98,7 @@ public class ApplicationStateMachineProxy extends BaseStateMachine { } @Override - public void reinitialize() { + public void reinitialize() throws IOException { setLastAppliedTermIndex(null); loadSnapshot(snapshotStorage.findLatestSnapshotDir()); if (getLifeCycleState() == LifeCycle.State.PAUSED) { @@ -261,14 +261,21 @@ public class ApplicationStateMachineProxy extends BaseStateMachine { } } - private void loadSnapshot(File latestSnapshotDir) { + private void loadSnapshot(File latestSnapshotDir) throws IOException { snapshotStorage.updateSnapshotCache(); if (latestSnapshotDir == null) { return; } // require the application statemachine to load the latest snapshot - applicationStateMachine.loadSnapshot(latestSnapshotDir); + if (!applicationStateMachine.loadSnapshot(latestSnapshotDir)) { + // The application state machine rejected this snapshot. Do not advance lastAppliedTermIndex: + // claiming the snapshot as applied would let Ratis proceed as if it were installed and run on + // incomplete data (silent data loss). Fail (re)initialization instead so the snapshot install + // is treated as failed and can be retried. + throw new IOException( + String.format("%s: failed to load snapshot from %s", this, latestSnapshotDir)); + } TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir); updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex()); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java index 4ed4a7b41ce..b76bcdeaaeb 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java @@ -89,7 +89,7 @@ public class SimpleConsensusServerImpl implements IStateMachine { } @Override - public synchronized void loadSnapshot(File latestSnapshotRootDir) { - stateMachine.loadSnapshot(latestSnapshotRootDir); + public synchronized boolean loadSnapshot(File latestSnapshotRootDir) { + return stateMachine.loadSnapshot(latestSnapshotRootDir); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java index aafa0be5bb5..997120b00f9 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java @@ -54,5 +54,7 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean loadSnapshot(File latestSnapshotRootDir) { + return true; + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java new file mode 100644 index 00000000000..64c81a84b58 --- /dev/null +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.iot; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.iot.util.TestEntry; +import org.apache.iotdb.consensus.iot.util.TestStateMachine; + +import org.apache.ratis.util.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Regression test for the snapshot-load-failure bug (a failed snapshot load on the AddPeer target + * was silently swallowed, so the new peer was activated and the migration falsely reported + * successful, losing data on the new replica). + * + * <p>It builds a real two-node IoTConsensus group, forces the target peer's {@link + * org.apache.iotdb.consensus.IStateMachine#loadSnapshot} to fail, and verifies that {@code + * addRemotePeer}: + * + * <ul> + * <li>actually reaches the snapshot-load step (so the failure is the one under test, not an + * earlier step), + * <li>fails with a {@link ConsensusException} instead of silently succeeding, + * <li>does not leave the target peer active with an incompletely-loaded snapshot. + * </ul> + */ +public class AddPeerSnapshotLoadFailureTest { + + private final Logger logger = LoggerFactory.getLogger(AddPeerSnapshotLoadFailureTest.class); + + private final ConsensusGroupId gid = new DataRegionId(1); + + private final int basePort = 9200; + + private final List<Peer> peers = + Arrays.asList( + new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 1)), + new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort))); + + private final List<File> peersStorage = + Arrays.asList( + new File("target" + File.separator + "snapshot-load-fail-1"), + new File("target" + File.separator + "snapshot-load-fail-2")); + + private final List<List<String>> peersRecvSnapshotDirs = + Arrays.asList( + Arrays.asList( + "target" + File.separator + "snapshot-load-fail-1-recv-1", + "target" + File.separator + "snapshot-load-fail-1-recv-2"), + Arrays.asList( + "target" + File.separator + "snapshot-load-fail-2-recv-1", + "target" + File.separator + "snapshot-load-fail-2-recv-2")); + + private final List<IoTConsensus> servers = new ArrayList<>(); + private final List<ControllableStateMachine> stateMachines = new ArrayList<>(); + + /** + * A {@link TestStateMachine} that takes a real (single-file) snapshot and whose snapshot load can + * be made to fail on demand. When not forced to fail, {@link #loadSnapshot} mirrors the real + * {@code SnapshotLoader} contract by reporting failure when the snapshot root does not exist — + * which is exactly what happens for the receive folders that never got a fragment of a snapshot + * in a multi-data-dir deployment. + */ + private static class ControllableStateMachine extends TestStateMachine { + private volatile boolean failLoadSnapshot = false; + private volatile boolean loadSnapshotInvoked = false; + private volatile boolean emptySnapshot = false; + + void setFailLoadSnapshot(boolean failLoadSnapshot) { + this.failLoadSnapshot = failLoadSnapshot; + } + + void setEmptySnapshot(boolean emptySnapshot) { + this.emptySnapshot = emptySnapshot; + } + + boolean isLoadSnapshotInvoked() { + return loadSnapshotInvoked; + } + + @Override + public boolean loadSnapshot(File latestSnapshotRootDir) { + loadSnapshotInvoked = true; + if (failLoadSnapshot) { + return false; + } + // Mirror SnapshotLoader: a receive folder that never received a fragment of this snapshot has + // no snapshot root, so loading from it must report failure. + return latestSnapshotRootDir.exists(); + } + + @Override + public boolean takeSnapshot(File snapshotDir) { + if (emptySnapshot) { + // Mirror an empty region: the snapshot has no fragments to transmit. The receiver then + // materializes no snapshot under any receive folder, which must still load successfully. + return true; + } + // Write a real (single) snapshot file so the transfer actually moves data and the receiver + // materializes the snapshot under a subset of its receive folders. + try { + Files.write( + new File(snapshotDir, "snapshot.data").toPath(), + "snapshot".getBytes(StandardCharsets.UTF_8)); + return true; + } catch (IOException e) { + return false; + } + } + + // TestStateMachine does not implement clearSnapshot (the IStateMachine default throws). The + // AddPeer flow calls it in a finally block to clean up the local snapshot, so we provide a + // no-op here; otherwise that cleanup would mask the ConsensusException we are asserting on. + @Override + public boolean clearSnapshot() { + return true; + } + } + + @Before + public void setUp() throws Exception { + for (File file : peersStorage) { + file.mkdirs(); + stateMachines.add(new ControllableStateMachine()); + } + peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new File(dir).mkdirs())); + initServer(); + } + + @After + public void tearDown() throws Exception { + servers.parallelStream().forEach(IoTConsensus::stop); + servers.clear(); + for (File file : peersStorage) { + FileUtils.deleteFully(file); + } + peersRecvSnapshotDirs.forEach( + innerList -> + innerList.forEach( + dir -> { + try { + FileUtils.deleteFully(new File(dir)); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + private void initServer() throws IOException { + Assume.assumeTrue(checkPortAvailable()); + try { + for (int i = 0; i < peers.size(); i++) { + int finalI = i; + servers.add( + (IoTConsensus) + ConsensusFactory.getConsensusImpl( + ConsensusFactory.IOT_CONSENSUS, + ConsensusConfig.newBuilder() + .setThisNodeId(peers.get(i).getNodeId()) + .setThisNode(peers.get(i).getEndpoint()) + .setStorageDir(peersStorage.get(i).getAbsolutePath()) + .setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i)) + .setConsensusGroupType(TConsensusGroupType.DataRegion) + .build(), + groupId -> stateMachines.get(finalI)) + .orElseThrow( + () -> + new IllegalArgumentException( + String.format( + ConsensusFactory.CONSTRUCT_FAILED_MSG, + ConsensusFactory.IOT_CONSENSUS)))); + } + for (int i = 0; i < peers.size(); i++) { + servers.get(i).start(); + } + } catch (IOException e) { + if (e.getCause() instanceof StartupException) { + // just succeed when can not bind socket + logger.info("Can not start IoTConsensus because", e); + Assume.assumeTrue(false); + } else { + logger.error("Failed because", e); + Assert.fail("Failed because " + e.getMessage()); + } + } + } + + @Test + public void addRemotePeerMustFailWhenTargetSnapshotLoadFails() throws Exception { + // node 0 is the sole initial member; node 1 will be added as a new peer. Mirroring the real + // region-migration flow, the destination peer (node 1) is pre-created locally with the full + // target peer list (IoTConsensus, unlike Ratis, requires a non-empty peer list here). + servers.get(0).createLocalPeer(gid, peers.subList(0, 1)); + servers.get(1).createLocalPeer(gid, peers); + + // Put some data into the group so the snapshot transfer is meaningful. + for (int i = 0; i < 10; i++) { + servers.get(0).write(gid, new TestEntry(i, peers.get(0))); + } + + // Force the target peer (node 1) to fail loading the transferred snapshot. + stateMachines.get(1).setFailLoadSnapshot(true); + + // Before the fix, addRemotePeer swallowed the load failure and returned normally, leaving the + // target peer active with incomplete data. It must now surface the failure. + Assert.assertThrows( + ConsensusException.class, () -> servers.get(0).addRemotePeer(gid, peers.get(1))); + + // The failure must be the snapshot load itself, i.e. the AddPeer flow actually reached the + // load step on the target rather than aborting earlier. + Assert.assertTrue( + "Target peer's loadSnapshot was never invoked; the failure came from an earlier step", + stateMachines.get(1).isLoadSnapshotInvoked()); + + // The target peer must not be left active with an incompletely-loaded snapshot. + Assert.assertFalse( + "Target peer was activated despite a failed snapshot load", + servers.get(1).getImpl(gid).isActive()); + } + + /** + * A target peer configures one receive folder per local data dir, and snapshot fragments are + * spread across those folders by the FolderManager, so a small snapshot only materializes under a + * subset of them. A healthy transfer must therefore still succeed even though some receive + * folders never received any fragment of the snapshot (regression for the multi-data-dir + * false-failure reported on the load-failure-propagation PR). + */ + @Test + public void addRemotePeerSucceedsWhenSnapshotSpansSubsetOfRecvDirs() throws Exception { + servers.get(0).createLocalPeer(gid, peers.subList(0, 1)); + servers.get(1).createLocalPeer(gid, peers); + + for (int i = 0; i < 10; i++) { + servers.get(0).write(gid, new TestEntry(i, peers.get(0))); + } + + // Do NOT force a load failure: this is a healthy transfer. The (small) snapshot lands in only a + // subset of the target's receive folders, so the others legitimately have no snapshot root. + // Before the fix, loadSnapshot() loaded from every receive folder and treated the missing ones + // as failures, turning this healthy multi-data-dir transfer into a spurious failure. + servers.get(0).addRemotePeer(gid, peers.get(1)); + + Assert.assertTrue( + "Target peer's loadSnapshot was never invoked", + stateMachines.get(1).isLoadSnapshotInvoked()); + Assert.assertTrue( + "Target peer was not activated after a successful snapshot load", + servers.get(1).getImpl(gid).isActive()); + + // Sanity-check that the test actually exercised a partial spread: at least one of the target's + // receive folders must hold no snapshot at all, otherwise the skipped-folder path is untested. + long emptyRecvFolders = + peersRecvSnapshotDirs.get(1).stream() + .map(dir -> new File(dir, IoTConsensusServerImpl.SNAPSHOT_DIR_NAME)) + .filter(recvFolder -> isEmptyOrMissing(recvFolder)) + .count(); + Assert.assertTrue( + "Expected at least one receive folder without the snapshot, but every folder had it", + emptyRecvFolders > 0); + } + + private static boolean isEmptyOrMissing(File dir) { + String[] children = dir.list(); + return children == null || children.length == 0; + } + + /** + * An empty region produces a snapshot with zero fragments, so nothing is transmitted and the + * target materializes no snapshot under any receive folder. This must still be treated as a + * successful (no-op) load — otherwise migrating an empty region (e.g. while removing a DataNode) + * fails. Regression for the multi-data-dir AddPeer false-failure on empty regions. + */ + @Test + public void addRemotePeerSucceedsWhenSnapshotIsEmpty() throws Exception { + servers.get(0).createLocalPeer(gid, peers.subList(0, 1)); + servers.get(1).createLocalPeer(gid, peers); + + // No data is written, and the source takes an empty snapshot: zero fragments are transmitted. + stateMachines.get(0).setEmptySnapshot(true); + + // Before the fix, loadSnapshot() reported failure when no receive folder contained the + // snapshot, so adding a peer for an empty region failed and DataNode removal timed out. + servers.get(0).addRemotePeer(gid, peers.get(1)); + + Assert.assertTrue( + "Target peer was not activated after an empty snapshot load", + servers.get(1).getImpl(gid).isActive()); + + // Confirm the test really exercised the empty-snapshot path: no receive folder holds a + // snapshot. + long nonEmptyRecvFolders = + peersRecvSnapshotDirs.get(1).stream() + .map(dir -> new File(dir, IoTConsensusServerImpl.SNAPSHOT_DIR_NAME)) + .filter(recvFolder -> !isEmptyOrMissing(recvFolder)) + .count(); + Assert.assertEquals( + "Expected no receive folder to contain a snapshot for an empty region", + 0, + nonEmptyRecvFolders); + } + + private boolean checkPortAvailable() { + for (Peer peer : this.peers) { + try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) { + logger.info("check port {} success for node {}", peer.getEndpoint().port, peer.getNodeId()); + } catch (IOException e) { + logger.error("check port {} failed for node {}", peer.getEndpoint().port, peer.getNodeId()); + return false; + } + } + return true; + } +} diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java index 54acdec30bb..2a2adb25461 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java @@ -136,5 +136,7 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean loadSnapshot(File latestSnapshotRootDir) { + return true; + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java index 69725d68be6..b0d0ef14ee7 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java @@ -171,13 +171,15 @@ public class TestUtils { } @Override - public void loadSnapshot(File latestSnapshotRootDir) { + public boolean loadSnapshot(File latestSnapshotRootDir) { File snapshot = new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "snapshot"); try (Scanner scanner = new Scanner(snapshot)) { integer.set(Integer.parseInt(scanner.next())); + return true; } catch (FileNotFoundException e) { logger.error("cannot find snapshot file {}", snapshot); + return false; } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index d50ab992f6f..ff062fb63a8 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -123,7 +123,9 @@ public class SimpleConsensusTest { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean loadSnapshot(File latestSnapshotRootDir) { + return true; + } } @Before diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java index 5c2c5207db6..0946756aaba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java @@ -124,7 +124,7 @@ public class DataRegionStateMachine extends BaseStateMachine { } @Override - public void loadSnapshot(File latestSnapshotRootDir) { + public boolean loadSnapshot(File latestSnapshotRootDir) { String databaseName = region.getDatabaseName(); String dataRegionIdString = region.getDataRegionIdString(); DataRegionId regionId = new DataRegionId(Integer.parseInt(dataRegionIdString)); @@ -141,14 +141,16 @@ public class DataRegionStateMachine extends BaseStateMachine { .loadSnapshotForStateMachine()); if (newRegion == null) { logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, latestSnapshotRootDir); - return; + return false; } this.region = newRegion; ChunkCache.getInstance().clear(); TimeSeriesMetadataCache.getInstance().clear(); BloomFilterCache.getInstance().clear(); + return true; } catch (Exception e) { logger.error(DataNodeMiscMessages.EXCEPTION_REPLACING_DATA_REGION, e); + return false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java index edab5862d11..2f5ae61e85e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java @@ -136,14 +136,23 @@ public class SchemaRegionStateMachine extends BaseStateMachine { } @Override - public void loadSnapshot(final File latestSnapshotRootDir) { - schemaRegion.loadSnapshot(latestSnapshotRootDir); - PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .loadSnapshot(latestSnapshotRootDir); - // We recompute the snapshot for pipe listener when loading snapshot - // to recover the newest snapshot in cache - listen2Snapshot4PipeListener(false); + public boolean loadSnapshot(final File latestSnapshotRootDir) { + try { + // The boolean result must reflect whether the schema-region data was actually loaded, so + // callers (e.g. the AddPeer flow and the Ratis snapshot-install path) can detect a real + // failure instead of treating a fallback-to-empty load as success. + final boolean loadSucceeded = schemaRegion.loadSnapshot(latestSnapshotRootDir); + PipeDataNodeAgent.runtime() + .schemaListener(schemaRegion.getSchemaRegionId()) + .loadSnapshot(latestSnapshotRootDir); + // We recompute the snapshot for pipe listener when loading snapshot + // to recover the newest snapshot in cache + listen2Snapshot4PipeListener(false); + return loadSucceeded; + } catch (Exception e) { + logger.error("Failed to load snapshot from {}", latestSnapshotRootDir, e); + return false; + } } public void listen2Snapshot4PipeListener(final boolean isTmp) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java index 8d87435e108..540d5d4274c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java @@ -115,7 +115,15 @@ public interface ISchemaRegion { boolean createSnapshot(final File snapshotDir); - void loadSnapshot(final File latestSnapshotRootDir); + /** + * Load the latest schema snapshot from the given dir. + * + * @return {@code true} if the snapshot was loaded successfully, {@code false} if loading failed + * (in which case the region falls back to an empty/re-initialized state). Callers rely on + * this to honor the {@link org.apache.iotdb.consensus.IStateMachine#loadSnapshot} + * success/failure contract (e.g. the AddPeer flow and Ratis snapshot install). + */ + boolean loadSnapshot(final File latestSnapshotRootDir); // endregion diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 0394a81680f..1f2408cca12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -556,7 +556,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { // currently, this method is only used for cluster-ratis mode @Override - public void loadSnapshot(final File latestSnapshotRootDir) { + public boolean loadSnapshot(final File latestSnapshotRootDir) { clear(); logger.info(DataNodeSchemaMessages.START_LOADING_SNAPSHOT, schemaRegionId); @@ -651,6 +651,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { schemaRegionId, System.currentTimeMillis() - startTime); logger.info(DataNodeSchemaMessages.SUCCESSFULLY_LOAD_SNAPSHOT, schemaRegionId); + return true; } catch (final Exception e) { logger.error( DataNodeSchemaMessages.FAILED_TO_LOAD_SNAPSHOT, schemaRegionId, e.getMessage(), e); @@ -662,6 +663,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { logger.error( DataNodeSchemaMessages.ERROR_DURING_INIT_SCHEMA_REGION, schemaRegionId, exception); } + // The snapshot was not loaded (the region fell back to an empty re-initialized state). Report + // the failure so callers honoring the loadSnapshot success/failure contract can react. + return false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index df5ad41cff6..8d87ef3f061 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -534,7 +534,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { // currently, this method is only used for cluster-ratis mode @Override - public void loadSnapshot(File latestSnapshotRootDir) { + public boolean loadSnapshot(File latestSnapshotRootDir) { clear(); logger.info(DataNodeSchemaMessages.START_LOADING_SNAPSHOT, schemaRegionId); @@ -579,6 +579,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { schemaRegionId, System.currentTimeMillis() - startTime); logger.info(DataNodeSchemaMessages.SUCCESSFULLY_LOAD_SNAPSHOT, schemaRegionId); + return true; } catch (IOException | MetadataException e) { logger.error( DataNodeSchemaMessages.FAILED_TO_LOAD_SNAPSHOT, schemaRegionId, e.getMessage(), e); @@ -592,6 +593,9 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { schemaRegionId, metadataException); } + // The snapshot was not loaded (the region fell back to an empty re-initialized state). Report + // the failure so callers honoring the loadSnapshot success/failure contract can react. + return false; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java index 38f56b66dd9..0c0b3a5b953 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionManagementTest.java @@ -96,7 +96,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest { snapshotDir.mkdir(); schemaRegion.createSnapshot(snapshotDir); - schemaRegion.loadSnapshot(snapshotDir); + Assert.assertTrue(schemaRegion.loadSnapshot(snapshotDir)); List<ITimeSeriesSchemaInfo> result = SchemaRegionTestUtil.showTimeseries( @@ -112,7 +112,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest { simulateRestart(); ISchemaRegion newSchemaRegion = getSchemaRegion("root.sg", 0); - newSchemaRegion.loadSnapshot(snapshotDir); + Assert.assertTrue(newSchemaRegion.loadSnapshot(snapshotDir)); result = SchemaRegionTestUtil.showTimeseries( newSchemaRegion, new PartialPath("root.sg.**"), false, "tag-key", "tag-value"); @@ -171,7 +171,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest { snapshotDir.mkdir(); schemaRegion.createSnapshot(snapshotDir); - schemaRegion.loadSnapshot(snapshotDir); + Assert.assertTrue(schemaRegion.loadSnapshot(snapshotDir)); List<ITimeSeriesSchemaInfo> result = SchemaRegionTestUtil.showTimeseries( @@ -182,7 +182,7 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest { simulateRestart(); ISchemaRegion newSchemaRegion = getSchemaRegion("root.sg", 0); - newSchemaRegion.loadSnapshot(snapshotDir); + Assert.assertTrue(newSchemaRegion.loadSnapshot(snapshotDir)); result = SchemaRegionTestUtil.showTimeseries( newSchemaRegion, new PartialPath("root.sg.**"), false, "tag-key", "tag-value"); @@ -193,6 +193,26 @@ public class SchemaRegionManagementTest extends AbstractSchemaRegionTest { } } + @Test + public void testLoadSnapshotReportsFailureWhenSnapshotIsMissing() throws Exception { + String schemaRegionConsensusProtocolClass = config.getSchemaRegionConsensusProtocolClass(); + config.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + try { + ISchemaRegion schemaRegion = getSchemaRegion("root.sg", 0); + + // Loading from a directory that does not contain a snapshot must report failure rather than + // silently falling back to an empty region and reporting success. Callers (the AddPeer flow + // and the Ratis snapshot-install path) rely on this success/failure contract. + File missingSnapshotDir = + new File(config.getSchemaDir() + File.separator + "non-existent-snapshot"); + Assert.assertFalse(missingSnapshotDir.exists()); + + Assert.assertFalse(schemaRegion.loadSnapshot(missingSnapshotDir)); + } finally { + config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass); + } + } + @Test @Ignore public void testSnapshotPerformance() throws Exception {
