This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 80b4340e0d4 IGNITE-27600 Sync deployment units on start (#7545)
80b4340e0d4 is described below

commit 80b4340e0d4d0a13dc3d71c0bc7aacde9a328f60
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Mar 24 05:59:34 2026 +0300

    IGNITE-27600 Sync deployment units on start (#7545)
---
 .../deployment/ItDeploymentUnitFailoverTest.java   |  55 ++++++++-
 .../internal/deployunit/DeploymentManagerImpl.java |   2 +-
 .../internal/deployunit/StaticUnitDeployer.java    | 136 ++++++++++++++++-----
 .../ignite/internal/deployunit/UnitDownloader.java |   1 +
 ...serverTest.java => StaticUnitDeployerTest.java} |  42 +++++--
 5 files changed, 191 insertions(+), 45 deletions(-)

diff --git 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
index 4097aa50001..27fe8899f6f 100644
--- 
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
+++ 
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
@@ -21,15 +21,18 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.List;
 import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.deployunit.IgniteDeployment;
 import org.apache.ignite.internal.deployunit.NodesToDeploy;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentExtensionConfiguration;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -91,7 +94,55 @@ public class ItDeploymentUnitFailoverTest extends 
ClusterPerTestIntegrationTest
 
         assertThat(unit.undeployAsync(), willCompleteSuccessfully());
 
-        IgniteImpl cmgNode = unwrapIgniteImpl(startNode(nodeIndex));
-        unit.waitUnitClean(cmgNode);
+        IgniteImpl restartedNode = unwrapIgniteImpl(startNode(nodeIndex));
+        unit.waitUnitClean(restartedNode);
+    }
+
+    /**
+     * Checks that a node whose deployment directory was wiped while it was stopped gets the unit files back from
+     * another node after restart.
+     */
+    @Test
+    void testRecoveryAfterCleanWorkDir() {
+        int nodeIndex = 1;
+        IgniteImpl node = igniteImpl(nodeIndex);
+
+        String id = "id1";
+        Version version = Version.parseVersion("1.0.0");
+
+        // Deploy a unit to the cluster including the target node
+        Unit unit = files.deployAndVerify(
+                id,
+                version,
+                false,
+                List.of(files.smallFile()),
+                new NodesToDeploy(List.of(node.name())),
+                igniteImpl(0)
+        );
+
+        // Wait for deployment on target node
+        await().until(() -> node.deployment().nodeStatusAsync(id, version), willBe(DEPLOYED));
+
+        // Verify unit files exist on the target node
+        unit.waitUnitReplica(node);
+
+        // Get the deployment directory path before stopping the node
+        Path deploymentDir = getDeploymentDirectory(node);
+        // Guard against a vacuous pass: if the resolved path were wrong, nothing would be deleted below
+        // and the final replica check would succeed without any recovery taking place.
+        assertThat("Deployment directory must exist: " + deploymentDir, Files.exists(deploymentDir));
+
+        // Stop the target node
+        stopNode(nodeIndex);
+
+        // Clear the deployment directory to simulate a clean restart
+        deleteIfExists(deploymentDir);
+        assertThat("Deployment directory must be deleted: " + deploymentDir, Files.notExists(deploymentDir));
+
+        // Start the node again
+        IgniteImpl restartedNode = unwrapIgniteImpl(startNode(nodeIndex));
+
+        // Verify the unit is recovered from another node
+        unit.waitUnitReplica(restartedNode);
+    }
+
+    /**
+     * Resolves the deployment units directory of the given node: the {@code deployment().location()} value of the
+     * node's {@link DeploymentExtensionConfiguration}, resolved against the node's work directory.
+     */
+    private static Path getDeploymentDirectory(IgniteImpl ignite) {
+        String deploymentFolder = ignite.nodeConfiguration()
+                
.getConfiguration(DeploymentExtensionConfiguration.KEY).deployment()
+                .location().value();
+
+        return ignite.workDir().resolve(deploymentFolder);
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 6dbe43347ee..98ee9322314 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -433,7 +433,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         messaging.subscribe();
         failover.registerTopologyChangeCallback(nodeStatusCallback);
         undeployer.start(UNDEPLOYER_DELAY.getSeconds(), TimeUnit.SECONDS);
-        return new StaticUnitDeployer(deploymentUnitStore, nodeName, 
deploymentUnitFolder).searchAndDeployStaticUnits();
+        return new StaticUnitDeployer(deploymentUnitStore, nodeName, 
deploymentUnitFolder).syncDeployedUnits();
     }
 
     @Override
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java
index 387796baaff..9b96b4f1cd4 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticUnitDeployer.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.deployunit;
 import static java.util.Collections.emptyList;
 import static org.apache.ignite.deployment.version.Version.parseVersion;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
