This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 80b4340e0d4 IGNITE-27600 Sync deployment units on start (#7545)
80b4340e0d4 is described below
commit 80b4340e0d4d0a13dc3d71c0bc7aacde9a328f60
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Mar 24 05:59:34 2026 +0300
IGNITE-27600 Sync deployment units on start (#7545)
---
.../deployment/ItDeploymentUnitFailoverTest.java | 55 ++++++++-
.../internal/deployunit/DeploymentManagerImpl.java | 2 +-
.../internal/deployunit/StaticUnitDeployer.java | 136 ++++++++++++++++-----
.../ignite/internal/deployunit/UnitDownloader.java | 1 +
...serverTest.java => StaticUnitDeployerTest.java} | 42 +++++--
5 files changed, 191 insertions(+), 45 deletions(-)
diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
index 4097aa50001..27fe8899f6f 100644
--- a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
+++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
@@ -21,15 +21,18 @@ import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import java.nio.file.Path;
import java.util.List;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.deployunit.IgniteDeployment;
import org.apache.ignite.internal.deployunit.NodesToDeploy;
+import
org.apache.ignite.internal.deployunit.configuration.DeploymentExtensionConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -91,7 +94,55 @@ public class ItDeploymentUnitFailoverTest extends ClusterPerTestIntegrationTest
assertThat(unit.undeployAsync(), willCompleteSuccessfully());
- IgniteImpl cmgNode = unwrapIgniteImpl(startNode(nodeIndex));
- unit.waitUnitClean(cmgNode);
+ IgniteImpl restartedNode = unwrapIgniteImpl(startNode(nodeIndex));
+ unit.waitUnitClean(restartedNode);
+ }
+
+ /**
+ * Verifies that a unit deployed to node 1 is restored on that node after the node is stopped, its deployment
+ * directory (see {@code getDeploymentDirectory}) is deleted, and the node is started again.
+ */
+ @Test
+ void testRecoveryAfterCleanWorkDir() {
+ int nodeIndex = 1;
+ IgniteImpl node = igniteImpl(nodeIndex);
+
+ String id = "id1";
+ Version version = Version.parseVersion("1.0.0");
+
+ // Deploy a unit to the cluster including the target node
+ Unit unit = files.deployAndVerify(
+ id,
+ version,
+ false,
+ List.of(files.smallFile()),
+ new NodesToDeploy(List.of(node.name())),
+ igniteImpl(0)
+ );
+
+ // Wait for deployment on target node
+ await().until(() -> node.deployment().nodeStatusAsync(id, version),
willBe(DEPLOYED));
+
+ // Verify unit files exist on the target node
+ unit.waitUnitReplica(node);
+
+ // Get the deployment directory path before stopping the node
+ Path deploymentDir = getDeploymentDirectory(node);
+
+ // Stop the target node
+ stopNode(nodeIndex);
+
+ // Clear the deployment directory to simulate a clean restart; the metastorage still records the unit as DEPLOYED
+ // on this node, which is the situation the startup sync has to detect.
+ deleteIfExists(deploymentDir);
+
+ // Start the node again
+ IgniteImpl restartedNode = unwrapIgniteImpl(startNode(nodeIndex));
+
+ // Verify the unit is recovered from another node
+ unit.waitUnitReplica(restartedNode);
+ }
+
+ /**
+ * Resolves the deployment units directory of the given node. This is the configured deployment {@code location}
+ * from {@link DeploymentExtensionConfiguration}, resolved against the node's work directory.
+ *
+ * @param ignite Node to inspect.
+ * @return Path to the node's deployment units directory.
+ */
+ private static Path getDeploymentDirectory(IgniteImpl ignite) {
+ String deploymentFolder = ignite.nodeConfiguration()
+
.getConfiguration(DeploymentExtensionConfiguration.KEY).deployment()
+ .location().value();
+
+ return ignite.workDir().resolve(deploymentFolder);
}
}
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 6dbe43347ee..98ee9322314 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -433,7 +433,7 @@ public class DeploymentManagerImpl implements IgniteDeployment {
messaging.subscribe();
failover.registerTopologyChangeCallback(nodeStatusCallback);
undeployer.start(UNDEPLOYER_DELAY.getSeconds(), TimeUnit.SECONDS);
- return new StaticUnitDeployer(deploymentUnitStore, nodeName,
deploymentUnitFolder).searchAndDeployStaticUnits();
+ return new StaticUnitDeployer(deploymentUnitStore, nodeName,
deploymentUnitFolder).syncDeployedUnits();
}
@Override
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java
index 387796baaff..9b96b4f1cd4 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.deployunit;
import static java.util.Collections.emptyList;
import static org.apache.ignite.deployment.version.Version.parseVersion;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import java.io.IOException;
import java.nio.file.FileVisitResult;
@@ -36,14 +38,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.ignite.deployment.version.Version;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.tostring.IgniteToStringInclude;
-import org.apache.ignite.internal.tostring.S;
/**
* Observes a predefined directory with statically provisioned deployment
units and registers their presence in the deployment store.
@@ -75,43 +77,118 @@ public class StaticUnitDeployer {
}
/**
- * Scans the filesystem for statically deployed units and registers their
cluster and node statuses if they are not yet present in the
- * store.
+ * Performs startup deployment unit synchronization.
*
- * <p>Already registered unit versions for this node are skipped. New ones
are registered as DEPLOYED.
+ * <ol>
+ * <li>Scans the filesystem for statically deployed units and
registers them if not yet present in the store.</li>
+ * <li>Checks for units marked as DEPLOYED in metastorage but missing
on disk, and resets them to UPLOADING to trigger on-demand
+ * deployment from another node.</li>
+ * </ol>
+ *
+ * @return Future that completes after this node's statuses have been read from the store and both the recovery
+ * batch and the static-deployment batch have finished. Failures of individual unit operations are logged, not
+ * propagated. A failure to read the node statuses fails the returned future.
*/
- public CompletableFuture<Void> searchAndDeployStaticUnits() {
- StaticUnits allUnits = collectStaticUnits();
+ CompletableFuture<Void> syncDeployedUnits() {
+ StaticUnits unitsOnDisk = collectStaticUnits();
return
deploymentUnitStore.getNodeStatuses(nodeName).thenCompose(statuses -> {
- List<CompletableFuture<?>> futures = new ArrayList<>();
+ List<String> recoveredUnits = new ArrayList<>();
+ List<CompletableFuture<Boolean>> recoveredUnitFutures = new
ArrayList<>();
+ // Process existing statuses: filter out from static units and
recover missing units
for (UnitNodeStatus status : statuses) {
- allUnits.filter(status.id(), status.version());
+ String id = status.id();
+ Version version = status.version();
+
+ unitsOnDisk.filter(id, version);
+
+ if (status.status() == DEPLOYED) {
+ Path unitPath =
deploymentUnitsRoot.resolve(id).resolve(version.render());
+ // NOTE(review): only a missing version directory triggers recovery. A directory that exists
+ // but has lost its contents is not treated as missing.
+ if (Files.notExists(unitPath)) {
+ recoveredUnits.add(id + ":" + version);
+ recoveredUnitFutures.add(checkAndRecoverUnit(status));
+ }
+ }
}
- LOG.info("Start processing static deployment units {}", allUnits);
- allUnits.forEach((id, version) -> {
- CompletableFuture<Boolean> future =
deploymentUnitStore.createClusterStatus(id, version, Set.of(nodeName))
- .thenCompose(status -> {
- if (status == null) {
- return
deploymentUnitStore.getClusterStatus(id, version).thenCompose(it ->
-
deploymentUnitStore.createNodeStatus(nodeName, id, version, it.opId(), DEPLOYED)
- );
- } else {
- return
deploymentUnitStore.createNodeStatus(nodeName, id, version, status.opId(),
DEPLOYED);
- }
- });
- futures.add(future);
- });
- return allOf(futures).whenComplete((unused, t) -> {
- if (!futures.isEmpty()) {
- LOG.info("Finished static units deploy {}", t, allUnits);
+ LOG.info("Start processing static deployment units {}",
unitsOnDisk);
+ List<CompletableFuture<Boolean>> staticUnits = new ArrayList<>();
+ unitsOnDisk.forEach((id, version) ->
staticUnits.add(deployStaticUnit(id, version)));
+
+ // Both batches have already been started above. process() logs each batch's outcome and swallows its
+ // failure, so a failed unit operation does not fail the combined future.
+ return allOf(List.of(
+ process(
+ recoveredUnitFutures,
+ "Finished recovered units deploy {}",
+ "Units recovery failed {}",
+ recoveredUnits::toString
+ ),
+ process(
+ staticUnits,
+ "Finished static units deploy {}",
+ "Static deployment failed {}",
+ unitsOnDisk::toString
+ )
+ ));
+ });
+ }
+
+ /**
+ * Waits for all given futures (via {@code allOf}) and logs the outcome of the batch.
+ *
+ * <p>Nothing is logged for an empty batch. On failure the error is logged at ERROR level together with the cause,
+ * and the failure is not propagated to the returned future.
+ *
+ * @param futures Futures to wait for.
+ * @param onFinishMessage Log message pattern used when every future completed successfully.
+ * @param onErrorMessage Log message pattern used when at least one future failed.
+ * @param infoSupplier Lazily supplies the batch description substituted into the message; it is only called when
+ * the batch is non-empty.
+ * @return Future that completes with {@code null} once all {@code futures} are done.
+ */
+ private static CompletableFuture<Void> process(
+ List<CompletableFuture<Boolean>> futures,
+ String onFinishMessage,
+ String onErrorMessage,
+ Supplier<String> infoSupplier
+ ) {
+ return allOf(futures).handle((unused, throwable) -> {
+ if (!futures.isEmpty()) {
+ if (throwable == null) {
+ LOG.info(onFinishMessage, infoSupplier.get());
+ } else {
+ LOG.error(onErrorMessage, throwable, infoSupplier.get());
}
- });
+ }
+ // The failure (if any) has been logged above; returning normally keeps it out of the caller's future.
+ return null;
});
}
+ /**
+ * Registers a unit version found on disk as DEPLOYED for this node.
+ *
+ * <p>First tries to create the cluster status with this node as the only target. When that returns {@code null}
+ * (presumably meaning a cluster status already exists; confirm against {@code DeploymentUnitStore}), the existing
+ * cluster status is read and its operation id is used for the node status.
+ *
+ * @param id Unit identifier.
+ * @param version Unit version.
+ * @return Future with the result of {@code createNodeStatus}.
+ */
+ private CompletableFuture<Boolean> deployStaticUnit(String id, Version
version) {
+ return deploymentUnitStore.createClusterStatus(id, version,
Set.of(nodeName))
+ .thenCompose(status -> {
+ if (status == null) {
+ // NOTE(review): unlike checkAndRecoverUnit, this getClusterStatus result is not null-checked;
+ // if it can be null here, it.opId() throws a NullPointerException.
+ return deploymentUnitStore.getClusterStatus(id,
version).thenCompose(it ->
+ deploymentUnitStore.createNodeStatus(nodeName,
id, version, it.opId(), DEPLOYED)
+ );
+ } else {
+ return deploymentUnitStore.createNodeStatus(nodeName,
id, version, status.opId(), DEPLOYED);
+ }
+ });
+ }
+
+ /**
+ * Resets this node's status of a unit to UPLOADING. The caller ({@code syncDeployedUnits}) invokes it only for
+ * DEPLOYED statuses whose version directory is missing on disk; this method itself does not inspect the disk.
+ *
+ * <p>The node status is removed and then re-created in UPLOADING state with the operation id of the current
+ * cluster status. Recovery is skipped if the removal reports {@code false}, if there is no cluster status, or if
+ * the cluster status is neither DEPLOYED nor UPLOADING.
+ *
+ * @param status Node status of the unit to recover.
+ * @return Future with the result of {@code createNodeStatus}, or {@code false} if recovery was skipped.
+ */
+ private CompletableFuture<Boolean> checkAndRecoverUnit(UnitNodeStatus
status) {
+ String id = status.id();
+ Version version = status.version();
+
+ LOG.info("Unit {}:{} is in DEPLOYED state but files are missing,
resetting to UPLOADING", id, version);
+
+ // Delete the old status and recreate with UPLOADING state, since
updateNodeStatus doesn't allow backward transitions
+ // NOTE(review): removal and re-creation are separate store operations. If recovery is skipped after a
+ // successful removal (missing cluster status, or one that is neither DEPLOYED nor UPLOADING), this node is
+ // left with no status for the unit, although the log says "skipping recovery". The same happens if the node
+ // stops between the two steps.
+ return deploymentUnitStore.removeNodeStatus(nodeName, id, version,
status.opId())
+ .thenCompose(removed -> {
+ if (removed) {
+ return deploymentUnitStore.getClusterStatus(id,
version)
+ .thenCompose(clusterStatus -> {
+ if (clusterStatus == null) {
+ LOG.warn("Cluster status not found for
{}:{}, skipping recovery", id, version);
+ return falseCompletedFuture();
+ }
+ if (clusterStatus.status() != DEPLOYED &&
clusterStatus.status() != UPLOADING) {
+ LOG.warn("Cluster status is {} for
{}:{}, skipping recovery", clusterStatus.status(), id, version);
+ return falseCompletedFuture();
+ }
+ return
deploymentUnitStore.createNodeStatus(nodeName, id, version,
clusterStatus.opId(),
+ UPLOADING);
+ });
+ }
+ LOG.warn("Unit status {}:{} was not removed, skipping
recovery", id, version);
+ return falseCompletedFuture();
+ });
+ }
+
private StaticUnits collectStaticUnits() {
StaticUnits units = new StaticUnits();
List<Path> unitFolders = allSubdirectories(deploymentUnitsRoot);
@@ -151,7 +228,6 @@ public class StaticUnitDeployer {
}
private static class StaticUnits {
- @IgniteToStringInclude
private final Map<String, Set<Version>> units = new HashMap<>();
void filter(String id, Version version) {
@@ -175,7 +251,9 @@ public class StaticUnitDeployer {
+ /**
+ * Renders the collected units as a comma-separated list of {@code id:version} entries for log messages.
+ * Entry order follows {@link HashMap} iteration and is therefore unspecified.
+ */
@Override
public String toString() {
- return S.toString(this);
+ return units.entrySet().stream()
+ .flatMap(e -> e.getValue().stream().map(version ->
e.getKey() + ":" + version))
+ .collect(Collectors.joining(", "));
}
}
}
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
index 012728c89f9..9fe8e7fb994 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
@@ -103,6 +103,7 @@ class UnitDownloader {
}
private CompletableFuture<Boolean> downloadUnitContent(String id, Version
version, Collection<String> nodes) {
+ LOG.info("Downloading unit {}:{} from nodes {}", id, version, nodes);
return messaging.downloadUnitContent(id, version, nodes)
.thenCompose(content -> {
DeploymentUnit unit = toDeploymentUnit(content);
diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticUnitDeployerTest.java
similarity index 77%
rename from modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java
rename to modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticUnitDeployerTest.java
index d9bf5bffb28..34d0b1f5627 100644
--- a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java
+++ b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticUnitDeployerTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.deployunit;
import static org.apache.ignite.deployment.version.Version.parseVersion;
import static
org.apache.ignite.internal.deployment.UnitStatusMatchers.deploymentStatusIs;
import static
org.apache.ignite.internal.deployment.UnitStatusMatchers.versionIs;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import static
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.create;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -31,34 +32,29 @@ import static org.hamcrest.Matchers.notNullValue;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
import org.apache.ignite.internal.manager.ComponentContext;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
/**
* Test suite for {@link StaticUnitDeployer}.
*/
-@ExtendWith(WorkDirectoryExtension.class)
-public class StaticDeploymentUnitObserverTest extends BaseIgniteAbstractTest {
- private StaticUnitDeployer observer;
+public class StaticUnitDeployerTest extends IgniteAbstractTest {
+ private StaticUnitDeployer deployer;
private DeploymentUnitStore deploymentUnitStore;
- @WorkDirectory
- private Path workDir;
private StandaloneMetaStorageManager metastore;
@BeforeEach
@@ -67,7 +63,7 @@ public class StaticDeploymentUnitObserverTest extends BaseIgniteAbstractTest {
assertThat(metastore.startAsync(new ComponentContext()),
willCompleteSuccessfully());
deploymentUnitStore = new DeploymentUnitStoreImpl(metastore);
- this.observer = new StaticUnitDeployer(deploymentUnitStore, "node1",
workDir);
+ this.deployer = new StaticUnitDeployer(deploymentUnitStore, "node1",
workDir);
}
@AfterEach
@@ -83,7 +79,7 @@ public class StaticDeploymentUnitObserverTest extends BaseIgniteAbstractTest {
Files.createDirectories(workDir.resolve("unit2").resolve("1.0.0"));
Files.createDirectories(workDir.resolve("unit3").resolve("1.1.0"));
- assertThat(observer.searchAndDeployStaticUnits(),
willCompleteSuccessfully());
+ assertThat(deployer.syncDeployedUnits(), willCompleteSuccessfully());
assertThat(
deploymentUnitStore.getNodeStatuses("node1", "unit1"),
@@ -120,7 +116,7 @@ public class StaticDeploymentUnitObserverTest extends BaseIgniteAbstractTest {
Files.createDirectories(workDir.resolve("unit1").resolve("1.0.1"));
Files.createDirectories(workDir.resolve("unit2").resolve("1.0.0"));
- assertThat(observer.searchAndDeployStaticUnits(),
willCompleteSuccessfully());
+ assertThat(deployer.syncDeployedUnits(), willCompleteSuccessfully());
assertThat(
deploymentUnitStore.getNodeStatuses("node1", "unit1"),
@@ -131,4 +127,24 @@ public class StaticDeploymentUnitObserverTest extends BaseIgniteAbstractTest {
assertThat(deploymentUnitStore.getClusterStatus("unit1",
parseVersion("1.0.0")), willBe(deploymentStatusIs(UPLOADING)));
assertThat(deploymentUnitStore.getNodeStatus("node1", "unit1",
parseVersion("1.0.0")), willBe(deploymentStatusIs(UPLOADING)));
}
+
+ /**
+ * Checks that a unit recorded as DEPLOYED for node1, with a DEPLOYED cluster status but no directory on disk, has
+ * its node status reset to UPLOADING by {@code syncDeployedUnits}, while the cluster status stays DEPLOYED.
+ */
+ @Test
+ void recoverMissingUnits() {
+ String id = "unit1";
+ Version version = parseVersion("1.0.0");
+
+ // Imitate completely deployed unit
+ // No unit directory is created under workDir, so the unit's files are "missing" on node1.
+ CompletableFuture<UnitClusterStatus> clusterStatus =
deploymentUnitStore.createClusterStatus(id, version, Set.of("node1"));
+ assertThat(clusterStatus, willBe(notNullValue()));
+ UUID opId = clusterStatus.join().opId();
+ assertThat(deploymentUnitStore.updateClusterStatus(id, version,
DEPLOYED), willBe(true));
+ assertThat(deploymentUnitStore.createNodeStatus("node1", id, version,
opId, DEPLOYED), willBe(true));
+
+ // Sync units
+ assertThat(deployer.syncDeployedUnits(), willCompleteSuccessfully());
+
+ // The cluster status is untouched; only node1's status is reset to UPLOADING
+ assertThat(deploymentUnitStore.getClusterStatus(id, version),
willBe(deploymentStatusIs(DEPLOYED)));
+ assertThat(deploymentUnitStore.getNodeStatus("node1", id, version),
willBe(deploymentStatusIs(UPLOADING)));
+ }
}