This is an automated email from the ASF dual-hosted git repository. mpochatkin pushed a commit to branch IGNITE-25508 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit c4ca32683ddec180d7c3776b5404429db5a22150 Author: Mikhail Pochatkin <[email protected]> AuthorDate: Thu Sep 4 20:28:28 2025 +0300 IGNITE-25508 Static unit deployment --- .../ignite/internal/deployment/DeployFiles.java | 10 ++ .../deployment/ItStaticDeploymentTest.java | 69 ++++++++ .../internal/deployunit/DeploymentManagerImpl.java | 6 +- .../deployunit/StaticDeploymentUnitObserver.java | 136 +++++++++++++++ .../deployunit/metastore/DeploymentUnitStore.java | 7 +- .../metastore/DeploymentUnitStoreImpl.java | 4 +- .../StaticDeploymentUnitObserverTest.java | 77 +++++++++ .../deployunit/StubDeploymentUnitStore.java | 186 +++++++++++++++++++++ .../java/org/apache/ignite/internal/Cluster.java | 5 + 9 files changed, 493 insertions(+), 7 deletions(-) diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java index 13b5c6a8817..b10b78db2f9 100644 --- a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java +++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java @@ -162,6 +162,16 @@ class DeployFiles { return builder.build(); } + public static void staticDeploy(String id, Version version, DeployFile file, Path workDir) throws IOException { + Path deploymentRootFolder = workDir.resolve("deployment"); + Path unitFile = deploymentRootFolder.resolve(id).resolve(version.render()).resolve(file.file().getFileName().toString()); + Files.createDirectories(unitFile.getParent()); + Files.copy( + file.file(), + unitFile + ); + } + private static DeploymentUnit fromPaths(List<Path> paths) { Objects.requireNonNull(paths); Map<String, InputStream> map = new HashMap<>(); diff --git a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItStaticDeploymentTest.java b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItStaticDeploymentTest.java new file mode 100644 index 00000000000..f79a2bbdb4f --- /dev/null +++ b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItStaticDeploymentTest.java @@ -0,0 +1,69 @@ +package org.apache.ignite.internal.deployment; + +import static org.apache.ignite.deployment.version.Version.parseVersion; +import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.Ignite; +import org.apache.ignite.deployment.version.Version; +import org.apache.ignite.internal.Cluster; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.deployunit.DeploymentStatus; +import org.apache.ignite.internal.deployunit.IgniteDeployment; +import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher; +import org.hamcrest.MatcherAssert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class ItStaticDeploymentTest extends ClusterPerClassIntegrationTest { + private DeployFiles files; + + @BeforeEach + public void generateDummy() { + files = new DeployFiles(WORK_DIR); + } + + @Override + protected boolean needInitializeCluster() { + return false; + } + + @Test + public void testStaticDeploy() throws IOException { + DeployFile smallFile = files.smallFile(); + DeployFile mediumFile = files.mediumFile(); + + Path node0WorkDir = CLUSTER.nodeWorkDir(0); + Path node1WorkDir = CLUSTER.nodeWorkDir(1); + Path node2WorkDir = CLUSTER.nodeWorkDir(2); + + DeployFiles.staticDeploy("unit1", parseVersion("1.0.0"), smallFile, node0WorkDir); + + DeployFiles.staticDeploy("unit2", parseVersion("1.0.0"), smallFile, node0WorkDir); + DeployFiles.staticDeploy("unit2", parseVersion("1.0.0"), smallFile, node1WorkDir); + + DeployFiles.staticDeploy("unit3", parseVersion("1.0.0"), smallFile, node0WorkDir); + DeployFiles.staticDeploy("unit3", parseVersion("1.0.0"), smallFile, node1WorkDir); + DeployFiles.staticDeploy("unit3", parseVersion("1.0.0"), smallFile, node2WorkDir); + + DeployFiles.staticDeploy("unit1", parseVersion("1.1.0"), smallFile, node0WorkDir); + DeployFiles.staticDeploy("unit1", parseVersion("1.1.0"), smallFile, node1WorkDir); + + DeployFiles.staticDeploy("unit3", parseVersion("1.0.1"), smallFile, node2WorkDir); + + CLUSTER.startAndInit(3); + + IgniteDeployment node0 = unwrapIgniteImpl(CLUSTER.node(0)).deployment(); + IgniteDeployment node1 = unwrapIgniteImpl(CLUSTER.node(1)).deployment(); + IgniteDeployment node2 = unwrapIgniteImpl(CLUSTER.node(2)).deployment(); + + CompletableFuture<DeploymentStatus> unit1 = node0.nodeStatusAsync("unit1", parseVersion("1.0.0")); + + assertThat(unit1, CompletableFutureMatcher.willBe(DeploymentStatus.DEPLOYED)); + } +} diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java index 7aefafd2dc7..647cbefb086 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java @@ -411,14 +411,14 @@ public class DeploymentManagerImpl implements IgniteDeployment { @Override public CompletableFuture<Void> startAsync(ComponentContext componentContext) { - deployer.initUnitsFolder(workDir.resolve(configuration.location().value())); + Path deploymentUnitFolder = workDir.resolve(configuration.location().value()); + deployer.initUnitsFolder(deploymentUnitFolder); deploymentUnitStore.registerNodeStatusListener(nodeStatusWatchListener); deploymentUnitStore.registerClusterStatusListener(clusterStatusWatchListener); messaging.subscribe(); failover.registerTopologyChangeCallback(nodeStatusCallback, clusterEventCallback); undeployer.start(UNDEPLOYER_DELAY.getSeconds(), TimeUnit.SECONDS); - - return nullCompletedFuture(); + return new StaticDeploymentUnitObserver(deploymentUnitStore, nodeName, deploymentUnitFolder).observeAndRegisterStaticUnits(); } @Override diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserver.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserver.java new file mode 100644 index 00000000000..a12efeb62d4 --- /dev/null +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserver.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import static java.util.UUID.randomUUID; +import static java.util.stream.Collectors.toMap; +import static org.apache.ignite.deployment.version.Version.parseVersion; +import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED; +import static org.apache.ignite.internal.util.CompletableFutures.allOf; + +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.ignite.deployment.version.Version; +import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore; +import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; + +public class StaticDeploymentUnitObserver { + private static final IgniteLogger LOG = Loggers.forClass(StaticDeploymentUnitObserver.class); + + private final DeploymentUnitStore deploymentUnitStore; + + private final String nodeName; + + private final Path deploymentUnitsRoot; + + public StaticDeploymentUnitObserver( + DeploymentUnitStore deploymentUnitStore, + String nodeName, + Path deploymentUnitsRoot + ) { + this.deploymentUnitStore = deploymentUnitStore; + this.nodeName = nodeName; + this.deploymentUnitsRoot = deploymentUnitsRoot; + } + + public CompletableFuture<Void> observeAndRegisterStaticUnits() { + Map<String, List<Version>> staticUnits = collectStaticUnits(); + + + + return deploymentUnitStore.getNodeStatuses(nodeName).thenCompose(statuses -> { + List<CompletableFuture<?>> futures = new ArrayList<>(); + + for (UnitNodeStatus status : statuses) { + staticUnits.get(status.id()).remove(status.version()); + if (staticUnits.get(status.id()).isEmpty()) { + staticUnits.remove(status.id()); + } + } + + staticUnits.forEach((id, versions) -> { + versions.forEach(version -> { + LOG.info("Start processing unit {}:{}", id, version); + CompletableFuture<Boolean> future = deploymentUnitStore.createClusterStatus(id, version, Set.of(nodeName)) + .thenCompose(status -> { + if (status == null) { + return deploymentUnitStore.getClusterStatus(id, version).thenCompose(it -> + deploymentUnitStore.createNodeStatus(nodeName, id, version, it.opId(), DEPLOYED) + ); + } else { + return deploymentUnitStore.createNodeStatus(nodeName, id, version, status.opId(), DEPLOYED); + } + }) + .whenComplete((result, t) -> LOG.info("Finished static status creating {}:{} with result {}", t, id, version, result)); + futures.add(future); + }); + }); + + return allOf(futures); + }); + + + } + + private Map<String, List<Version>> collectStaticUnits() { + Map<String, List<Version>> units = new HashMap<>(); + List<Path> unitFolders = allSubdirectories(deploymentUnitsRoot); + for (Path unitFolder : unitFolders) { + List<Path> versions = allSubdirectories(unitFolder); + units.put( + unitFolder.getFileName().toString(), + versions.stream() + .map(versionFolder -> parseVersion(versionFolder.getFileName().toString())) + .collect(Collectors.toList()) + ); + } + return units; + } + + private static List<Path> allSubdirectories(Path folder) { + List<Path> subfolders = new ArrayList<>(); + try { + Files.walkFileTree(folder, Set.of(), 2, new SimpleFileVisitor<>() { + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + if (!dir.equals(folder)) { + subfolders.add(dir); + } + return super.preVisitDirectory(dir, attrs); + } + }); + } catch (IOException e) { + + } + return subfolders; + } +} diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java index 8b7ba33561b..8b2760dcbc2 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java @@ -25,6 +25,7 @@ import org.apache.ignite.deployment.version.Version; import org.apache.ignite.internal.deployunit.DeploymentStatus; import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus; import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; +import org.jetbrains.annotations.Nullable; /** * Metastore for deployment units. @@ -114,10 +115,10 @@ public interface DeploymentUnitStore { * * @param id Deployment unit identifier. * @param version Deployment unit version. - * @return Future with {@code true} result if status created successfully - * or with {@code false} if status with provided {@param id} and {@param version} already existed. + * @return Future with cluster status object if status created successfully + * or with {@code null} if status with provided {@param id} and {@param version} already existed. */ - CompletableFuture<UnitClusterStatus> createClusterStatus(String id, Version version, Set<String> nodesToDeploy); + CompletableFuture<@Nullable UnitClusterStatus> createClusterStatus(String id, Version version, Set<String> nodesToDeploy); /** * Create new node status for deployment unit. diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java index 86785358b85..64b1c765252 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java @@ -26,6 +26,7 @@ import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; +import static org.apache.ignite.internal.metastorage.dsl.Statements.iif; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import java.util.List; @@ -45,6 +46,7 @@ import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus; import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.jetbrains.annotations.Nullable; /** * Implementation of {@link DeploymentUnitStore} based on {@link MetaStorageManager}. @@ -141,7 +143,7 @@ public class DeploymentUnitStoreImpl implements DeploymentUnitStore { } @Override - public CompletableFuture<UnitClusterStatus> createClusterStatus(String id, Version version, Set<String> nodes) { + public CompletableFuture<@Nullable UnitClusterStatus> createClusterStatus(String id, Version version, Set<String> nodes) { ByteArray key = ClusterStatusKey.builder().id(id).version(version).build().toByteArray(); UUID operationId = UUID.randomUUID(); UnitClusterStatus clusterStatus = new UnitClusterStatus(id, version, UPLOADING, operationId, nodes); diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java new file mode 100644 index 00000000000..4b7349a4624 --- /dev/null +++ b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StaticDeploymentUnitObserverTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import static org.apache.ignite.deployment.version.Version.parseVersion; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.ignite.deployment.version.Version; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(WorkDirectoryExtension.class) +public class StaticDeploymentUnitObserverTest { + private StaticDeploymentUnitObserver observer; + + private StubDeploymentUnitStore deploymentUnitStore; + + @WorkDirectory + private Path workDir; + + @BeforeEach + public void setup() { + deploymentUnitStore = new StubDeploymentUnitStore(); + this.observer = new StaticDeploymentUnitObserver(deploymentUnitStore, "node1", workDir); + } + + @Test + public void test() throws IOException { + Files.createDirectories(workDir.resolve("unit1").resolve("1.0.0")); + Files.createDirectories(workDir.resolve("unit1").resolve("1.0.1")); + Files.createDirectories(workDir.resolve("unit1").resolve("1.1.1")); + Files.createDirectories(workDir.resolve("unit2").resolve("1.0.0")); + Files.createDirectories(workDir.resolve("unit3").resolve("1.1.0")); + + assertThat(observer.observeAndRegisterStaticUnits(), willCompleteSuccessfully()); + + CompletableFuture<List<Version>> unit1Statuses = deploymentUnitStore.getNodeStatuses("node1", "unit1") + .thenApply(statuses -> statuses.stream().map(UnitStatus::version).collect(Collectors.toList())); + assertThat(unit1Statuses, willBe(containsInAnyOrder(parseVersion("1.0.0"), parseVersion("1.0.1"), parseVersion("1.1.1")))); + + CompletableFuture<List<Version>> unit2Statuses = deploymentUnitStore.getNodeStatuses("node1", "unit2") + .thenApply(statuses -> statuses.stream().map(UnitStatus::version).collect(Collectors.toList())); + + assertThat(unit2Statuses, willBe(containsInAnyOrder(parseVersion("1.0.0")))); + + CompletableFuture<List<Version>> unit3Statuses = deploymentUnitStore.getNodeStatuses("node1", "unit3") + .thenApply(statuses -> statuses.stream().map(UnitStatus::version).collect(Collectors.toList())); + assertThat(unit3Statuses, willBe(containsInAnyOrder(parseVersion("1.1.0")))); + } +} diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StubDeploymentUnitStore.java b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StubDeploymentUnitStore.java new file mode 100644 index 00000000000..dd4d83a5355 --- /dev/null +++ b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/StubDeploymentUnitStore.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.deployunit; + +import static java.util.Collections.emptyList; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.ignite.deployment.version.Version; +import org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener; +import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore; +import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener; +import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus; +import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; + +public class StubDeploymentUnitStore implements DeploymentUnitStore { + private final Map<String, Map<Version, UnitClusterStatus>> clusterStore = new HashMap<>(); + + private final Map<String, Map<String, Map<Version, UnitNodeStatus>>> nodeStore = new HashMap<>(); + + @Override + public void registerNodeStatusListener(NodeStatusWatchListener listener) { + + } + + @Override + public void unregisterNodeStatusListener(NodeStatusWatchListener listener) { + + } + + @Override + public void registerClusterStatusListener(ClusterStatusWatchListener listener) { + + } + + @Override + public void unregisterClusterStatusListener(ClusterStatusWatchListener listener) { + + } + + @Override + public CompletableFuture<List<UnitClusterStatus>> getClusterStatuses() { + return null; + } + + @Override + public CompletableFuture<List<UnitClusterStatus>> getClusterStatuses(String id) { + Map<Version, UnitClusterStatus> map = clusterStore.get(id); + List<UnitClusterStatus> result = map == null ? emptyList() : new ArrayList<>(map.values()); + return completedFuture(result); + } + + @Override + public CompletableFuture<UnitClusterStatus> getClusterStatus(String id, Version version) { + Map<Version, UnitClusterStatus> map = clusterStore.get(id); + UnitClusterStatus result = map == null ? null : map.get(version); + return completedFuture(result); + } + + @Override + public CompletableFuture<List<UnitNodeStatus>> getNodeStatuses(String nodeId) { + Map<String, Map<Version, UnitNodeStatus>> nodeContent = nodeStore.get(nodeId); + if (nodeContent == null) { + return completedFuture(emptyList()); + } + List<UnitNodeStatus> result = new ArrayList<>(); + + for (Map<Version, UnitNodeStatus> value : nodeContent.values()) { + result.addAll(value.values()); + } + + return completedFuture(result); + } + + @Override + public CompletableFuture<List<UnitNodeStatus>> getNodeStatuses(String nodeId, String unitId) { + Map<String, Map<Version, UnitNodeStatus>> nodeContent = nodeStore.get(nodeId); + if (nodeContent == null) { + return completedFuture(emptyList()); + } + + Map<Version, UnitNodeStatus> map = nodeContent.get(unitId); + if (map == null) { + return completedFuture(emptyList()); + } + + return completedFuture(new ArrayList<>(map.values())); + + } + + @Override + public CompletableFuture<UnitNodeStatus> getNodeStatus(String nodeId, String id, Version version) { + Map<String, Map<Version, UnitNodeStatus>> nodeContent = nodeStore.get(nodeId); + if (nodeContent == null) { + return completedFuture(null); + } + + Map<Version, UnitNodeStatus> map = nodeContent.get(id); + if (map == null) { + return completedFuture(null); + } + + return completedFuture(map.get(version)); + } + + @Override + public CompletableFuture<UnitClusterStatus> createClusterStatus(String id, Version version, Set<String> nodesToDeploy) { + UnitClusterStatus status = new UnitClusterStatus(id, version, DeploymentStatus.UPLOADING, UUID.randomUUID(), nodesToDeploy); + + Map<Version, UnitClusterStatus> map = clusterStore.computeIfAbsent(id, k -> new HashMap<>()); + if (map.containsKey(version)) { + return completedFuture(null); + } else { + map.put(version, status); + return completedFuture(status); + } + } + + @Override + public CompletableFuture<Boolean> createNodeStatus(String nodeId, String id, Version version, UUID opId, DeploymentStatus status) { + UnitNodeStatus nodeStatus = new UnitNodeStatus(id, version, DeploymentStatus.UPLOADING, UUID.randomUUID(), nodeId); + + Map<String, Map<Version, UnitNodeStatus>> nodesMap = nodeStore.computeIfAbsent(nodeId, k -> new HashMap<>()); + Map<Version, UnitNodeStatus> map = nodesMap.computeIfAbsent(id, k -> new HashMap<>()); + if (map.containsKey(version)) { + return completedFuture(false); + } else { + map.put(version, nodeStatus); + return completedFuture(true); + } + } + + @Override + public CompletableFuture<Boolean> updateClusterStatus(String id, Version version, DeploymentStatus status) { + return null; + } + + @Override + public CompletableFuture<Boolean> updateNodeStatus(String nodeId, String id, Version version, DeploymentStatus status) { + return null; + } + + @Override + public CompletableFuture<List<String>> getAllNodes(String id, Version version) { + return null; + } + + @Override + public CompletableFuture<List<UnitNodeStatus>> getAllNodeStatuses(String id, Version version) { + return null; + } + + @Override + public CompletableFuture<Boolean> removeClusterStatus(String id, Version version, UUID opId) { + return null; + } + + @Override + public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, Version version, UUID opId) { + return null; + } +} diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java index 92b58a31fc2..18242d16d66 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java @@ -38,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -360,6 +361,10 @@ public class Cluster { return clusterConfiguration.basePort() + nodeIndex; } + public Path nodeWorkDir(int nodeIndex) { + return clusterConfiguration.workDir().resolve(clusterConfiguration.clusterName()).resolve(nodeName(nodeIndex)); + } + /** * Returns HTTP port by index. */
