This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 55f33041f8 IGNITE-19926 Close CompletedFileUploads when unit is deployed (#2334)
55f33041f8 is described below

commit 55f33041f88422b1f46d4ba999b7bf7d498ae177
Author: Ivan Gagarkin <gagarkin....@gmail.com>
AuthorDate: Tue Jul 25 15:10:27 2023 +0400

    IGNITE-19926 Close CompletedFileUploads when unit is deployed (#2334)
---
 .../internal/deployunit/DefaultNodeCallback.java   | 16 ++++++++-
 .../internal/deployunit/DeploymentManagerImpl.java | 41 ++++++++++++----------
 .../ignite/internal/deployunit/DeploymentUnit.java | 19 ++++++++--
 .../internal/deployunit/FileDeployerService.java   | 12 +++----
 .../internal/deployunit/IgniteDeployment.java      |  4 +--
 .../ignite/internal/deployunit/UnitContent.java    | 24 +++++++++++--
 .../ignite/deployment/FileDeployerServiceTest.java | 41 +++++++++++++++-------
 .../compute/util/DummyIgniteDeployment.java        |  2 +-
 .../deployment/CompletedFileUploadSubscriber.java  | 35 +++++++++++-------
 .../deployment/DeploymentManagementController.java | 23 +++++++++---
 .../internal/compute/ItComputeTestStandalone.java  |  2 +-
 .../ignite/internal/deployment/DeployFiles.java    | 14 ++++++--
 12 files changed, 166 insertions(+), 67 deletions(-)

diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
index d657227a5e..df7cf0c115 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -32,11 +32,15 @@ import 
org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
 import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 
 /**
  * Default implementation of {@link NodeEventCallback}.
  */
 public class DefaultNodeCallback extends NodeEventCallback {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DefaultNodeCallback.class);
+
     private final DeploymentUnitStore deploymentUnitStore;
 
     private final DeployMessagingService messaging;
@@ -83,7 +87,17 @@ public class DefaultNodeCallback extends NodeEventCallback {
     public void onUploading(String id, Version version, List<UnitNodeStatus> 
holders) {
         tracker.track(id, version,
                 () -> messaging.downloadUnitContent(id, version, new 
ArrayList<>(getDeployedNodeIds(holders)))
-                        .thenCompose(content -> deployer.deploy(id, version, 
content))
+                        .thenCompose(content -> {
+                            
org.apache.ignite.internal.deployunit.DeploymentUnit unit = 
UnitContent.toDeploymentUnit(content);
+                            return deployer.deploy(id, version, unit)
+                                    .whenComplete((deployed, err) -> {
+                                        try {
+                                            unit.close();
+                                        } catch (Exception e) {
+                                            LOG.error("Failed to close 
deployment unit", e);
+                                        }
+                                    });
+                        })
                         .thenApply(deployed -> {
                             if (deployed) {
                                 return 
deploymentUnitStore.updateNodeStatus(nodeName, id, version, DEPLOYED);
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 2176715195..bf3a7ee080 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
+import static 
org.apache.ignite.internal.deployunit.UnitContent.toDeploymentUnit;
 
 import java.nio.file.Path;
 import java.time.Duration;
@@ -43,7 +44,6 @@ import 
org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExistsException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
-import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
 import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
 import 
org.apache.ignite.internal.deployunit.metastore.ClusterEventCallbackImpl;
 import 
org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener;
@@ -181,7 +181,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
             String id,
             Version version,
             boolean force,
-            CompletableFuture<DeploymentUnit> deploymentUnit,
+            DeploymentUnit deploymentUnit,
             NodesToDeploy deployedNodes
     ) {
         checkId(id);
@@ -199,17 +199,17 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
             String id,
             Version version,
             boolean force,
-            CompletableFuture<DeploymentUnit> unitFuture,
+            DeploymentUnit deploymentUnit,
             Set<String> nodesToDeploy
     ) {
         return deploymentUnitStore.createClusterStatus(id, version, 
nodesToDeploy)
-                .thenCompose(clusterStatus -> 
unitFuture.thenCompose(deploymentUnit -> {
+                .thenCompose(clusterStatus -> {
                     if (clusterStatus != null) {
                         return doDeploy(clusterStatus, deploymentUnit, 
nodesToDeploy);
                     } else {
                         if (force) {
                             return undeployAsync(id, version)
-                                    .thenCompose(u -> doDeploy(id, version, 
false, unitFuture, nodesToDeploy));
+                                    .thenCompose(u -> doDeploy(id, version, 
false, deploymentUnit, nodesToDeploy));
                         }
                         LOG.warn("Failed to deploy meta of unit " + id + ":" + 
version + " to metastore. "
                                 + "Already exists.");
@@ -217,7 +217,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
                                 new DeploymentUnitAlreadyExistsException(id,
                                         "Unit " + id + ":" + version + " 
already exists"));
                     }
-                }));
+                });
     }
 
     private CompletableFuture<Boolean> doDeploy(
@@ -225,14 +225,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
             DeploymentUnit deploymentUnit,
             Set<String> nodesToDeploy
     ) {
-        UnitContent unitContent;
-        try {
-            unitContent = UnitContent.readContent(deploymentUnit);
-        } catch (DeploymentUnitReadException e) {
-            LOG.error("Error reading deployment unit content", e);
-            return failedFuture(e);
-        }
-        return deployToLocalNode(clusterStatus, unitContent)
+        return deployToLocalNode(clusterStatus, deploymentUnit)
                 .thenApply(completed -> {
                     if (completed) {
                         nodesToDeploy.forEach(node -> {
@@ -251,8 +244,8 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
                 });
     }
 
-    private CompletableFuture<Boolean> deployToLocalNode(UnitClusterStatus 
clusterStatus, UnitContent unitContent) {
-        return deployer.deploy(clusterStatus.id(), clusterStatus.version(), 
unitContent)
+    private CompletableFuture<Boolean> deployToLocalNode(UnitClusterStatus 
clusterStatus, DeploymentUnit deploymentUnit) {
+        return deployer.deploy(clusterStatus.id(), clusterStatus.version(), 
deploymentUnit)
                 .thenCompose(deployed -> {
                     if (deployed) {
                         return deploymentUnitStore.createNodeStatus(
@@ -366,8 +359,20 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
                         return completedFuture(true);
                     }
                     return messaging.downloadUnitContent(id, version, nodes)
-                            .thenCompose(content -> 
deploymentUnitStore.getClusterStatus(id, version)
-                                    .thenCompose(status -> 
deployToLocalNode(status, content)));
+                            .thenCompose(content -> {
+                                return 
deploymentUnitStore.getClusterStatus(id, version)
+                                        .thenCompose(status -> {
+                                            DeploymentUnit unit = 
toDeploymentUnit(content);
+                                            return deployToLocalNode(status, 
unit)
+                                                    .whenComplete((deployed, 
throwable) -> {
+                                                        try {
+                                                            unit.close();
+                                                        } catch (Exception e) {
+                                                            LOG.error("Error 
closing deployment unit", e);
+                                                        }
+                                                    });
+                                        });
+                            });
 
                 });
     }
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
index 3c4bddd4bf..e2186c2b97 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
@@ -19,16 +19,29 @@ package org.apache.ignite.internal.deployunit;
 
 import java.io.InputStream;
 import java.util.Map;
+import org.apache.ignite.internal.util.IgniteUtils;
 
 /**
  * Deployment unit interface.
  */
-@FunctionalInterface
-public interface DeploymentUnit {
+public class DeploymentUnit implements AutoCloseable {
+    private final Map<String, InputStream> content;
+
+    public DeploymentUnit(Map<String, InputStream> content) {
+        this.content = content;
+    }
+
     /**
      * Deployment unit content - a map from file name to input stream.
      *
      * @return Deployment unit content.
      */
-    Map<String, InputStream> content();
+    public Map<String, InputStream> content() {
+        return content;
+    }
+
+    @Override
+    public void close() throws Exception {
+        IgniteUtils.closeAll(content.values());
+    }
 }
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
index 30d226a849..f2b74ec7c0 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.deployunit;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.SYNC;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -70,21 +68,21 @@ public class FileDeployerService {
      *
      * @param id Deploy unit identifier.
      * @param version Deploy unit version.
-     * @param unitContent Map of deploy unit file names to file content.
+     * @param deploymentUnit Deployment unit.
      * @return Future with deploy result.
      */
-    public CompletableFuture<Boolean> deploy(String id, Version version, 
UnitContent unitContent) {
+    public CompletableFuture<Boolean> deploy(String id, Version version, 
DeploymentUnit deploymentUnit) {
         return CompletableFuture.supplyAsync(() -> {
             try {
                 Path unitFolder = unitPath(id, version);
 
                 Files.createDirectories(unitFolder);
 
-                for (Entry<String, byte[]> entry : unitContent) {
+                for (Entry<String, InputStream> entry : 
deploymentUnit.content().entrySet()) {
                     String fileName = entry.getKey();
                     Path unitPath = unitFolder.resolve(fileName);
                     Path unitPathTmp = unitFolder.resolve(fileName + 
TMP_SUFFIX);
-                    Files.write(unitPathTmp, entry.getValue(), CREATE, SYNC, 
TRUNCATE_EXISTING);
+                    Files.copy(entry.getValue(), unitPathTmp, 
REPLACE_EXISTING);
                     Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, 
REPLACE_EXISTING);
                 }
                 return true;
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
index 980427f091..95cdb0e216 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
@@ -39,7 +39,7 @@ public interface IgniteDeployment extends IgniteComponent {
     default CompletableFuture<Boolean> deployAsync(
             String id,
             Version version,
-            CompletableFuture<DeploymentUnit> deploymentUnit,
+            DeploymentUnit deploymentUnit,
             NodesToDeploy nodesToDeploy
     ) {
         return deployAsync(id, version, false, deploymentUnit, nodesToDeploy);
@@ -60,7 +60,7 @@ public interface IgniteDeployment extends IgniteComponent {
             String id,
             Version version,
             boolean force,
-            CompletableFuture<DeploymentUnit> deploymentUnit,
+            DeploymentUnit deploymentUnit,
             NodesToDeploy nodesToDeploy
     );
 
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
index f8aa584940..61001613cc 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.deployunit;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -84,13 +87,28 @@ public class UnitContent implements Iterable<Entry<String, 
byte[]>> {
      * @return Unit content from provided deployment unit.
      */
     public static UnitContent readContent(DeploymentUnit deploymentUnit) {
-        return new UnitContent(deploymentUnit.content().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+        Map<String, byte[]> map = deploymentUnit.content().entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, entry -> {
                     try {
                         return entry.getValue().readAllBytes();
                     } catch (IOException e) {
                         throw new DeploymentUnitReadException(e);
                     }
-                })));
+                }));
+        return new UnitContent(map);
+    }
+
+    /**
+     * Convert unit content to {@link DeploymentUnit}.
+     *
+     * @param content Unit content.
+     * @return Deployment unit instance.
+     */
+    public static DeploymentUnit toDeploymentUnit(UnitContent content) {
+        Map<String, InputStream> files = new HashMap<>();
+        content.iterator().forEachRemaining(it -> {
+            files.put(it.getKey(), new ByteArrayInputStream(it.getValue()));
+        });
+        return new DeploymentUnit(files);
     }
 }
diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
index 3b94764ebe..7baab5e26c 100644
--- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
+++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
@@ -22,11 +22,17 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
 import org.apache.ignite.internal.deployunit.FileDeployerService;
 import org.apache.ignite.internal.deployunit.UnitContent;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -63,21 +69,32 @@ public class FileDeployerServiceTest {
     }
 
     @Test
-    public void test() throws IOException {
-        CompletableFuture<Boolean> deployed = service.deploy("id", 
parseVersion("1.0.0"), content());
-        assertThat(deployed, willBe(true));
+    public void test() throws Exception {
+        try (DeploymentUnit unit = content()) {
+            CompletableFuture<Boolean> deployed = service.deploy("id", 
parseVersion("1.0.0"), unit);
+            assertThat(deployed, willBe(true));
+        }
 
-        CompletableFuture<UnitContent> unitContent = 
service.getUnitContent("id", parseVersion("1.0.0"));
-        assertThat(unitContent, willBe(equalTo(content())));
+        try (DeploymentUnit unit = content()) {
+            CompletableFuture<UnitContent> unitContent = 
service.getUnitContent("id", parseVersion("1.0.0"));
+            assertThat(unitContent, 
willBe(equalTo(UnitContent.readContent(unit))));
+        }
     }
 
-    private UnitContent content() throws IOException {
-        byte[] content1 = Files.readAllBytes(file1);
-        byte[] content2 = Files.readAllBytes(file2);
-        byte[] content3 = Files.readAllBytes(file3);
+    private DeploymentUnit content() {
+        Map<String, InputStream> map = Stream.of(file1, file2, file3)
+                .collect(Collectors.toMap(it -> it.getFileName().toString(), 
it -> {
+                    try {
+                        byte[] buf = Files.readAllBytes(it);
+                        if (buf.length == 0) {
+                            throw new RuntimeException(new 
FileNotFoundException(it.toString()));
+                        }
+                        return new ByteArrayInputStream(buf);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }));
 
-        return new UnitContent(Map.of(file1.getFileName().toString(), content1,
-                file2.getFileName().toString(), content2,
-                file3.getFileName().toString(), content3));
+        return new DeploymentUnit(map);
     }
 }
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java
index 408d3da8e4..c60d219f20 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java
@@ -50,7 +50,7 @@ public class DummyIgniteDeployment implements 
IgniteDeployment {
             String id,
             Version version,
             boolean force,
-            CompletableFuture<DeploymentUnit> deploymentUnit,
+            DeploymentUnit deploymentUnit,
             NodesToDeploy nodesToDeploy) {
         throw new UnsupportedOperationException("Not implemented");
     }
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
index 4eb5fba651..e9c6706471 100644
--- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
@@ -31,22 +33,15 @@ import org.reactivestreams.Subscription;
  * Implementation of {@link Subscriber} based on {@link CompletedFileUpload} 
which will collect uploaded files to the
  * {@link DeploymentUnit}.
  */
-class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload> 
{
-    private final CompletableFuture<DeploymentUnit> result;
+class CompletedFileUploadSubscriber implements 
Subscriber<CompletedFileUpload>, AutoCloseable {
+    private static final IgniteLogger LOG = 
Loggers.forClass(CompletedFileUploadSubscriber.class);
+
+    private final CompletableFuture<DeploymentUnit> result = new 
CompletableFuture<>();
 
     private final Map<String, InputStream> content = new HashMap<>();
 
     private IOException ex;
 
-    /**
-     * Constructor.
-     *
-     * @param result Result future.
-     */
-    CompletedFileUploadSubscriber(CompletableFuture<DeploymentUnit> result) {
-        this.result = result;
-    }
-
     @Override
     public void onSubscribe(Subscription subscription) {
         subscription.request(Long.MAX_VALUE);
@@ -57,6 +52,7 @@ class CompletedFileUploadSubscriber implements 
Subscriber<CompletedFileUpload> {
         try {
             content.put(item.getFilename(), item.getInputStream());
         } catch (IOException e) {
+            LOG.error("Failed to read file: " + item.getFilename(), e);
             if (ex != null) {
                 ex.addSuppressed(e);
             } else {
@@ -75,7 +71,22 @@ class CompletedFileUploadSubscriber implements 
Subscriber<CompletedFileUpload> {
         if (ex != null) {
             result.completeExceptionally(ex);
         } else {
-            result.complete(() -> content);
+            result.complete(new DeploymentUnit(content));
         }
     }
+
+    public CompletableFuture<DeploymentUnit> result() {
+        return result;
+    }
+
+    @Override
+    public void close() throws Exception {
+        result.thenAccept(it -> {
+            try {
+                it.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
 }
diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
index c3496d1f21..0f0cf3e168 100644
--- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
+++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
@@ -29,10 +29,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
-import org.apache.ignite.internal.deployunit.DeploymentUnit;
 import org.apache.ignite.internal.deployunit.IgniteDeployment;
 import org.apache.ignite.internal.deployunit.NodesToDeploy;
 import org.apache.ignite.internal.deployunit.UnitStatuses;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.rest.api.deployment.DeploymentCodeApi;
 import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
 import org.apache.ignite.internal.rest.api.deployment.InitialDeployMode;
@@ -47,6 +48,8 @@ import org.reactivestreams.Publisher;
 @SuppressWarnings("OptionalContainsCollection")
 @Controller("/management/v1/deployment")
 public class DeploymentManagementController implements DeploymentCodeApi {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagementController.class);
+
     private final IgniteDeployment deployment;
 
     public DeploymentManagementController(IgniteDeployment deployment) {
@@ -61,11 +64,23 @@ public class DeploymentManagementController implements 
DeploymentCodeApi {
             Optional<InitialDeployMode> deployMode,
             Optional<List<String>> initialNodes
     ) {
-        CompletableFuture<DeploymentUnit> result = new CompletableFuture<>();
-        unitContent.subscribe(new CompletedFileUploadSubscriber(result));
+
+        CompletedFileUploadSubscriber subscriber = new 
CompletedFileUploadSubscriber();
+        unitContent.subscribe(subscriber);
+
         NodesToDeploy nodesToDeploy = initialNodes.map(NodesToDeploy::new)
                 .orElseGet(() -> new 
NodesToDeploy(fromInitialDeployMode(deployMode)));
-        return deployment.deployAsync(unitId, 
Version.parseVersion(unitVersion), result, nodesToDeploy);
+
+        return subscriber.result().thenCompose(content -> {
+            return deployment.deployAsync(unitId, 
Version.parseVersion(unitVersion), content, nodesToDeploy);
+        }).whenComplete((res, throwable) -> {
+            try {
+                subscriber.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close subscriber", e);
+            }
+        });
+
     }
 
     @Override
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index d5ad96bb6c..43943b861d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -182,7 +182,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
             CompletableFuture<Boolean> deployed = 
node.deployment().deployAsync(
                     unitId,
                     unitVersion,
-                    CompletableFuture.completedFuture(() -> Map.of(jarName, 
jarStream)),
+                    new 
org.apache.ignite.internal.deployunit.DeploymentUnit(Map.of(jarName, 
jarStream)),
                     new NodesToDeploy(MAJORITY)
             );
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
index 11bd2d03d0..235be55ae3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
@@ -116,8 +116,16 @@ class DeployFiles {
 
         CompletableFuture<Boolean> deploy;
 
+        DeploymentUnit deploymentUnit = fromPaths(paths);
         deploy = entryNode.deployment()
-                .deployAsync(id, version, force, fromPaths(paths), 
nodesToDeploy);
+                .deployAsync(id, version, force, deploymentUnit, nodesToDeploy)
+                .whenComplete((res, err) -> {
+                    try {
+                        deploymentUnit.close();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
 
         assertThat(deploy, willBe(true));
 
@@ -154,7 +162,7 @@ class DeployFiles {
         return builder.build();
     }
 
-    private static CompletableFuture<DeploymentUnit> fromPaths(List<Path> 
paths) {
+    private static DeploymentUnit fromPaths(List<Path> paths) {
         Objects.requireNonNull(paths);
         Map<String, InputStream> map = new HashMap<>();
         try {
@@ -164,6 +172,6 @@ class DeployFiles {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return CompletableFuture.completedFuture(() -> map);
+        return new DeploymentUnit(map);
     }
 }

Reply via email to