@@ -36,14 +38,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.ignite.deployment.version.Version;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.tostring.IgniteToStringInclude;
-import org.apache.ignite.internal.tostring.S;
 
 /**
  * Observes a predefined directory with statically provisioned deployment 
units and registers their presence in the deployment store.
@@ -75,43 +77,118 @@ public class StaticUnitDeployer {
     }
 
     /**
-     * Scans the filesystem for statically deployed units and registers their 
cluster and node statuses if they are not yet present in the
-     * store.
+     * Performs startup deployment unit synchronization.
      *
-     * <p>Already registered unit versions for this node are skipped. New ones 
are registered as DEPLOYED.
+     * <ol>
+     *     <li>Scans the filesystem for statically deployed units and 
registers them if not yet present in the store.</li>
+     *     <li>Checks for units marked as DEPLOYED in metastorage but missing 
on disk, and resets them to UPLOADING to trigger on-demand
+     *     deployment from another node.</li>
+     * </ol>
      */
-    public CompletableFuture<Void> searchAndDeployStaticUnits() {
-        StaticUnits allUnits = collectStaticUnits();
+    CompletableFuture<Void> syncDeployedUnits() {
+        StaticUnits unitsOnDisk = collectStaticUnits();
 
         return 
deploymentUnitStore.getNodeStatuses(nodeName).thenCompose(statuses -> {
-            List<CompletableFuture<?>> futures = new ArrayList<>();
+            List<String> recoveredUnits = new ArrayList<>();
+            List<CompletableFuture<Boolean>> recoveredUnitFutures = new 
ArrayList<>();
 
+            // Process existing statuses: filter out from static units and 
recover missing units
             for (UnitNodeStatus status : statuses) {
-                allUnits.filter(status.id(), status.version());
+                String id = status.id();
+                Version version = status.version();
+
+                unitsOnDisk.filter(id, version);
+
+                if (status.status() == DEPLOYED) {
+                    Path unitPath = 
deploymentUnitsRoot.resolve(id).resolve(version.render());
+                    if (Files.notExists(unitPath)) {
+                        recoveredUnits.add(id + ":" + version);
+                        recoveredUnitFutures.add(checkAndRecoverUnit(status));
+                    }
+                }
             }
-            LOG.info("Start processing static deployment units {}", allUnits);
-            allUnits.forEach((id, version) -> {
-                CompletableFuture<Boolean> future = 
deploymentUnitStore.createClusterStatus(id, version, Set.of(nodeName))
-                        .thenCompose(status -> {
-                            if (status == null) {
-                                return 
deploymentUnitStore.getClusterStatus(id, version).thenCompose(it ->
-                                        
deploymentUnitStore.createNodeStatus(nodeName, id, version, it.opId(), DEPLOYED)
-                                );
-                            } else {
-                                return 
deploymentUnitStore.createNodeStatus(nodeName, id, version, status.opId(), 
DEPLOYED);
-                            }
-                        });
-                futures.add(future);
-            });
 
-            return allOf(futures).whenComplete((unused, t) -> {
-                if (!futures.isEmpty()) {
-                    LOG.info("Finished static units deploy {}", t, allUnits);
+            LOG.info("Start processing static deployment units {}", 
unitsOnDisk);
+            List<CompletableFuture<Boolean>> staticUnits = new ArrayList<>();
+            unitsOnDisk.forEach((id, version) -> 
staticUnits.add(deployStaticUnit(id, version)));
+
+            return allOf(List.of(
+                    process(
+                            recoveredUnitFutures,
+                            "Finished recovered units deploy {}",
+                            "Units recovery failed {}",
+                            recoveredUnits::toString
+                    ),
+                    process(
+                            staticUnits,
+                            "Finished static units deploy {}",
+                            "Static deployment failed {}",
+                            unitsOnDisk::toString
+                    )
+            ));
+        });
+    }
+
+    /**
+     * Waits for all given futures and logs the outcome once.
+     *
+     * <p>Nothing is logged when {@code futures} is empty. On failure the error is logged together with the throwable,
+     * but the returned future still completes with {@code null}, so the failure is not propagated to the caller.
+     * NOTE(review): this makes startup synchronization best-effort; confirm that swallowing the error is intended.
+     *
+     * @param futures Futures to wait for.
+     * @param onFinishMessage Log message template used on success; receives the supplied info as its only argument.
+     * @param onErrorMessage Log message template used on failure; receives the supplied info as its only argument.
+     * @param infoSupplier Supplies a description of the processed units, evaluated only when a message is logged.
+     * @return Future completing with {@code null} even if one of the given futures failed.
+     */
+    private static CompletableFuture<Void> process(
+            List<CompletableFuture<Boolean>> futures,
+            String onFinishMessage,
+            String onErrorMessage,
+            Supplier<String> infoSupplier
+    ) {
+        return allOf(futures).handle((unused, throwable) -> {
+            if (!futures.isEmpty()) {
+                if (throwable == null) {
+                    LOG.info(onFinishMessage, infoSupplier.get());
+                } else {
+                    LOG.error(onErrorMessage, throwable, infoSupplier.get());
                 }
-            });
+            }
+            return null;
         });
     }
 
+    /**
+     * Registers a unit version found on disk as DEPLOYED on this node.
+     *
+     * <p>First attempts to create the cluster status for this node. When {@code createClusterStatus} yields
+     * {@code null} (presumably because the cluster status already exists; TODO confirm), the operation id is taken
+     * from the existing cluster status instead.
+     *
+     * @param id Unit identifier.
+     * @param version Unit version.
+     * @return Future with the result of creating the node status.
+     */
+    private CompletableFuture<Boolean> deployStaticUnit(String id, Version version) {
+        return deploymentUnitStore.createClusterStatus(id, version, Set.of(nodeName)).thenCompose(created -> {
+            if (created != null) {
+                return deploymentUnitStore.createNodeStatus(nodeName, id, version, created.opId(), DEPLOYED);
+            }
+
+            return deploymentUnitStore.getClusterStatus(id, version)
+                    .thenCompose(existing -> deploymentUnitStore.createNodeStatus(nodeName, id, version, existing.opId(), DEPLOYED));
+        });
+    }
+
+    /**
+     * Resets this node's status of a unit whose files are missing on disk from DEPLOYED to UPLOADING, so that the unit
+     * can be deployed on demand from another node.
+     *
+     * <p>The cluster status is validated first. Recovery proceeds only when it exists and is DEPLOYED or UPLOADING;
+     * otherwise the node status is left untouched. The node status is then removed and recreated with UPLOADING,
+     * because updateNodeStatus doesn't allow backward transitions.
+     *
+     * @param status Current node status of the unit (expected to be DEPLOYED).
+     * @return Future with the result of creating the UPLOADING node status, or {@code false} if recovery was skipped.
+     */
+    private CompletableFuture<Boolean> checkAndRecoverUnit(UnitNodeStatus status) {
+        String id = status.id();
+        Version version = status.version();
+
+        // Validate the cluster status before touching the node status, so that a skipped recovery does not leave
+        // this node without any status record for the unit.
+        return deploymentUnitStore.getClusterStatus(id, version).thenCompose(clusterStatus -> {
+            if (clusterStatus == null) {
+                LOG.warn("Cluster status not found for {}:{}, skipping recovery", id, version);
+                return falseCompletedFuture();
+            }
+            if (clusterStatus.status() != DEPLOYED && clusterStatus.status() != UPLOADING) {
+                LOG.warn("Cluster status is {} for {}:{}, skipping recovery", clusterStatus.status(), id, version);
+                return falseCompletedFuture();
+            }
+
+            LOG.info("Unit {}:{} is in DEPLOYED state but files are missing, resetting to UPLOADING", id, version);
+
+            // Delete the old status and recreate with UPLOADING state, since updateNodeStatus doesn't allow backward transitions
+            return deploymentUnitStore.removeNodeStatus(nodeName, id, version, status.opId())
+                    .thenCompose(removed -> {
+                        if (!removed) {
+                            LOG.warn("Unit status {}:{} was not removed, skipping recovery", id, version);
+                            return falseCompletedFuture();
+                        }
+                        return deploymentUnitStore.createNodeStatus(nodeName, id, version, clusterStatus.opId(), UPLOADING);
+                    });
+        });
+    }
+
     private StaticUnits collectStaticUnits() {
         StaticUnits units = new StaticUnits();
         List<Path> unitFolders = allSubdirectories(deploymentUnitsRoot);
@@ -151,7 +228,6 @@ public class StaticUnitDeployer {
     }
 
     private static class StaticUnits {
-        @IgniteToStringInclude
         private final Map<String, Set<Version>> units = new HashMap<>();
 
         void filter(String id, Version version) {
@@ -175,7 +251,9 @@ public class StaticUnitDeployer {
 
         @Override
         public String toString() {
-            return S.toString(this);
+            return units.entrySet().stream()
+                    .flatMap(e -> e.getValue().stream().map(version -> 
e.getKey() + ":" + version))
+                    .collect(Collectors.joining(", "));
         }
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
index 012728c89f9..9fe8e7fb994 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
@@ -103,6 +103,7 @@ class UnitDownloader {
     }
 
     private CompletableFuture<Boolean> downloadUnitContent(String id, Version 
version, Collection<String> nodes) {
+        LOG.info("Downloading unit {}:{} from nodes {}", id, version, nodes);
         return messaging.downloadUnitContent(id, version, nodes)
                 .thenCompose(content -> {
                     DeploymentUnit unit = toDeploymentUnit(content);
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticUnitDeployerTest.java
similarity index 77%
rename from 
modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java
rename to 
modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticUnitDeployerTest.java
index d9bf5bffb28..34d0b1f5627 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticUnitDeployerTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.deployunit;
 import static org.apache.ignite.deployment.version.Version.parseVersion;
 import static 
org.apache.ignite.internal.deployment.UnitStatusMatchers.deploymentStatusIs;
 import static 
org.apache.ignite.internal.deployment.UnitStatusMatchers.versionIs;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.create;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -31,34 +32,29 @@ import static org.hamcrest.Matchers.notNullValue;
 
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 import org.apache.ignite.internal.manager.ComponentContext;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Test suite for {@link StaticUnitDeployer}.
  */
-@ExtendWith(WorkDirectoryExtension.class)
-public class StaticDeploymentUnitObserverTest extends BaseIgniteAbstractTest {
-    private StaticUnitDeployer observer;
+public class StaticUnitDeployerTest extends IgniteAbstractTest {
+    private StaticUnitDeployer deployer;
 
     private DeploymentUnitStore deploymentUnitStore;
 
-    @WorkDirectory
-    private Path workDir;
     private StandaloneMetaStorageManager metastore;
 
     @BeforeEach
@@ -67,7 +63,7 @@ public class StaticDeploymentUnitObserverTest extends 
BaseIgniteAbstractTest {
         assertThat(metastore.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
         deploymentUnitStore = new DeploymentUnitStoreImpl(metastore);
 
-        this.observer = new StaticUnitDeployer(deploymentUnitStore, "node1", 
workDir);
+        this.deployer = new StaticUnitDeployer(deploymentUnitStore, "node1", 
workDir);
     }
 
     @AfterEach
@@ -83,7 +79,7 @@ public class StaticDeploymentUnitObserverTest extends 
BaseIgniteAbstractTest {
         Files.createDirectories(workDir.resolve("unit2").resolve("1.0.0"));
         Files.createDirectories(workDir.resolve("unit3").resolve("1.1.0"));
 
-        assertThat(observer.searchAndDeployStaticUnits(), 
willCompleteSuccessfully());
+        assertThat(deployer.syncDeployedUnits(), willCompleteSuccessfully());
 
         assertThat(
                 deploymentUnitStore.getNodeStatuses("node1", "unit1"),
@@ -120,7 +116,7 @@ public class StaticDeploymentUnitObserverTest extends 
BaseIgniteAbstractTest {
         Files.createDirectories(workDir.resolve("unit1").resolve("1.0.1"));
         Files.createDirectories(workDir.resolve("unit2").resolve("1.0.0"));
 
-        assertThat(observer.searchAndDeployStaticUnits(), 
willCompleteSuccessfully());
+        assertThat(deployer.syncDeployedUnits(), willCompleteSuccessfully());
 
         assertThat(
                 deploymentUnitStore.getNodeStatuses("node1", "unit1"),
@@ -131,4 +127,24 @@ public class StaticDeploymentUnitObserverTest extends 
BaseIgniteAbstractTest {
         assertThat(deploymentUnitStore.getClusterStatus("unit1", 
parseVersion("1.0.0")), willBe(deploymentStatusIs(UPLOADING)));
         assertThat(deploymentUnitStore.getNodeStatus("node1", "unit1", 
parseVersion("1.0.0")), willBe(deploymentStatusIs(UPLOADING)));
     }
+
+    /**
+     * Checks that a unit recorded as DEPLOYED for this node, but absent from the node's deployment directory, is reset
+     * to UPLOADING on this node while its cluster status stays DEPLOYED.
+     */
+    @Test
+    void recoverMissingUnits() {
+        String id = "unit1";
+        Version version = parseVersion("1.0.0");
+
+        // Imitate completely deployed unit
+        // (no unit directory is created under workDir, so the unit's files are missing on disk)
+        CompletableFuture<UnitClusterStatus> clusterStatus = 
deploymentUnitStore.createClusterStatus(id, version, Set.of("node1"));
+        assertThat(clusterStatus, willBe(notNullValue()));
+        UUID opId = clusterStatus.join().opId();
+        assertThat(deploymentUnitStore.updateClusterStatus(id, version, 
DEPLOYED), willBe(true));
+        assertThat(deploymentUnitStore.createNodeStatus("node1", id, version, 
opId, DEPLOYED), willBe(true));
+
+        // Sync units
+        assertThat(deployer.syncDeployedUnits(), willCompleteSuccessfully());
+
+        // Verify that the node status has changed
+        assertThat(deploymentUnitStore.getClusterStatus(id, version), 
willBe(deploymentStatusIs(DEPLOYED)));
+        assertThat(deploymentUnitStore.getNodeStatus("node1", id, version), 
willBe(deploymentStatusIs(UPLOADING)));
+    }
 }

Reply via email to