This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9368ef31c98 IGNITE-26455 Add support for deploy units with folders
structure (#6625)
9368ef31c98 is described below
commit 9368ef31c98956266316202b524aca065faec644
Author: Mikhail <[email protected]>
AuthorDate: Thu Sep 25 17:34:28 2025 +0300
IGNITE-26455 Add support for deploy units with folders structure (#6625)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../ignite/internal/deployment/DeployFile.java | 9 +-
.../ignite/internal/deployment/DeployFiles.java | 104 ++++++++++---
.../internal/deployment/ItDeploymentUnitTest.java | 18 +++
.../apache/ignite/internal/deployment/Unit.java | 35 ++++-
.../internal/deployunit/DeployerProcessor.java | 86 +++++++++++
.../ignite/internal/deployunit/DeploymentUnit.java | 50 ++++---
.../deployunit/DeploymentUnitProcessor.java | 59 ++++++++
.../internal/deployunit/FileDeployerService.java | 25 +---
...eploymentUnit.java => FilesDeploymentUnit.java} | 28 +++-
.../ignite/internal/deployunit/UnitContent.java | 23 +--
.../internal/deployunit/ZipDeploymentUnit.java | 66 ++++++++
.../DeploymentUnitZipException.java} | 31 ++--
.../ignite/deployment/FileDeployerServiceTest.java | 31 +++-
.../internal/compute/ItComputeTestStandalone.java | 3 +-
.../internal/testframework/IgniteTestUtils.java | 26 ++++
.../testframework/WorkDirectoryExtension.java | 15 +-
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
.../rest/api/deployment/DeploymentCodeApi.java | 38 +++++
.../DeploymentManagementControllerTest.java | 166 ++++++++++++++++++---
.../deployment/CompletedFileUploadSubscriber.java | 62 ++++----
.../deployment/DeploymentManagementController.java | 46 ++++--
.../rest/deployment/InputStreamCollector.java | 63 ++++++++
.../rest/deployment/InputStreamCollectorImpl.java | 57 +++++++
.../rest/deployment/ZipInputStreamCollector.java | 98 ++++++++++++
.../handler/DeploymentUnitZipExceptionHandler.java | 44 ++++++
28 files changed, 1006 insertions(+), 185 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index e259b65e281..35fdd683a56 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -571,6 +571,9 @@ public class ErrorGroups {
/** Deployment unit is unavailable for computing. */
public static final int UNIT_UNAVAILABLE_ERR =
CODE_DEPLOYMENT_ERR_GROUP.registerErrorCode((short) 4);
+
+ /** Deployment unit zip deploy error. */
+ public static final int UNIT_ZIP_ERR =
CODE_DEPLOYMENT_ERR_GROUP.registerErrorCode((short) 5);
}
/**
diff --git
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
index 071aa615a46..c76b9a84031 100644
---
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
+++
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
@@ -29,10 +29,13 @@ class DeployFile {
private final int replicaTimeout;
- DeployFile(Path file, long expectedSize, int replicaTimeout) throws
IOException {
+ private final boolean zip;
+
+ DeployFile(Path file, boolean zip, long expectedSize, int replicaTimeout)
throws IOException {
this.file = file;
this.expectedSize = expectedSize;
this.replicaTimeout = replicaTimeout;
+ this.zip = zip;
ensureExists();
}
@@ -46,6 +49,10 @@ class DeployFile {
return file;
}
+ boolean zip() {
+ return zip;
+ }
+
long expectedSize() {
return expectedSize;
}
diff --git
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
index de5a1ffb265..d356a870d47 100644
---
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
+++
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
@@ -18,9 +18,10 @@
package org.apache.ignite.internal.deployment;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.createZipFile;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.io.InputStream;
@@ -29,15 +30,16 @@ import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
+import java.util.zip.ZipInputStream;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.FilesDeploymentUnit;
import org.apache.ignite.internal.deployunit.NodesToDeploy;
import org.apache.ignite.internal.deployunit.UnitStatuses;
import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
+import org.apache.ignite.internal.deployunit.ZipDeploymentUnit;
class DeployFiles {
private static final int BASE_REPLICA_TIMEOUT = 30;
@@ -56,6 +58,10 @@ class DeployFiles {
private DeployFile bigFile;
+ private DeployFile flatZipFile;
+
+ private DeployFile treeZipFile;
+
// TODO https://issues.apache.org/jira/browse/IGNITE-20204
DeployFiles(Path workDir) {
this.workDir = workDir;
@@ -63,7 +69,16 @@ class DeployFiles {
private static DeployFile create(Path path, long size, int replicaTimeout)
{
try {
- return new DeployFile(path, size, replicaTimeout);
+ return new DeployFile(path, false, size, replicaTimeout);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private DeployFile createZip(Path path, Map<String, Long> content, int
replicaTimeout) {
+ try {
+ createZipFile(content, path);
+ return new DeployFile(path, true,
content.values().stream().mapToLong(l -> l).sum(), replicaTimeout);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -90,6 +105,39 @@ class DeployFiles {
return bigFile;
}
+ DeployFile flatZipFile() {
+ if (flatZipFile == null) {
+ flatZipFile = createZip(
+ workDir.resolve("flat.zip"),
+ Map.of(
+ "a1", SMALL_IN_BYTES,
+ "a2", SMALL_IN_BYTES,
+ "a3", SMALL_IN_BYTES,
+ "a4", SMALL_IN_BYTES
+ ),
+ BASE_REPLICA_TIMEOUT
+ );
+ }
+ return flatZipFile;
+ }
+
+ DeployFile treeZipFile() {
+ if (treeZipFile == null) {
+ treeZipFile = createZip(
+ workDir.resolve("tree.zip"),
+ Map.of(
+ "a1/a2", SMALL_IN_BYTES,
+ "b1", SMALL_IN_BYTES,
+ "c1/c2/c3/c4", MEDIUM_IN_BYTES,
+ "d1/d2", SMALL_IN_BYTES,
+ "d1/a2", SMALL_IN_BYTES
+ ),
+ BASE_REPLICA_TIMEOUT * 3
+ );
+ }
+ return treeZipFile;
+ }
+
private Unit deployAndVerify(String id, Version version, DeployFile file,
IgniteImpl entryNode) {
return deployAndVerify(id, version, false, file, entryNode);
}
@@ -110,14 +158,8 @@ class DeployFiles {
NodesToDeploy nodesToDeploy,
IgniteImpl entryNode
) {
- List<Path> paths = files.stream()
- .map(DeployFile::file)
- .collect(Collectors.toList());
-
- CompletableFuture<Boolean> deploy;
-
- DeploymentUnit deploymentUnit = fromPaths(paths);
- deploy = entryNode.deployment()
+ DeploymentUnit deploymentUnit = fromFiles(files);
+ CompletableFuture<Boolean> deploy = entryNode.deployment()
.deployAsync(id, version, force, deploymentUnit, nodesToDeploy)
.whenComplete((res, err) -> {
try {
@@ -131,11 +173,8 @@ class DeployFiles {
Unit unit = new Unit(entryNode, workDir, id, version, files);
- Path nodeUnitDirectory = unit.getNodeUnitDirectory(entryNode);
-
for (DeployFile file : files) {
- Path filePath =
nodeUnitDirectory.resolve(file.file().getFileName());
- assertTrue(Files.exists(filePath));
+ unit.verify(file, entryNode);
}
return unit;
@@ -153,6 +192,14 @@ class DeployFiles {
return deployAndVerify(id, version, bigFile(), entryNode);
}
+ public Unit deployAndVerifyFlatZip(String id, Version version, IgniteImpl
entryNode) {
+ return deployAndVerify(id, version, flatZipFile(), entryNode);
+ }
+
+ public Unit deployAndVerifyTreeZip(String id, Version version, IgniteImpl
entryNode) {
+ return deployAndVerify(id, version, treeZipFile(), entryNode);
+ }
+
public static UnitStatuses buildStatus(String id, Unit... units) {
UnitStatusesBuilder builder = UnitStatuses.builder(id);
for (Unit unit : units) {
@@ -169,16 +216,31 @@ class DeployFiles {
Files.copy(file.file(), unitFile);
}
- private static DeploymentUnit fromPaths(List<Path> paths) {
- Objects.requireNonNull(paths);
+ private static DeploymentUnit fromFiles(List<DeployFile> files) {
Map<String, InputStream> map = new HashMap<>();
+ ZipInputStream zis = null;
try {
- for (Path path : paths) {
- map.put(path.getFileName().toString(),
Files.newInputStream(path));
+ for (DeployFile file : files) {
+ if (file.zip()) {
+ if (zis != null) {
+ fail("Only single zip file deploy is supported.");
+ }
+ zis = new
ZipInputStream(Files.newInputStream(file.file()));
+ } else {
+ map.put(file.file().getFileName().toString(),
Files.newInputStream(file.file()));
+ }
}
} catch (IOException e) {
throw new RuntimeException(e);
}
- return new DeploymentUnit(map);
+
+ if (zis != null) {
+ if (!map.isEmpty()) {
+ fail("Mixed zip and plain files deploy is not supported.");
+ }
+ return new ZipDeploymentUnit(zis);
+ } else {
+ return new FilesDeploymentUnit(map);
+ }
}
}
diff --git
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index fc5554e8baa..1a33fb7d1c2 100644
---
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -281,4 +281,22 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
smallUnit.waitUnitClean(igniteImpl(2));
}
+
+ @Test
+ public void testZipDeploy() {
+ String id = "test";
+ Unit unit = files.deployAndVerifyFlatZip(id,
Version.parseVersion("1.1.0"), igniteImpl(1));
+
+ Unit unit2 = files.deployAndVerifyTreeZip(id,
Version.parseVersion("1.1.1"), igniteImpl(1));
+
+ UnitStatuses status = buildStatus(id, unit, unit2);
+
+ await().timeout(2, SECONDS)
+ .pollDelay(500, MILLISECONDS)
+ .until(() ->
igniteImpl(2).deployment().clusterStatusesAsync(), willBe(List.of(status)));
+
+ IgniteImpl cmg = igniteImpl(0);
+ unit.waitUnitReplica(cmg);
+ unit2.waitUnitReplica(cmg);
+ }
}
diff --git
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
index e09445f4b3d..c2ae55763cf 100644
---
a/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
+++
b/modules/code-deployment/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
@@ -19,15 +19,21 @@ package org.apache.ignite.internal.deployment;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.app.IgniteImpl;
import
org.apache.ignite.internal.deployunit.configuration.DeploymentExtensionConfiguration;
+import org.hamcrest.Matchers;
class Unit {
private final IgniteImpl deployedNode;
@@ -108,8 +114,6 @@ class Unit {
}
void waitUnitReplica(IgniteImpl ignite) {
- Path unitDirectory = getNodeUnitDirectory(ignite);
-
int combinedTimeout =
files.stream().map(DeployFile::replicaTimeout).reduce(Integer::sum).get();
await().timeout(combinedTimeout, SECONDS)
@@ -117,13 +121,32 @@ class Unit {
.ignoreException(IOException.class)
.until(() -> {
for (DeployFile file : files) {
- Path filePath =
unitDirectory.resolve(file.file().getFileName());
- if (Files.notExists(filePath) || Files.size(filePath)
!= file.expectedSize()) {
- return false;
- }
+ verify(file, ignite);
}
return true;
});
}
+
+ public void verify(DeployFile file, IgniteImpl entryNode) {
+ Path nodeUnitDirectory = getNodeUnitDirectory(entryNode);
+ if (file.zip()) {
+ try (ZipInputStream zis = new
ZipInputStream(Files.newInputStream(file.file()))) {
+ ZipEntry ze;
+ while ((ze = zis.getNextEntry()) != null) {
+
assertTrue(Files.exists(nodeUnitDirectory.resolve(ze.getName())));
+ }
+ } catch (IOException e) {
+ fail(e);
+ }
+ } else {
+ try {
+ Path filePath =
nodeUnitDirectory.resolve(file.file().getFileName());
+ assertTrue(Files.exists(filePath));
+ assertThat(Files.size(filePath),
Matchers.is(file.expectedSize()));
+ } catch (IOException e) {
+ fail(e);
+ }
+ }
+ }
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployerProcessor.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployerProcessor.java
new file mode 100644
index 00000000000..258a4e2922a
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployerProcessor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map.Entry;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Implementation of {@link DeploymentUnitProcessor} that deploys deployment
unit content to the file system.
+ *
+ * <p>This processor extracts and deploys files from deployment units to a
specified target directory.
+ * It handles both regular deployment units (containing individual files as
input streams) and ZIP-based
+ * deployment units (containing compressed archives that need extraction).
+ *
+ * <p>The deployment process ensures atomic file operations by:
+ * <ul>
+ * <li>First copying files to temporary locations with a {@code .tmp}
suffix</li>
+ * <li>Then atomically moving them to their final destinations</li>
+ * <li>Creating necessary parent directories as needed</li>
+ * </ul>
+ *
+ * <p>This approach prevents partially written files: each individual file is
either fully deployed
+ * or not deployed at all. The guarantee is per file, not for the deployment
unit as a whole.
+ *
+ * <p>Type parameters:
+ * <ul>
+ * <li>{@code Path} - the argument type representing the target deployment
directory</li>
+ * </ul>
+ */
+public class DeployerProcessor implements DeploymentUnitProcessor<Path> {
+ /** Suffix used for temporary files during the deployment process. */
+ private static final String TMP_SUFFIX = ".tmp";
+
+ @Override
+ public void processContent(FilesDeploymentUnit unit, Path unitFolder)
throws IOException {
+ for (Entry<String, InputStream> e : unit.content().entrySet()) {
+ doDeploy(unitFolder, e.getKey(), e.getValue());
+ }
+ }
+
+ @Override
+ public void processContentWithUnzip(ZipDeploymentUnit unit, Path
unitFolder) throws IOException {
+ ZipInputStream zis = unit.zis();
+ ZipEntry ze;
+ while ((ze = zis.getNextEntry()) != null) {
+ if (ze.isDirectory()) {
+ // To support empty dirs.
+ Path entryPath = unitFolder.resolve(ze.getName());
+ Files.createDirectories(entryPath);
+ } else {
+ doDeploy(unitFolder, ze.getName(), zis);
+ }
+ }
+ }
+
+ private static void doDeploy(Path unitFolder, String entryName,
InputStream is) throws IOException {
+ Path unitPath = unitFolder.resolve(entryName);
+ Files.createDirectories(unitPath.getParent());
+ Path unitPathTmp = unitFolder.resolve(entryName + TMP_SUFFIX);
+ Files.copy(is, unitPathTmp, REPLACE_EXISTING);
+ Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING);
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
index bb36c8dccc0..0d86117097f 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
@@ -17,32 +17,38 @@
package org.apache.ignite.internal.deployunit;
-import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
-
-import java.io.InputStream;
-import java.util.Map;
+import java.io.IOException;
/**
- * Deployment unit interface.
+ * Interface representing a deployment unit in the Apache Ignite code
deployment system.
+ *
+ * <p>A deployment unit is a container for code and resources that can be
deployed to and managed
+ * within an Ignite cluster. This interface provides a contract for processing
deployment unit
+ * content through a processor pattern, allowing for flexible handling of
different types of
+ * deployment units such as regular file-based units and ZIP-compressed units.
*/
-public class DeploymentUnit implements AutoCloseable {
- private final Map<String, InputStream> content;
-
- public DeploymentUnit(Map<String, InputStream> content) {
- this.content = content;
- }
-
+public interface DeploymentUnit extends AutoCloseable {
/**
- * Deployment unit content - a map from file name to input stream.
+ * Processes the deployment unit content using the provided processor.
+ *
+ * <p>This method delegates the processing of the deployment unit to the
specified processor,
+ * following a strategy pattern. The processor determines how the unit
content should be
+ * handled based on its implementation. Different processors can perform
different operations
+ * on the same deployment unit.
+ *
+ * <p>The method supports generic type parameters to allow processors to
work with different
+ * argument types and return different result types, providing flexibility
for various
+ * processing scenarios.
+ *
+ * <p>For ZIP-based deployment units, the processor may need to handle
both regular content
+ * and compressed content that requires extraction.
*
- * @return Deployment unit content.
+ *
+ * @param <T> the type of argument passed to the processor
+ * @param processor the processor that will handle the deployment unit
content
+ * @param unitFolder the argument to be passed to the processor during
processing
+ * @throws IOException if an I/O error occurs during processing, such as
issues reading
+ * deployment unit content or writing processed
results.
*/
- public Map<String, InputStream> content() {
- return content;
- }
-
- @Override
- public void close() throws Exception {
- closeAll(content.values());
- }
+ <T> void process(DeploymentUnitProcessor<T> processor, T unitFolder)
throws IOException;
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java
new file mode 100644
index 00000000000..8b768d3536c
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.io.IOException;
+
+/**
+ * Processor interface for handling deployment unit content operations.
+ *
+ * <p>This interface defines a contract for processing deployment units,
providing methods to handle
+ * both regular deployment units and ZIP-based deployment units.
Implementations of this interface
+ * can perform various operations on deployment unit content such as
deployment, validation,
+ * transformation, or extraction.
+ *
+ * @param <T> the type of argument passed to the processing methods
+ */
+public interface DeploymentUnitProcessor<T> {
+ /**
+ * Processes the content of a regular deployment unit.
+ *
+ * <p>This method handles deployment units that contain a collection of
files represented
+ * as input streams. The implementation should process each file in the
deployment unit
+ * according to the specific processor's logic.
+ *
+ * @param unit the deployment unit containing the content to be processed
+ * @param arg the argument to be used during processing
+ * @throws IOException if an I/O error occurs during processing
+ */
+ void processContent(FilesDeploymentUnit unit, T arg) throws IOException;
+
+ /**
+ * Processes the content of a ZIP-based deployment unit with automatic
extraction.
+ *
+ * <p>This method handles deployment units that contain ZIP archives. The
implementation
+ * should process the ZIP content by extracting and handling each entry
according to the
+ * specific processor's logic. This method is typically used when the
deployment unit
+ * contains compressed content that needs to be extracted during
processing.
+ *
+ * @param unit the ZIP deployment unit containing the compressed content
to be processed
+ * @param arg the argument to be used during processing
+ * @throws IOException if an I/O error occurs during processing or
extraction
+ */
+ void processContentWithUnzip(ZipDeploymentUnit unit, T arg) throws
IOException;
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
index 73d13708c7d..863b14715d7 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -17,11 +17,7 @@
package org.apache.ignite.internal.deployunit;
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-
import java.io.IOException;
-import java.io.InputStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -29,7 +25,6 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -46,8 +41,6 @@ import org.apache.ignite.internal.util.IgniteUtils;
public class FileDeployerService {
private static final IgniteLogger LOG =
Loggers.forClass(FileDeployerService.class);
- private static final String TMP_SUFFIX = ".tmp";
-
private static final int DEPLOYMENT_EXECUTOR_SIZE = 4;
/**
@@ -57,6 +50,8 @@ public class FileDeployerService {
private final ExecutorService executor;
+ private final DeploymentUnitProcessor<Path> deployProcessor = new
DeployerProcessor();
+
/** Constructor. */
public FileDeployerService(String nodeName) {
executor = Executors.newFixedThreadPool(
@@ -81,16 +76,10 @@ public class FileDeployerService {
return CompletableFuture.supplyAsync(() -> {
try {
Path unitFolder = unitPath(id, version);
-
Files.createDirectories(unitFolder);
- for (Entry<String, InputStream> entry :
deploymentUnit.content().entrySet()) {
- String fileName = entry.getKey();
- Path unitPath = unitFolder.resolve(fileName);
- Path unitPathTmp = unitFolder.resolve(fileName +
TMP_SUFFIX);
- Files.copy(entry.getValue(), unitPathTmp,
REPLACE_EXISTING);
- Files.move(unitPathTmp, unitPath, ATOMIC_MOVE,
REPLACE_EXISTING);
- }
+ deploymentUnit.process(deployProcessor, unitFolder);
+
return true;
} catch (IOException e) {
LOG.error("Failed to deploy unit " + id + ":" + version, e);
@@ -129,10 +118,12 @@ public class FileDeployerService {
return CompletableFuture.supplyAsync(() -> {
Map<String, byte[]> result = new HashMap<>();
try {
- Files.walkFileTree(unitPath(id, version), new
SimpleFileVisitor<>() {
+ Path unitFolder = unitPath(id, version);
+ Files.walkFileTree(unitFolder, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file,
BasicFileAttributes attrs) throws IOException {
- result.put(file.getFileName().toString(),
Files.readAllBytes(file));
+ Path unitStructure = unitFolder.relativize(file);
+ result.put(unitStructure.toString(),
Files.readAllBytes(file));
return FileVisitResult.CONTINUE;
}
});
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java
similarity index 54%
copy from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
copy to
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java
index bb36c8dccc0..3d01bd2f07d 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FilesDeploymentUnit.java
@@ -19,23 +19,34 @@ package org.apache.ignite.internal.deployunit;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
/**
- * Deployment unit interface.
+ * Standard implementation of {@link DeploymentUnit} that handles regular
(non-compressed) deployment content.
+ *
+ * <p>This class represents a deployment unit containing a collection of
files, where each file is
+ * represented as a mapping from file name to its corresponding {@link
InputStream}. This implementation is designed for straightforward
+ * deployment scenarios where the content does not require compression or
special extraction handling.
+ *
*/
-public class DeploymentUnit implements AutoCloseable {
+public class FilesDeploymentUnit implements DeploymentUnit {
+ /**
+ * The deployment unit content represented as a mapping from file names to
their input streams. Each entry represents a file within the
+ * deployment unit.
+ */
private final Map<String, InputStream> content;
- public DeploymentUnit(Map<String, InputStream> content) {
+ /**
+ * Constructor.
+ */
+ public FilesDeploymentUnit(Map<String, InputStream> content) {
this.content = content;
}
/**
- * Deployment unit content - a map from file name to input stream.
- *
- * @return Deployment unit content.
+ * Returns the deployment unit content as a map of file names to input
streams.
*/
public Map<String, InputStream> content() {
return content;
@@ -45,4 +56,9 @@ public class DeploymentUnit implements AutoCloseable {
public void close() throws Exception {
closeAll(content.values());
}
+
+ @Override
+ public <T> void process(DeploymentUnitProcessor<T> processor, T arg)
throws IOException {
+ processor.processContent(this, arg);
+ }
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
index 6ea4cf60b02..7cd6e7a3b5e 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitContent.java
@@ -18,15 +18,12 @@
package org.apache.ignite.internal.deployunit;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
/**
* Unit content representation.
@@ -78,24 +75,6 @@ public class UnitContent implements Iterable<Entry<String,
byte[]>> {
return files.entrySet().iterator();
}
- /**
- * Read unit content from unit {@link DeploymentUnit}.
- *
- * @param deploymentUnit Deployment unit instance.
- * @return Unit content from provided deployment unit.
- */
- public static UnitContent readContent(DeploymentUnit deploymentUnit) {
- Map<String, byte[]> map = deploymentUnit.content().entrySet().stream()
- .collect(Collectors.toMap(Entry::getKey, entry -> {
- try {
- return entry.getValue().readAllBytes();
- } catch (IOException e) {
- throw new DeploymentUnitReadException(e);
- }
- }));
- return new UnitContent(map);
- }
-
/**
* Convert unit content to {@link DeploymentUnit}.
*
@@ -107,6 +86,6 @@ public class UnitContent implements Iterable<Entry<String,
byte[]>> {
content.iterator().forEachRemaining(it -> {
files.put(it.getKey(), new ByteArrayInputStream(it.getValue()));
});
- return new DeploymentUnit(files);
+ return new FilesDeploymentUnit(files);
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java
new file mode 100644
index 00000000000..78f5874e25b
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/ZipDeploymentUnit.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.io.IOException;
+import java.util.zip.ZipInputStream;
+
+/**
+ * A {@link DeploymentUnit} implementation backed by a single {@link ZipInputStream}.
+ *
+ * <p>The unit does not read the archive itself: {@link #process} passes the unit to
+ * {@link DeploymentUnitProcessor#processContentWithUnzip}, which is expected to extract the entries.
+ */
+public class ZipDeploymentUnit implements DeploymentUnit {
+ private final ZipInputStream zis;
+
+ /**
+ * Constructor.
+ *
+ * @param zis ZIP stream with the unit content; it is closed by {@link #close()}.
+ */
+ public ZipDeploymentUnit(ZipInputStream zis) {
+ this.zis = zis;
+ }
+
+ /**
+ * Passes this unit to {@link DeploymentUnitProcessor#processContentWithUnzip}.
+ *
+ * @param <T> the type of argument passed to the processor.
+ * @param processor the processor that handles the ZIP content of this unit.
+ * @param arg the argument forwarded to the processor unchanged.
+ * @throws IOException if the processor fails with an I/O error.
+ */
+ @Override
+ public <T> void process(DeploymentUnitProcessor<T> processor, T arg)
throws IOException {
+ processor.processContentWithUnzip(this, arg);
+ }
+
+ /**
+ * Returns the ZIP input stream wrapped by this unit.
+ */
+ public ZipInputStream zis() {
+ return zis;
+ }
+
+ /**
+ * Closes the wrapped ZIP input stream.
+ */
+ @Override
+ public void close() throws Exception {
+ zis.close();
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitZipException.java
similarity index 55%
copy from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
copy to
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitZipException.java
index bb36c8dccc0..7077ebcd9ac 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnit.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitZipException.java
@@ -15,34 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.deployunit;
+package org.apache.ignite.internal.deployunit.exception;
-import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
-
-import java.io.InputStream;
-import java.util.Map;
+import org.apache.ignite.lang.ErrorGroups.CodeDeployment;
+import org.apache.ignite.lang.IgniteException;
/**
- * Deployment unit interface.
+ * Thrown when deployment of a unit from a zip archive fails.
*/
-public class DeploymentUnit implements AutoCloseable {
- private final Map<String, InputStream> content;
-
- public DeploymentUnit(Map<String, InputStream> content) {
- this.content = content;
- }
+public class DeploymentUnitZipException extends IgniteException {
+ private static final long serialVersionUID = 8248918298785133526L;
/**
- * Deployment unit content - a map from file name to input stream.
+ * Constructor.
*
- * @return Deployment unit content.
+ * @param message error message.
*/
- public Map<String, InputStream> content() {
- return content;
- }
-
- @Override
- public void close() throws Exception {
- closeAll(content.values());
+ public DeploymentUnitZipException(String message) {
+ super(CodeDeployment.UNIT_ZIP_ERR, message);
}
}
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
index 0cb57643735..fb225ef8cb5 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java
@@ -29,12 +29,15 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.deployunit.DeploymentUnit;
import org.apache.ignite.internal.deployunit.FileDeployerService;
+import org.apache.ignite.internal.deployunit.FilesDeploymentUnit;
import org.apache.ignite.internal.deployunit.UnitContent;
+import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -70,18 +73,18 @@ public class FileDeployerServiceTest {
@Test
public void test() throws Exception {
- try (DeploymentUnit unit = content()) {
+ try (FilesDeploymentUnit unit = content()) {
CompletableFuture<Boolean> deployed = service.deploy("id",
parseVersion("1.0.0"), unit);
assertThat(deployed, willBe(true));
}
- try (DeploymentUnit unit = content()) {
+ try (FilesDeploymentUnit unit = content()) {
CompletableFuture<UnitContent> unitContent =
service.getUnitContent("id", parseVersion("1.0.0"));
- assertThat(unitContent,
willBe(equalTo(UnitContent.readContent(unit))));
+ assertThat(unitContent, willBe(equalTo(readContent(unit))));
}
}
- private DeploymentUnit content() {
+ private FilesDeploymentUnit content() {
Map<String, InputStream> map = Stream.of(file1, file2, file3)
.collect(Collectors.toMap(it -> it.getFileName().toString(),
it -> {
try {
@@ -95,6 +98,24 @@ public class FileDeployerServiceTest {
}
}));
- return new DeploymentUnit(map);
+ return new FilesDeploymentUnit(map);
+ }
+
+ /**
+  * Materializes every stream of the given unit into a byte array.
+  *
+  * @param deploymentUnit Deployment unit whose content streams are read to the end.
+  * @return Unit content holding the bytes of each file, keyed by file name.
+  */
+ private static UnitContent readContent(FilesDeploymentUnit deploymentUnit) {
+     Map<String, byte[]> bytesByName = deploymentUnit.content().entrySet().stream()
+             .collect(Collectors.toMap(Entry::getKey, e -> readFully(e.getValue())));
+
+     return new UnitContent(bytesByName);
+ }
+
+ /** Reads the whole stream, rethrowing an I/O failure as {@link DeploymentUnitReadException}. */
+ private static byte[] readFully(InputStream stream) {
+     try {
+         return stream.readAllBytes();
+     } catch (IOException e) {
+         throw new DeploymentUnitReadException(e);
+     }
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index c2090bcecb4..dc14070463b 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -50,6 +50,7 @@ import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.FilesDeploymentUnit;
import org.apache.ignite.internal.deployunit.IgniteDeployment;
import org.apache.ignite.internal.deployunit.NodesToDeploy;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -256,7 +257,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
CompletableFuture<Boolean> deployed = deployment.deployAsync(
unitId,
unitVersion,
- new
org.apache.ignite.internal.deployunit.DeploymentUnit(Map.of(jarName,
jarStream)),
+ new FilesDeploymentUnit(Map.of(jarName, jarStream)),
new NodesToDeploy(MAJORITY)
);
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index f379f5d385a..1d044e4ba38 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -21,6 +21,8 @@ import static java.lang.Thread.sleep;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.function.Function.identity;
+import static
org.apache.ignite.internal.testframework.WorkDirectoryExtension.zipDirectory;
+import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -45,6 +47,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -996,6 +1000,28 @@ public final class IgniteTestUtils {
}
}
+ /**
+  * Generates a zip file with dummy content based on the provided map.
+  *
+  * <p>The files are first created in a temporary staging directory, which is then zipped into {@code dest}.
+  * The staging directory is removed afterwards, including when file generation or zipping fails.
+  *
+  * @param contentTree Map from the path of a file inside the zip to its size in bytes; entries whose size
+  *         is not positive are skipped and do not appear in the archive.
+  * @param dest Zip file destination.
+  * @throws IOException if an I/O error is thrown.
+  */
+ public static void createZipFile(Map<String, Long> contentTree, Path dest) throws IOException {
+     Path zipTempFolder = Files.createTempDirectory("zipContent");
+     try {
+         for (Entry<String, Long> e : contentTree.entrySet()) {
+             long entrySize = e.getValue();
+             if (entrySize > 0) {
+                 Path entry = zipTempFolder.resolve(e.getKey());
+                 Files.createDirectories(entry.getParent());
+                 fillDummyFile(entry, entrySize);
+             }
+         }
+         zipDirectory(zipTempFolder, dest);
+     } finally {
+         // Clean up the staging directory even if filling or zipping failed, so failed runs do not leak temp files.
+         deleteIfExists(zipTempFolder);
+     }
+ }
+
/**
* Run the closure in the given executor, wait for the result and get it
synchronously.
*
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
index 76327903740..21845b4be36 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
@@ -307,7 +307,20 @@ public class WorkDirectoryExtension
return PATTERN.matcher(property).matches();
}
- private static void zipDirectory(Path source, Path target) {
+ /**
+ * Creates a ZIP archive from the contents of a source directory.
+ *
+ * <p>This method recursively walks through the source directory and
compresses all files (excluding directories)
+ * into a ZIP archive at the target location. The directory structure is
preserved within the ZIP file using
+ * relative paths from the source directory.
+ *
+ * <p>The parent directories of the target ZIP file are created if they
don't exist. If the target file already
+ * exists, it will be overwritten.
+ *
+ * @param source the path to the source directory to be zipped; must be an
existing directory
+ * @param target the path where the ZIP archive will be created; parent
directories will be created if necessary
+ */
+ public static void zipDirectory(Path source, Path target) {
try {
Files.createDirectories(target.getParent());
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 08f4078ec5b..fa8433aa5b6 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -174,6 +174,7 @@ enum class code : underlying_t {
UNIT_ALREADY_EXISTS = 0xd0002,
UNIT_CONTENT_READ = 0xd0003,
UNIT_UNAVAILABLE = 0xd0004,
+ UNIT_ZIP = 0xd0005,
// GarbageCollector group. Group code: 14
CLOSED = 0xe0001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index f6b09fb07e6..213e67f16d6 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -251,6 +251,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::UNIT_ALREADY_EXISTS:
case error::code::UNIT_CONTENT_READ:
case error::code::UNIT_UNAVAILABLE:
+ case error::code::UNIT_ZIP:
return sql_state::SHY000_GENERAL_ERROR;
// GarbageCollector group. Group code: 14
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 77c80bc7e7c..cca00845de9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -518,6 +518,9 @@ namespace Apache.Ignite
/// <summary> UnitUnavailable error. </summary>
public const int UnitUnavailable = (GroupCode << 16) | (4 &
0xFFFF);
+
+ /// <summary> UnitZip error. </summary>
+ public const int UnitZip = (GroupCode << 16) | (5 & 0xFFFF);
}
/// <summary> GarbageCollector errors. </summary>
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
index a903ba6b912..5a1d13cc293 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/deployment/DeploymentCodeApi.java
@@ -80,6 +80,44 @@ public interface DeploymentCodeApi {
Optional<List<String>> initialNodes
);
+ /**
+ * Deploy unit with zip file REST method.
+ *
+ * <p>Mapped to {@code POST units/zip/{unitId}/{unitVersion}}; consumes {@code FORM_DATA}.
+ *
+ * @param unitId The ID of the deployment unit.
+ * @param unitVersion The version of the deployment unit.
+ * @param unitContent The zip file with unit content to deploy.
+ * @param deployMode Initial deploy mode: ALL or MAJORITY.
+ * @param initialNodes List of node identifiers to deploy to.
+ * @return Future with the deployment result; the 200 response is documented as a boolean.
+ */
+ @Operation(
+ operationId = "deployZipUnit",
+ summary = "Deploy unit with folders structure in zip.",
+ description = "Deploys provided unit in zip file to the cluster
with folders structure."
+ )
+ @ApiResponse(responseCode = "200", description = "Unit deployed
successfully.",
+ content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(type = "boolean"))
+ )
+ @ApiResponse(responseCode = "409", description = "Unit with same
identifier and version is already deployed.",
+ content = @Content(mediaType = PROBLEM_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @ApiResponse(
+ responseCode = "400", description = "Deployment unit with unzip
supports only single zip file.",
+ content = @Content(mediaType = PROBLEM_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = PROBLEM_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @Consumes(FORM_DATA)
+ @Post("units/zip/{unitId}/{unitVersion}")
+ CompletableFuture<Boolean> deployZip(
+ @Schema(name = "unitId", requiredMode = REQUIRED, description =
"The ID of the deployment unit.")
+ String unitId,
+ @Schema(name = "unitVersion", requiredMode = REQUIRED, description
= "The version of the deployment unit.")
+ String unitVersion,
+ @Schema(name = "unitContent", requiredMode = REQUIRED, description
= "The zip file with unit content to deploy.")
+ Publisher<CompletedFileUpload> unitContent,
+ // NOTE(review): the schema marks the two query values below as REQUIRED while their type is Optional - confirm.
+ @QueryValue
+ @Schema(name = "deployMode", requiredMode = REQUIRED, description
= "ALL or MAJORITY.")
+ Optional<InitialDeployMode> deployMode,
+ @QueryValue
+ @Schema(name = "initialNodes", requiredMode = REQUIRED,
description = "List of node identifiers to deploy to.")
+ Optional<List<String>> initialNodes
+ );
+
/**
* Undeploy unit REST method.
*/
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementControllerTest.java
index f4f6b02dadb..84e53ded828 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementControllerTest.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.rest.deployment;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
import static org.apache.ignite.internal.rest.constants.HttpCode.BAD_REQUEST;
import static org.apache.ignite.internal.rest.constants.HttpCode.CONFLICT;
import static org.apache.ignite.internal.rest.constants.HttpCode.NOT_FOUND;
import static org.apache.ignite.internal.rest.constants.HttpCode.OK;
+import static
org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.createZipFile;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.fillDummyFile;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -30,10 +32,13 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.HttpClient;
@@ -43,17 +48,21 @@ import io.micronaut.http.client.multipart.MultipartBody;
import io.micronaut.http.client.multipart.MultipartBody.Builder;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
-import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import org.apache.ignite.internal.ClusterConfiguration;
-import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.rest.api.deployment.UnitStatus;
import org.apache.ignite.internal.rest.api.deployment.UnitVersionStatus;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.rest.matcher.ProblemMatcher;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -61,43 +70,79 @@ import org.junit.jupiter.api.Test;
* Integration test for REST controller {@link DeploymentManagementController}.
*/
@MicronautTest(rebuildContext = true)
-public class DeploymentManagementControllerTest extends
ClusterPerTestIntegrationTest {
+public class DeploymentManagementControllerTest extends
ClusterPerClassIntegrationTest {
private static final String NODE_URL = "http://localhost:" +
ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
private Path smallFile;
private Path bigFile;
+ private Path zipFile;
+
private static final long SIZE_IN_BYTES = 1024L;
private static final long BIG_IN_BYTES = 100 * 1024L * 1024L; // 100 MiB
+ private static final String UNIT_ID = "unitId";
+
@Inject
@Client(NODE_URL + "/management/v1/deployment")
HttpClient client;
@BeforeEach
public void setup() throws IOException {
- smallFile = workDir.resolve("small.txt");
- bigFile = workDir.resolve("big.txt");
+ smallFile = WORK_DIR.resolve("small.txt");
+ bigFile = WORK_DIR.resolve("big.txt");
+ zipFile = WORK_DIR.resolve("zip.zip");
if (!Files.exists(smallFile)) {
- IgniteTestUtils.fillDummyFile(smallFile, SIZE_IN_BYTES);
+ fillDummyFile(smallFile, SIZE_IN_BYTES);
}
if (!Files.exists(bigFile)) {
- IgniteTestUtils.fillDummyFile(bigFile, BIG_IN_BYTES);
+ fillDummyFile(bigFile, BIG_IN_BYTES);
+ }
+ if (!Files.exists(zipFile)) {
+ createZipFile(Map.of(
+ "a1/a2", SIZE_IN_BYTES,
+ "b1", SIZE_IN_BYTES,
+ "c1/c2/c3/c4", BIG_IN_BYTES,
+ "d1/d2", SIZE_IN_BYTES,
+ "d1/a2", SIZE_IN_BYTES
+ ), zipFile);
}
}
+ @AfterEach
+ public void cleanup() {
+     // Undeploy every version of the shared unit that is still reported as DEPLOYED.
+     for (UnitStatus unit : list(UNIT_ID)) {
+         for (UnitVersionStatus versionStatus : unit.versionToStatus()) {
+             if (versionStatus.getStatus() != DEPLOYED) {
+                 continue;
+             }
+
+             HttpResponse<Object> undeployed = undeploy(UNIT_ID, versionStatus.getVersion());
+             assertThat(undeployed.code(), is(OK.code()));
+         }
+     }
+
+     // Wait until the cluster no longer lists any unit, so the next test starts from a clean state.
+     await().untilAsserted(() -> {
+         MutableHttpRequest<Object> request = HttpRequest.GET("cluster/units");
+         Collection<UnitStatus> remaining = client.toBlocking().retrieve(request, Argument.listOf(UnitStatus.class));
+
+         assertThat(remaining, is(empty()));
+     });
+ }
+
@Test
public void testDeploySuccessful() {
- String id = "testId";
+ String id = UNIT_ID;
String version = "1.1.1";
HttpResponse<Object> response = deploy(id, version);
assertThat(response.code(), is(OK.code()));
- await().timeout(10, SECONDS).untilAsserted(() -> {
+ await().untilAsserted(() -> {
MutableHttpRequest<Object> get = HttpRequest.GET("cluster/units");
UnitStatus status = client.toBlocking().retrieve(get,
UnitStatus.class);
@@ -108,9 +153,9 @@ public class DeploymentManagementControllerTest extends
ClusterPerTestIntegratio
@Test
public void testDeployBig() {
- String id = "testId";
+ String id = UNIT_ID;
String version = "1.1.1";
- HttpResponse<Object> response = deploy(id, version, bigFile.toFile());
+ HttpResponse<Object> response = deploy(id, version, false, bigFile);
assertThat(response.code(), is(OK.code()));
}
@@ -121,13 +166,13 @@ public class DeploymentManagementControllerTest extends
ClusterPerTestIntegratio
String version = "1.1.1";
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
- () -> deploy(id, version, null));
+ () -> deploy(id, version, false, null));
assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
}
@Test
public void testDeployExisted() {
- String id = "testId";
+ String id = UNIT_ID;
String version = "1.1.1";
HttpResponse<Object> response = deploy(id, version);
@@ -141,7 +186,7 @@ public class DeploymentManagementControllerTest extends
ClusterPerTestIntegratio
@Test
public void testDeployUndeploy() {
- String id = "testId";
+ String id = UNIT_ID;
String version = "1.1.1";
HttpResponse<Object> response = deploy(id, version);
@@ -156,7 +201,7 @@ public class DeploymentManagementControllerTest extends
ClusterPerTestIntegratio
public void testUndeployFailed() {
HttpClientResponseException e = assertThrows(
HttpClientResponseException.class,
- () -> undeploy("testId", "1.1.1"));
+ () -> undeploy(UNIT_ID, "1.1.1"));
assertThat(e.getResponse().code(), is(NOT_FOUND.code()));
}
@@ -169,7 +214,7 @@ public class DeploymentManagementControllerTest extends
ClusterPerTestIntegratio
@Test
public void testList() {
- String id = "unitId";
+ String id = UNIT_ID;
deploy(id, "1.1.1");
deploy(id, "1.1.2");
deploy(id, "1.2.1");
@@ -185,21 +230,96 @@ public class DeploymentManagementControllerTest extends
ClusterPerTestIntegratio
assertThat(versions, contains("1.0.0", "1.0.1", "1.1.1", "1.1.2",
"1.2.1", "2.0.0"));
}
+ @Test
+ public void testZipDeploy() {
+     String version = "1.1.1";
+
+     HttpResponse<Object> deployed = deployZip(UNIT_ID, version);
+     assertThat(deployed.code(), is(OK.code()));
+
+     await().untilAsserted(() -> {
+         MutableHttpRequest<Object> request = HttpRequest.GET("cluster/units");
+         UnitStatus unit = client.toBlocking().retrieve(request, UnitStatus.class);
+
+         assertThat(unit.id(), is(UNIT_ID));
+         assertThat(unit.versionToStatus(), equalTo(List.of(new UnitVersionStatus(version, DEPLOYED))));
+     });
+
+     Path unitDir = CLUSTER.nodeWorkDir(0).resolve("deployment").resolve(UNIT_ID).resolve(version);
+
+     // Every entry of the uploaded archive must be present under the unit directory of node 0.
+     try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile))) {
+         for (ZipEntry ze = zis.getNextEntry(); ze != null; ze = zis.getNextEntry()) {
+             String entryName = ze.getName();
+             assertTrue(Files.exists(unitDir.resolve(entryName)), "File " + entryName + " does not exist");
+         }
+     } catch (IOException e) {
+         fail(e);
+     }
+ }
+
+ @Test
+ public void testZipDeployAsFile() {
+     String version = "1.1.1";
+
+     // The zip is sent through the regular endpoint, so it is stored as a plain file.
+     HttpResponse<Object> deployed = deploy(UNIT_ID, version, false, zipFile);
+     assertThat(deployed.code(), is(OK.code()));
+
+     await().untilAsserted(() -> {
+         MutableHttpRequest<Object> request = HttpRequest.GET("cluster/units");
+         UnitStatus unit = client.toBlocking().retrieve(request, UnitStatus.class);
+
+         assertThat(unit.id(), is(UNIT_ID));
+         assertThat(unit.versionToStatus(), equalTo(List.of(new UnitVersionStatus(version, DEPLOYED))));
+     });
+
+     Path unitDir = CLUSTER.nodeWorkDir(0).resolve("deployment").resolve(UNIT_ID).resolve(version);
+     Path storedZip = unitDir.resolve(zipFile.getFileName());
+
+     assertTrue(Files.exists(storedZip));
+ }
+
+ @Test
+ public void testDeployFileAsZip() {
+     String version = "1.1.1";
+
+     // A non-zip payload sent to the zip endpoint is expected to be rejected.
+     assertThrowsProblem(
+             () -> deploy(UNIT_ID, version, true, smallFile),
+             HttpStatus.BAD_REQUEST,
+             ProblemMatcher.isProblem().withDetail("Only zip file is supported.")
+     );
+
+     // More than one uploaded part is expected to be rejected as well.
+     assertThrowsProblem(
+             () -> deploy(UNIT_ID, version, true, zipFile, zipFile),
+             HttpStatus.BAD_REQUEST,
+             ProblemMatcher.isProblem().withDetail("Deployment unit with unzip supports only single zip file.")
+     );
+ }
+
+ /** Uploads the prepared {@code zipFile} with the unzip flag set. */
+ private HttpResponse<Object> deployZip(String id, String version) {
+ return deploy(id, version, true, zipFile);
+ }
+
private HttpResponse<Object> deploy(String id, String version) {
- return deploy(id, version, smallFile.toFile());
+ return deploy(id, version, false, smallFile);
}
- private HttpResponse<Object> deploy(String id, String version, File file) {
+ private HttpResponse<Object> deploy(String id, String version, boolean
unzip, Path... files) {
MultipartBody body = null;
- if (file != null) {
+ if (files != null) {
Builder builder = MultipartBody.builder();
- builder.addPart("unitContent", file);
+ for (Path file : files) {
+ builder.addPart("unitContent", file.toFile());
+ }
body = builder.build();
}
MutableHttpRequest<MultipartBody> post = HttpRequest
- .POST("units/" + id + "/" + version, body)
+ .POST("units/" + (unzip ? "zip/" : "") + id + "/" + version,
body)
.contentType(MediaType.MULTIPART_FORM_DATA);
return client.toBlocking().exchange(post);
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
index e9c67064715..39aa2ef1c72 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/CompletedFileUploadSubscriber.java
@@ -19,9 +19,6 @@ package org.apache.ignite.internal.rest.deployment;
import io.micronaut.http.multipart.CompletedFileUpload;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.deployunit.DeploymentUnit;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -33,14 +30,18 @@ import org.reactivestreams.Subscription;
* Implementation of {@link Subscriber} based on {@link CompletedFileUpload}
which will collect uploaded files to the
* {@link DeploymentUnit}.
*/
-class CompletedFileUploadSubscriber implements
Subscriber<CompletedFileUpload>, AutoCloseable {
+class CompletedFileUploadSubscriber implements Subscriber<CompletedFileUpload>
{
private static final IgniteLogger LOG =
Loggers.forClass(CompletedFileUploadSubscriber.class);
private final CompletableFuture<DeploymentUnit> result = new
CompletableFuture<>();
- private final Map<String, InputStream> content = new HashMap<>();
+ private final InputStreamCollector collector;
- private IOException ex;
+ private Throwable ex;
+
+ public CompletedFileUploadSubscriber(boolean unzip) {
+ this.collector = unzip ? new ZipInputStreamCollector() : new
InputStreamCollectorImpl();
+ }
@Override
public void onSubscribe(Subscription subscription) {
@@ -50,20 +51,22 @@ class CompletedFileUploadSubscriber implements
Subscriber<CompletedFileUpload>,
@Override
public void onNext(CompletedFileUpload item) {
try {
- content.put(item.getFilename(), item.getInputStream());
+ collector.addInputStream(item.getFilename(),
item.getInputStream());
} catch (IOException e) {
LOG.error("Failed to read file: " + item.getFilename(), e);
- if (ex != null) {
- ex.addSuppressed(e);
- } else {
- ex = e;
- }
+ suppressException(e);
}
}
@Override
public void onError(Throwable throwable) {
- result.completeExceptionally(throwable);
+ try {
+ collector.close();
+ } catch (Exception e) {
+ suppressException(e);
+ }
+ suppressException(throwable);
+ result.completeExceptionally(ex);
}
@Override
@@ -71,22 +74,31 @@ class CompletedFileUploadSubscriber implements
Subscriber<CompletedFileUpload>,
if (ex != null) {
result.completeExceptionally(ex);
} else {
- result.complete(new DeploymentUnit(content));
+ try {
+ DeploymentUnit deploymentUnit = collector.toDeploymentUnit();
+ result.complete(deploymentUnit);
+ } catch (Exception e) {
+ suppressException(e);
+ try {
+ collector.close();
+ } catch (Exception e2) {
+ suppressException(e2);
+ }
+ result.completeExceptionally(ex);
+ }
}
}
- public CompletableFuture<DeploymentUnit> result() {
- return result;
+ /**
+  * Logs the failure and accumulates it: the first failure becomes the primary one,
+  * every later failure is attached to it as suppressed.
+  *
+  * @param t Failure to record.
+  */
+ private void suppressException(Throwable t) {
+     LOG.warn("Deployment unit subscriber error: ", t);
+
+     if (ex != null) {
+         ex.addSuppressed(t);
+         return;
+     }
+
+     ex = t;
}
- @Override
- public void close() throws Exception {
- result.thenAccept(it -> {
- try {
- it.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
+ public CompletableFuture<DeploymentUnit> result() {
+ return result;
}
}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
index 58996ee418b..762a6d89304 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.rest.deployment;
+import static org.apache.ignite.deployment.version.Version.parseVersion;
+
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.multipart.CompletedFileUpload;
import java.util.ArrayList;
@@ -65,28 +67,58 @@ public class DeploymentManagementController implements
DeploymentCodeApi, Resour
Optional<InitialDeployMode> deployMode,
Optional<List<String>> initialNodes
) {
+ return doDeploy(unitId, unitVersion, unitContent, deployMode,
initialNodes, false);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> deployZip(String unitId, String
unitVersion, Publisher<CompletedFileUpload> unitContent,
+ Optional<InitialDeployMode> deployMode, Optional<List<String>>
initialNodes) {
+ return doDeploy(unitId, unitVersion, unitContent, deployMode,
initialNodes, true);
+ }
- CompletedFileUploadSubscriber subscriber = new
CompletedFileUploadSubscriber();
+ /**
+ * Collects the uploaded content into a deployment unit and deploys it; shared by {@code deploy} and {@code deployZip}.
+ *
+ * @param unitId Identifier of the unit to deploy.
+ * @param unitVersion Unit version string; parsed with {@code parseVersion}.
+ * @param unitContent Publisher of the uploaded files.
+ * @param deployMode Initial deploy mode; consulted only when {@code initialNodes} is empty.
+ * @param initialNodes Explicit nodes for the initial deployment.
+ * @param zip Passed to {@link CompletedFileUploadSubscriber}; {@code true} when called from {@code deployZip}.
+ * @return Future completed with the result of {@code deployAsync}.
+ */
+ private CompletableFuture<Boolean> doDeploy(
+ String unitId,
+ String unitVersion,
+ Publisher<CompletedFileUpload> unitContent,
+ Optional<InitialDeployMode> deployMode,
+ Optional<List<String>> initialNodes,
+ boolean zip
+ ) {
+ CompletedFileUploadSubscriber subscriber = new
CompletedFileUploadSubscriber(zip);
unitContent.subscribe(subscriber);
NodesToDeploy nodesToDeploy = initialNodes.map(NodesToDeploy::new)
.orElseGet(() -> new
NodesToDeploy(fromInitialDeployMode(deployMode)));
- return subscriber.result().thenCompose(content -> {
- return deployment.deployAsync(unitId,
Version.parseVersion(unitVersion), content, nodesToDeploy);
- }).whenComplete((res, throwable) -> {
- try {
- subscriber.close();
- } catch (Exception e) {
- LOG.error("Failed to close subscriber", e);
- }
- });
-
+ return subscriber.result().thenCompose(deploymentUnit ->
+ // Start from a completed stage so that a synchronous failure (for example an invalid version string)
+ // is captured by the chain and the unit is still closed in whenComplete below.
+ CompletableFuture.completedFuture(deploymentUnit)
+ .thenCompose(unit -> deployment.deployAsync(unitId, parseVersion(unitVersion), unit, nodesToDeploy))
+ .whenComplete((unitStatus, throwable) -> {
+ try {
+ deploymentUnit.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close deployment unit", e);
+ }
+ })
+ );
}
@Override
public CompletableFuture<Boolean> undeploy(String unitId, String
unitVersion) {
- return deployment.undeployAsync(unitId,
Version.parseVersion(unitVersion));
+ return deployment.undeployAsync(unitId, parseVersion(unitVersion));
}
@Override
@@ -107,7 +125,7 @@ public class DeploymentManagementController implements
DeploymentCodeApi, Resour
private CompletableFuture<List<UnitStatuses>> clusterStatuses(String
unitId, Optional<String> version) {
if (version.isPresent()) {
- Version parsedVersion = Version.parseVersion(version.get());
+ Version parsedVersion = parseVersion(version.get());
return deployment.clusterStatusAsync(unitId, parsedVersion)
.thenApply(deploymentStatus -> {
if (deploymentStatus != null) {
@@ -140,7 +158,7 @@ public class DeploymentManagementController implements
DeploymentCodeApi, Resour
private CompletableFuture<List<UnitStatuses>> nodeStatuses(String unitId,
Optional<String> version) {
if (version.isPresent()) {
- Version parsedVersion = Version.parseVersion(version.get());
+ Version parsedVersion = parseVersion(version.get());
return deployment.nodeStatusAsync(unitId, parsedVersion)
.thenApply(deploymentStatus -> {
if (deploymentStatus != null) {
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java
new file mode 100644
index 00000000000..5f382f1902d
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.deployment;
+
+import java.io.InputStream;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+
+/**
+ * Interface for collecting input streams and converting them into deployment
units.
+ *
+ * <p>This interface provides a contract for accumulating input streams from
various sources
+ * (such as file uploads, network transfers, or other data sources) and
organizing them into
+ * a structured deployment unit that can be processed by the Apache Ignite
deployment system.
+ *
+ * <p>Implementations of this interface are typically used in REST endpoints,
file upload
+ * handlers, and other scenarios where deployment content is received in
stream format and
+ * needs to be converted into deployment units for processing.
+ */
+public interface InputStreamCollector extends AutoCloseable {
+ /**
+ * Adds an input stream with the specified filename to the collection.
+ *
+ * <p>This method accepts an input stream representing a file or resource
that should be
+ * included in the deployment unit. The filename parameter provides the
logical name or
+ * path for the content within the deployment unit structure.
+ *
+ * <p>Once added, the input stream becomes managed by this collector and
should not be
+ * closed directly by the caller. The collector is responsible for proper
cleanup of
+ * all managed streams.
+ *
+ *
+ * @param filename the logical name or path for the content within the
deployment unit;
+ * must not be {@code null} or empty.
+ * @param is the input stream containing the content to be added; must not
be {@code null}.
+ */
+ void addInputStream(String filename, InputStream is);
+
+ /**
+ * Converts the collected input streams into a deployment unit.
+ *
+ * <p>This method creates a {@link DeploymentUnit} instance containing all
the input streams
+ * that have been added to this collector. The specific type of deployment
unit returned
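+ * depends on the implementation and the characteristics of the collected content. An implementation
may throw an unchecked exception instead when the collected content was rejected.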
+ *
+ * @return a deployment unit containing all collected input streams; never
{@code null}.
+ */
+ DeploymentUnit toDeploymentUnit();
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java
new file mode 100644
index 00000000000..a6d441b012a
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/InputStreamCollectorImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.deployment;
+
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.FilesDeploymentUnit;
+
+/**
+ * Standard implementation of {@link InputStreamCollector} for collecting
regular file content.
+ *
+ * <p>This implementation provides a straightforward approach to collecting
input streams and
+ * converting them into a standard {@link FilesDeploymentUnit}. It maintains
an internal map
+ * of filename-to-stream associations and creates deployment units containing
regular
+ * (non-compressed) file content.
+ */
+public class InputStreamCollectorImpl implements InputStreamCollector {
+ /**
+ * Internal storage for collected input streams mapped by their filenames.
+ * The map maintains the association between logical file paths and their
content streams.
+ */
+ private final Map<String, InputStream> content = new HashMap<>();
+
+ @Override
+ public void addInputStream(String filename, InputStream is) {
+ content.put(filename, is);
+ }
+
+ @Override
+ public DeploymentUnit toDeploymentUnit() {
+ return new FilesDeploymentUnit(content);
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeAll(content.values());
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java
new file mode 100644
index 00000000000..058269cc148
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/ZipInputStreamCollector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.deployment;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.zip.ZipInputStream;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.ZipDeploymentUnit;
+import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitZipException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Implementation of {@link InputStreamCollector} that accepts exactly one ZIP archive.
+ *
+ * <p>The first added stream is checked for the ZIP signature. A non-ZIP stream, or any stream added after the first one,
+ * is closed immediately and the failure is remembered; it is thrown from {@link #toDeploymentUnit()}.
+ */
+public class ZipInputStreamCollector implements InputStreamCollector {
+ private static final IgniteLogger LOG =
Loggers.forClass(ZipInputStreamCollector.class);
+ /** Signature bytes a stream must start with to be accepted as a ZIP archive ({@code 'P', 'K', 3, 4}). */
+ private static final byte[] ZIP_MAGIC_HEADER = {0x50, 0x4b, 0x03, 0x04};
+
+ /** Stream over the accepted archive; {@code null} until a ZIP stream has been added. */
+ private ZipInputStream zis;
+
+ /** First recorded validation failure; rethrown by {@link #toDeploymentUnit()}. */
+ private IgniteException igniteException;
+
+ @Override
+ public void addInputStream(String filename, InputStream is) {
+ if (zis != null || igniteException != null) {
+ // We don't need the stream anymore, so we close it to avoid
resource leak.
+ safeClose(is);
+ if (igniteException == null) {
+ igniteException = new DeploymentUnitZipException("Deployment
unit with unzip supports only single zip file.");
+ }
+ return;
+ }
+
+ InputStream result = is.markSupported() ? is : new
BufferedInputStream(is);
+
+ if (isZip(result)) {
+ zis = new ZipInputStream(result);
+ } else {
+ safeClose(result);
+ igniteException = new DeploymentUnitZipException("Only zip file is
supported.");
+ }
+ }
+
+ /** Closes the stream, logging (not propagating) an {@link IOException}. */
+ private static void safeClose(InputStream is) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close non-zip input stream.", e);
+ }
+ }
+
+ /**
+ * Checks whether the stream starts with the ZIP signature and rewinds it afterwards.
+ *
+ * @param is Stream that supports {@code mark}/{@code reset}.
+ * @return {@code true} if the first bytes equal {@link #ZIP_MAGIC_HEADER}; {@code false} otherwise, including on I/O error.
+ */
+ private static boolean isZip(InputStream is) {
+ try {
+ // Mark first: reset() without a prior mark() is not defined, and BufferedInputStream rejects it with an IOException.
+ is.mark(ZIP_MAGIC_HEADER.length);
+ boolean isZip = Objects.deepEquals(ZIP_MAGIC_HEADER, is.readNBytes(ZIP_MAGIC_HEADER.length));
+ is.reset();
+ return isZip;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (zis != null) {
+ zis.close();
+ }
+ }
+
+ @Override
+ public DeploymentUnit toDeploymentUnit() {
+ if (igniteException != null) {
+ throw igniteException;
+ }
+ return new ZipDeploymentUnit(zis);
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/exception/handler/DeploymentUnitZipExceptionHandler.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/exception/handler/DeploymentUnitZipExceptionHandler.java
new file mode 100644
index 00000000000..36e5007a2e2
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/exception/handler/DeploymentUnitZipExceptionHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.deployment.exception.handler;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.server.exceptions.ExceptionHandler;
+import jakarta.inject.Singleton;
+import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitZipException;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.HttpCode;
+import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
+
+/**
+ * Exception handler of {@link DeploymentUnitZipException}.
+ */
+@Singleton
+@Requires(classes = {DeploymentUnitZipException.class, ExceptionHandler.class})
+public class DeploymentUnitZipExceptionHandler implements
ExceptionHandler<DeploymentUnitZipException, HttpResponse<? extends Problem>> {
+
+ @Override
+ public HttpResponse<? extends Problem> handle(HttpRequest request,
DeploymentUnitZipException exception) {
+ return HttpProblemResponse.from(
+ Problem.fromHttpCode(HttpCode.BAD_REQUEST)
+ .detail(exception.getMessage()).build()
+ );
+ }
+}