This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 55f33041f8 IGNITE-19926 Close CompletedFileUploads when unit is deployed (#2334) 55f33041f8 is described below commit 55f33041f88422b1f46d4ba999b7bf7d498ae177 Author: Ivan Gagarkin <gagarkin....@gmail.com> AuthorDate: Tue Jul 25 15:10:27 2023 +0400 IGNITE-19926 Close CompletedFileUploads when unit is deployed (#2334) --- .../internal/deployunit/DefaultNodeCallback.java | 16 ++++++++- .../internal/deployunit/DeploymentManagerImpl.java | 41 ++++++++++++---------- .../ignite/internal/deployunit/DeploymentUnit.java | 19 ++++++++-- .../internal/deployunit/FileDeployerService.java | 12 +++---- .../internal/deployunit/IgniteDeployment.java | 4 +-- .../ignite/internal/deployunit/UnitContent.java | 24 +++++++++++-- .../ignite/deployment/FileDeployerServiceTest.java | 41 +++++++++++++++------- .../compute/util/DummyIgniteDeployment.java | 2 +- .../deployment/CompletedFileUploadSubscriber.java | 35 +++++++++++------- .../deployment/DeploymentManagementController.java | 23 +++++++++--- .../internal/compute/ItComputeTestStandalone.java | 2 +- .../ignite/internal/deployment/DeployFiles.java | 14 ++++++-- 12 files changed, 166 insertions(+), 67 deletions(-) diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java index d657227a5e..df7cf0c115 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java @@ -32,11 +32,15 @@ import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore; import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback; import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus; import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; /** * Default implementation of {@link NodeEventCallback}. */ public class DefaultNodeCallback extends NodeEventCallback { + private static final IgniteLogger LOG = Loggers.forClass(DefaultNodeCallback.class); + private final DeploymentUnitStore deploymentUnitStore; private final DeployMessagingService messaging; @@ -83,7 +87,17 @@ public class DefaultNodeCallback extends NodeEventCallback { public void onUploading(String id, Version version, List<UnitNodeStatus> holders) { tracker.track(id, version, () -> messaging.downloadUnitContent(id, version, new ArrayList<>(getDeployedNodeIds(holders))) - .thenCompose(content -> deployer.deploy(id, version, content)) + .thenCompose(content -> { + org.apache.ignite.internal.deployunit.DeploymentUnit unit = UnitContent.toDeploymentUnit(content); + return deployer.deploy(id, version, unit) + .whenComplete((deployed, err) -> { + try { + unit.close(); + } catch (Exception e) { + LOG.error("Failed to close deployment unit", e); + } + }); + }) .thenApply(deployed -> { if (deployed) { return deploymentUnitStore.updateNodeStatus(nodeName, id, version, DEPLOYED); diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java index 2176715195..bf3a7ee080 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java @@ -24,6 +24,7 @@ import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED; import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE; import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING; import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING; +import static org.apache.ignite.internal.deployunit.UnitContent.toDeploymentUnit; import java.nio.file.Path; import java.time.Duration; @@ -43,7 +44,6 @@ import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder; import org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration; import org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExistsException; import org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException; -import org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException; import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback; import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallbackImpl; import org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener; @@ -181,7 +181,7 @@ public class DeploymentManagerImpl implements IgniteDeployment { String id, Version version, boolean force, - CompletableFuture<DeploymentUnit> deploymentUnit, + DeploymentUnit deploymentUnit, NodesToDeploy deployedNodes ) { checkId(id); @@ -199,17 +199,17 @@ public class DeploymentManagerImpl implements IgniteDeployment { String id, Version version, boolean force, - CompletableFuture<DeploymentUnit> unitFuture, + DeploymentUnit deploymentUnit, Set<String> nodesToDeploy ) { return deploymentUnitStore.createClusterStatus(id, version, nodesToDeploy) - .thenCompose(clusterStatus -> unitFuture.thenCompose(deploymentUnit -> { + .thenCompose(clusterStatus -> { if (clusterStatus != null) { return doDeploy(clusterStatus, deploymentUnit, nodesToDeploy); } else { if (force) { return undeployAsync(id, version) - .thenCompose(u -> doDeploy(id, version, false, unitFuture, nodesToDeploy)); + .thenCompose(u -> doDeploy(id, version, false, deploymentUnit, nodesToDeploy)); } LOG.warn("Failed to deploy meta of unit " + id + ":" + version + " to metastore. " + "Already exists."); @@ -217,7 +217,7 @@ public class DeploymentManagerImpl implements IgniteDeployment { new DeploymentUnitAlreadyExistsException(id, "Unit " + id + ":" + version + " already exists")); } - })); + }); } private CompletableFuture<Boolean> doDeploy( @@ -225,14 +225,7 @@ public class DeploymentManagerImpl implements IgniteDeployment { DeploymentUnit deploymentUnit, Set<String> nodesToDeploy ) { - UnitContent unitContent; - try { - unitContent = UnitContent.readContent(deploymentUnit); - } catch (DeploymentUnitReadException e) { - LOG.error("Error reading deployment unit content", e); - return failedFuture(e); - } - return deployToLocalNode(clusterStatus, unitContent) + return deployToLocalNode(clusterStatus, deploymentUnit) .thenApply(completed -> { if (completed) { nodesToDeploy.forEach(node -> { @@ -251,8 +244,8 @@ public class DeploymentManagerImpl implements IgniteDeployment { }); } - private CompletableFuture<Boolean> deployToLocalNode(UnitClusterStatus clusterStatus, UnitContent unitContent) { - return deployer.deploy(clusterStatus.id(), clusterStatus.version(), unitContent) + private CompletableFuture<Boolean> deployToLocalNode(UnitClusterStatus clusterStatus, DeploymentUnit deploymentUnit) { + return deployer.deploy(clusterStatus.id(), clusterStatus.version(), deploymentUnit) .thenCompose(deployed -> { if (deployed) { return deploymentUnitStore.createNodeStatus( @@ -366,8 +359,20 @@ public class DeploymentManagerImpl implements IgniteDeployment { return completedFuture(true); } return messaging.downloadUnitContent(id, version, nodes) - .thenCompose(content -> deploymentUnitStore.getClusterStatus(id, version) - .thenCompose(status -> deployToLocalNode(status, content))); + .thenCompose(content -> { + return deploymentUnitStore.getClusterStatus(id, version) + .thenCompose(status -> { + DeploymentUnit unit = toDeploymentUnit(content); + return deployToLocalNode(status, unit) + .whenComplete((deployed, throwable) -> { + try { + unit.close(); + } catch (Exception e) { + LOG.error("Error closing deployment unit", e); + } + }); + }); + }); }); } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java index 3c4bddd4bf..e2186c2b97 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java @@ -19,16 +19,29 @@ package org.apache.ignite.internal.deployunit; import java.io.InputStream; import java.util.Map; +import org.apache.ignite.internal.util.IgniteUtils; /** * Deployment unit interface. */ -@FunctionalInterface -public interface DeploymentUnit { +public class DeploymentUnit implements AutoCloseable { + private final Map<String, InputStream> content; + + public DeploymentUnit(Map<String, InputStream> content) { + this.content = content; + } + /** * Deployment unit content - a map from file name to input stream. * * @return Deployment unit content. */ - Map<String, InputStream> content(); + public Map<String, InputStream> content() { + return content; + } + + @Override + public void close() throws Exception { + IgniteUtils.closeAll(content.values()); + } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java index 30d226a849..f2b74ec7c0 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java @@ -19,11 +19,9 @@ package org.apache.ignite.internal.deployunit; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.SYNC; -import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; import java.io.IOException; +import java.io.InputStream; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -70,21 +68,21 @@ public class FileDeployerService { * * @param id Deploy unit identifier. * @param version Deploy unit version. - * @param unitContent Map of deploy unit file names to file content. + * @param deploymentUnit Deployment unit. * @return Future with deploy result. */ - public CompletableFuture<Boolean> deploy(String id, Version version, UnitContent unitContent) { + public CompletableFuture<Boolean> deploy(String id, Version version, DeploymentUnit deploymentUnit) { return CompletableFuture.supplyAsync(() -> { try { Path unitFolder = unitPath(id, version); Files.createDirectories(unitFolder); - for (Entry<String, byte[]> entry : unitContent) { + for (Entry<String, InputStream> entry : deploymentUnit.content().entrySet()) { String fileName = entry.getKey(); Path unitPath = unitFolder.resolve(fileName); Path unitPathTmp = unitFolder.resolve(fileName + TMP_SUFFIX); - Files.write(unitPathTmp, entry.getValue(), CREATE, SYNC, TRUNCATE_EXISTING); + Files.copy(entry.getValue(), unitPathTmp, REPLACE_EXISTING); Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING); } return true; diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java index 980427f091..95cdb0e216 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java @@ -39,7 +39,7 @@ public interface IgniteDeployment extends IgniteComponent { default CompletableFuture<Boolean> deployAsync( String id, Version version, - CompletableFuture<DeploymentUnit> deploymentUnit, + DeploymentUnit deploymentUnit, NodesToDeploy nodesToDeploy ) { return deployAsync(id, version, false, deploymentUnit, nodesToDeploy); @@ -60,7 +60,7 @@ public interface IgniteDeployment extends IgniteComponent { String id, Version version, boolean force, - CompletableFuture<DeploymentUnit> deploymentUnit, + DeploymentUnit deploymentUnit, NodesToDeploy nodesToDeploy ); diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java index f8aa584940..61001613cc 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java @@ -17,8 +17,11 @@ package org.apache.ignite.internal.deployunit; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -84,13 +87,28 @@ public class UnitContent implements Iterable<Entry<String, byte[]>> { * @return Unit content from provided deployment unit. */ public static UnitContent readContent(DeploymentUnit deploymentUnit) { - return new UnitContent(deploymentUnit.content().entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + Map<String, byte[]> map = deploymentUnit.content().entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, entry -> { try { return entry.getValue().readAllBytes(); } catch (IOException e) { throw new DeploymentUnitReadException(e); } - }))); + })); + return new UnitContent(map); + } + + /** + * Convert unit content to {@link DeploymentUnit}. + * + * @param content Unit content. + * @return Deployment unit instance. + */ + public static DeploymentUnit toDeploymentUnit(UnitContent content) { + Map<String, InputStream> files = new HashMap<>(); + content.iterator().forEachRemaining(it -> { + files.put(it.getKey(), new ByteArrayInputStream(it.getValue())); + }); + return new DeploymentUnit(files); } } diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java index 3b94764ebe..7baab5e26c 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java @@ -22,11 +22,17 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.internal.deployunit.DeploymentUnit; import org.apache.ignite.internal.deployunit.FileDeployerService; import org.apache.ignite.internal.deployunit.UnitContent; import org.apache.ignite.internal.testframework.WorkDirectory; @@ -63,21 +69,32 @@ public class FileDeployerServiceTest { } @Test - public void test() throws IOException { - CompletableFuture<Boolean> deployed = service.deploy("id", parseVersion("1.0.0"), content()); - assertThat(deployed, willBe(true)); + public void test() throws Exception { + try (DeploymentUnit unit = content()) { + CompletableFuture<Boolean> deployed = service.deploy("id", parseVersion("1.0.0"), unit); + assertThat(deployed, willBe(true)); + } - CompletableFuture<UnitContent> unitContent = service.getUnitContent("id", parseVersion("1.0.0")); - assertThat(unitContent, willBe(equalTo(content()))); + try (DeploymentUnit unit = content()) { + CompletableFuture<UnitContent> unitContent = service.getUnitContent("id", parseVersion("1.0.0")); + assertThat(unitContent, willBe(equalTo(UnitContent.readContent(unit)))); + } } - private UnitContent content() throws IOException { - byte[] content1 = Files.readAllBytes(file1); - byte[] content2 = Files.readAllBytes(file2); - byte[] content3 = Files.readAllBytes(file3); + private DeploymentUnit content() { + Map<String, InputStream> map = Stream.of(file1, file2, file3) + .collect(Collectors.toMap(it -> it.getFileName().toString(), it -> { + try { + byte[] buf = Files.readAllBytes(it); + if (buf.length == 0) { + throw new RuntimeException(new FileNotFoundException(it.toString())); + } + return new ByteArrayInputStream(buf); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); - return new UnitContent(Map.of(file1.getFileName().toString(), content1, - file2.getFileName().toString(), content2, - file3.getFileName().toString(), content3)); + return new DeploymentUnit(map); } } diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java index 408d3da8e4..c60d219f20 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java @@ -50,7 +50,7 @@ public class DummyIgniteDeployment implements IgniteDeployment { String id, Version version, boolean force, - CompletableFuture<DeploymentUnit> deploymentUnit, + DeploymentUnit deploymentUnit, NodesToDeploy nodesToDeploy) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java index 4eb5fba651..e9c6706471 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.deployunit.DeploymentUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -31,22 +33,15 @@ import org.reactivestreams.Subscription; * Implementation of {@link Subscriber} based on {@link CompletedFileUpload} which will collect uploaded files to the * {@link DeploymentUnit}. */ -class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> { - private final CompletableFuture<DeploymentUnit> result; +class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload>, AutoCloseable { + private static final IgniteLogger LOG = Loggers.forClass(CompletedFileUploadSubscriber.class); + + private final CompletableFuture<DeploymentUnit> result = new CompletableFuture<>(); private final Map<String, InputStream> content = new HashMap<>(); private IOException ex; - /** - * Constructor. - * - * @param result Result future. - */ - CompletedFileUploadSubscriber(CompletableFuture<DeploymentUnit> result) { - this.result = result; - } - @Override public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); @@ -57,6 +52,7 @@ class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> { try { content.put(item.getFilename(), item.getInputStream()); } catch (IOException e) { + LOG.error("Failed to read file: " + item.getFilename(), e); if (ex != null) { ex.addSuppressed(e); } else { @@ -75,7 +71,22 @@ class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> { if (ex != null) { result.completeExceptionally(ex); } else { - result.complete(() -> content); + result.complete(new DeploymentUnit(content)); } } + + public CompletableFuture<DeploymentUnit> result() { + return result; + } + + @Override + public void close() throws Exception { + result.thenAccept(it -> { + try { + it.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java index c3496d1f21..0f0cf3e168 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java @@ -29,10 +29,11 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.ignite.compute.version.Version; -import org.apache.ignite.internal.deployunit.DeploymentUnit; import org.apache.ignite.internal.deployunit.IgniteDeployment; import org.apache.ignite.internal.deployunit.NodesToDeploy; import org.apache.ignite.internal.deployunit.UnitStatuses; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.rest.api.deployment.DeploymentCodeApi; import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus; import org.apache.ignite.internal.rest.api.deployment.InitialDeployMode; @@ -47,6 +48,8 @@ import org.reactivestreams.Publisher; @SuppressWarnings("OptionalContainsCollection") @Controller("/management/v1/deployment") public class DeploymentManagementController implements DeploymentCodeApi { + private static final IgniteLogger LOG = Loggers.forClass(DeploymentManagementController.class); + private final IgniteDeployment deployment; public DeploymentManagementController(IgniteDeployment deployment) { @@ -61,11 +64,23 @@ public class DeploymentManagementController implements DeploymentCodeApi { Optional<InitialDeployMode> deployMode, Optional<List<String>> initialNodes ) { - CompletableFuture<DeploymentUnit> result = new CompletableFuture<>(); - unitContent.subscribe(new CompletedFileUploadSubscriber(result)); + + CompletedFileUploadSubscriber subscriber = new CompletedFileUploadSubscriber(); + unitContent.subscribe(subscriber); + NodesToDeploy nodesToDeploy = initialNodes.map(NodesToDeploy::new) .orElseGet(() -> new NodesToDeploy(fromInitialDeployMode(deployMode))); - return deployment.deployAsync(unitId, Version.parseVersion(unitVersion), result, nodesToDeploy); + + return subscriber.result().thenCompose(content -> { + return deployment.deployAsync(unitId, Version.parseVersion(unitVersion), content, nodesToDeploy); + }).whenComplete((res, throwable) -> { + try { + subscriber.close(); + } catch (Exception e) { + LOG.error("Failed to close subscriber", e); + } + }); + } @Override diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java index d5ad96bb6c..43943b861d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java @@ -182,7 +182,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest { CompletableFuture<Boolean> deployed = node.deployment().deployAsync( unitId, unitVersion, - CompletableFuture.completedFuture(() -> Map.of(jarName, jarStream)), + new org.apache.ignite.internal.deployunit.DeploymentUnit(Map.of(jarName, jarStream)), new NodesToDeploy(MAJORITY) ); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java index 11bd2d03d0..235be55ae3 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java @@ -116,8 +116,16 @@ class DeployFiles { CompletableFuture<Boolean> deploy; + DeploymentUnit deploymentUnit = fromPaths(paths); deploy = entryNode.deployment() - .deployAsync(id, version, force, fromPaths(paths), nodesToDeploy); + .deployAsync(id, version, force, deploymentUnit, nodesToDeploy) + .whenComplete((res, err) -> { + try { + deploymentUnit.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); assertThat(deploy, willBe(true)); @@ -154,7 +162,7 @@ class DeployFiles { return builder.build(); } - private static CompletableFuture<DeploymentUnit> fromPaths(List<Path> paths) { + private static DeploymentUnit fromPaths(List<Path> paths) { Objects.requireNonNull(paths); Map<String, InputStream> map = new HashMap<>(); try { @@ -164,6 +172,6 @@ class DeployFiles { } catch (IOException e) { throw new RuntimeException(e); } - return CompletableFuture.completedFuture(() -> map); + return new DeploymentUnit(map); } }