This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new b9329d27d8 Better test coverage - usage, examples, operations
b9329d27d8 is described below
commit b9329d27d8f9ed1b8f64420d6df9eb04cea9c455
Author: JiriOndrusek <[email protected]>
AuthorDate: Fri Jun 20 07:59:54 2025 +0200
Better test coverage - usage, examples, operations
Fixes #7445
---
.../deployment/AzureCoreSupportProcessor.java | 17 ++
.../deployment/AzureStorageDatalakeProcessor.java | 8 +-
.../azure/azure-storage-datalake/pom.xml | 39 +++
.../datalake/it/AzureStorageDatalakeResource.java | 62 ++++
.../datalake/it/AzureStorageDatalakeRoutes.java | 162 +++++++++++
.../datalake/it/AzureStorageDatalakeUtil.java | 0
.../datalake/it/AzureStorageDatalakeTest.java | 311 ++++++++++++++++++++-
.../it/AzureStorageDatalakeTestResource.java | 33 ++-
integration-tests/azure-grouped/pom.xml | 17 ++
9 files changed, 640 insertions(+), 9 deletions(-)
diff --git
a/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java
b/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java
index b603a2a2c9..6b8fadbe25 100644
---
a/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java
+++
b/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java
@@ -17,6 +17,7 @@
package org.apache.camel.quarkus.support.reactor.netty.deployment;
import java.io.IOException;
+import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
@@ -25,6 +26,8 @@ import java.util.stream.Stream;
import com.azure.core.annotation.ServiceInterface;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.HttpClientProvider;
+import com.azure.json.JsonSerializable;
+import com.azure.xml.XmlSerializable;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
@@ -71,6 +74,20 @@ public class AzureCoreSupportProcessor {
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(httpResponseExceptionClasses.toArray(new
String[0]))
.methods()
.build());
+
+ // implementations of serializers are used during errors reporting
+ LinkedHashSet<String> serializers = new LinkedHashSet<>(
+
combinedIndex.getIndex().getAllKnownImplementations(JsonSerializable.class).stream()
+ .map(ci -> ci.name().toString())
+ .toList());
+
serializers.addAll(combinedIndex.getIndex().getAllKnownImplementations(XmlSerializable.class).stream()
+ .map(ci -> ci.name().toString())
+ .toList());
+
+
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(serializers.toArray(new
String[0]))
+ .methods()
+ .fields()
+ .build());
}
@BuildStep
diff --git
a/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java
b/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java
index 1bc96b0117..212e481698 100644
---
a/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java
+++
b/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java
@@ -16,9 +16,7 @@
*/
package org.apache.camel.quarkus.component.azure.storage.datalake.deployment;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.LinkedHashSet;
import com.azure.core.annotation.ServiceInterface;
import io.quarkus.deployment.annotations.BuildProducer;
@@ -54,10 +52,10 @@ class AzureStorageDatalakeProcessor {
ReflectiveClassBuildItem registerForReflection(CombinedIndexBuildItem
combinedIndex) {
IndexView index = combinedIndex.getIndex();
- List<String> dtos = new LinkedList<>(index.getKnownClasses().stream()
+ LinkedHashSet<String> dtos = new
LinkedHashSet<>(index.getKnownClasses().stream()
.map(ci -> ci.name().toString())
.filter(n ->
n.startsWith("com.azure.storage.file.datalake.implementation.models"))
- .collect(Collectors.toList()));
+ .toList());
dtos.add("com.azure.storage.file.datalake.implementation.ServicesImpl$ServicesService");
diff --git a/integration-test-groups/azure/azure-storage-datalake/pom.xml
b/integration-test-groups/azure/azure-storage-datalake/pom.xml
index 691116ea99..9dd6798ba3 100644
--- a/integration-test-groups/azure/azure-storage-datalake/pom.xml
+++ b/integration-test-groups/azure/azure-storage-datalake/pom.xml
@@ -35,6 +35,14 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-azure-storage-datalake</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-file</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
@@ -60,6 +68,11 @@
<artifactId>camel-quarkus-integration-tests-support-azure</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
@@ -85,6 +98,32 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-file-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</profile>
<profile>
diff --git
a/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java
b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java
index 699a1dd25a..a3edb33a83 100644
---
a/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java
+++
b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java
@@ -16,8 +16,11 @@
*/
package org.apache.camel.quarkus.component.azure.storage.datalake.it;
+import java.io.ByteArrayOutputStream;
import java.net.URI;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -38,9 +41,12 @@ import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
+import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
import
org.apache.camel.component.azure.storage.datalake.DataLakeOperationsDefinition;
@@ -56,6 +62,9 @@ public class AzureStorageDatalakeResource {
@Inject
ConsumerTemplate consumerTemplate;
+ @Inject
+ CamelContext camelContext;
+
@ConfigProperty(name = "azure.storage.account-name")
Optional<String> azureStorageAccountName;
@@ -169,6 +178,53 @@ public class AzureStorageDatalakeResource {
10000, String.class);
}
+ @Path("/route/{route}/filesystem/{filesystem}")
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ public Object consumer(@PathParam("route") String routeName,
+ @PathParam("filesystem") String filesystem,
+ @QueryParam("useOutputStream") boolean useOutputStream,
+ Map<String, Object> headers) throws Exception {
+
+ ByteArrayOutputStream inMemoryStream = new ByteArrayOutputStream();
+
+ Map<String, Object> _headers = new HashMap();
+ if (headers != null) {
+ _headers.putAll(headers);
+
+ }
+ _headers.put("filesystemName", filesystem);
+ _headers.put("accountName", azureStorageAccountName.get());
+
+ Exchange exchange = producerTemplate.request(
+ "direct:" + routeName,
+ e -> {
+ e.getIn().setHeaders(_headers);
+ if (useOutputStream &&
"datalakeGetFile".equals(routeName)) {
+ e.getIn().setBody(inMemoryStream);
+ }
+ });
+
+ Object o = exchange.getIn().getBody();
+ switch (routeName) {
+ case "datalakeListFileSystem":
+ return ((List<FileSystemItem>) o).stream()
+ .map(FileSystemItem::getName)
+ .collect(Collectors.toList());
+ case "datalakeListPaths":
+ return ((List<PathItem>) o).stream()
+ .map(PathItem::getName)
+ .collect(Collectors.toList());
+ case "datalakeGetFile":
+ if (useOutputStream) {
+ return inMemoryStream.toString();
+ }
+ break;
+ }
+
+ return exchange.getIn().getBody(String.class);
+ }
+
private String componentUri(final String filesystem, final
DataLakeOperationsDefinition operation) {
return
String.format("azure-storage-datalake://%s%s?serviceClient=#azureDatalakeServiceClient&operation=%s",
azureStorageAccountName,
@@ -176,4 +232,10 @@ public class AzureStorageDatalakeResource {
operation.name());
}
+ @Path("/start/{routeId}")
+ @GET
+ public Response startRoute(@PathParam("routeId") String routeId) throws
Exception {
+ camelContext.getRouteController().startRoute(routeId);
+ return Response.ok().build();
+ }
}
diff --git
a/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeRoutes.java
b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeRoutes.java
new file mode 100644
index 0000000000..d184524d2d
--- /dev/null
+++
b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeRoutes.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.azure.storage.datalake.it;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import com.azure.storage.file.datalake.models.ListFileSystemsOptions;
+import com.azure.storage.file.datalake.options.FileQueryOptions;
+import jakarta.enterprise.context.ApplicationScoped;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+@ApplicationScoped
+public class AzureStorageDatalakeRoutes extends RouteBuilder {
+
+ public static final String FILE_NAME = "operations.txt";
+ public static final String FILE_NAME2 = "test/file.txt";
+ public static final String CONSUMER_FILE_NAME = "file_for_download.txt";
+ public static final String CONSUMER_FILE_NAME2 = "file_for_download2.txt";
+ private static final String CLIENT_SUFFIX =
"&serviceClient=#azureDatalakeServiceClient";
+
+ @Override
+ public void configure() throws Exception {
+
+ String tmpFolder =
ConfigProvider.getConfig().getValue("cqDatalakeTmpFolder", String.class);
+ String consumerFilesystem =
ConfigProvider.getConfig().getValue("cqCDatalakeConsumerFilesystem",
String.class);
+
+ /* Consumer examples */
+
+ //Consume a file from the storage datalake into a file using the file
component
+ from("azure-storage-datalake://" +
AzureStorageDatalakeUtil.getRealAccountKeyFromEnv() + "/" + consumerFilesystem
+ + "?fileName=" + CONSUMER_FILE_NAME
+ + CLIENT_SUFFIX)
+ .routeId("consumeWithFileComponent")
+ .autoStartup(false)
+ .to("file:" + tmpFolder + "/consumer-files?fileName=" +
CONSUMER_FILE_NAME);
+
+ //write to a file without using the file component
+ from("azure-storage-datalake://" +
AzureStorageDatalakeUtil.getRealAccountKeyFromEnv() + "/" + consumerFilesystem
+ + "?fileName=" + CONSUMER_FILE_NAME2 + "&fileDir=" + tmpFolder
+ "/consumer-files&delay=3000000"
+ + CLIENT_SUFFIX)
+ .routeId("consumeWithoutFileComponent")
+ .autoStartup(false)
+ .log("File downloaded");
+
+ //batch consumer
+ from("azure-storage-datalake://" +
AzureStorageDatalakeUtil.getRealAccountKeyFromEnv() + "/" + consumerFilesystem
+ + "?fileDir=" + tmpFolder +
"/consumer-files/batch&path=/&delay=3000000" + CLIENT_SUFFIX)
+ .routeId("consumeBatch")
+ .autoStartup(false)
+ .log("File downloaded");
+
+ /* Producer examples */
+
+ //listFileSystem
+ from("direct:datalakeListFileSystem")
+ .process(exchange -> {
+
exchange.getIn().setHeader(DataLakeConstants.LIST_FILESYSTEMS_OPTIONS,
+ new
ListFileSystemsOptions().setMaxResultsPerPage(10));
+ })
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=listFileSystem"
+ + CLIENT_SUFFIX);
+
+ //createFileSystem
+ from("direct:datalakeCreateFilesystem")
+
.toD("azure-storage-datalake://${header.accountName}?operation=createFileSystem"
+ CLIENT_SUFFIX);
+
+ //listPaths
+ from("direct:datalakeListPaths")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=listPaths"
+ + CLIENT_SUFFIX);
+
+ //getFile
+ from("direct:datalakeGetFile")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=getFile&fileName=${header.fileName}"
+ + CLIENT_SUFFIX);
+
+ //deleteFile
+ from("direct:datalakeDeleteFile")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=deleteFile&fileName="
+ + FILE_NAME + CLIENT_SUFFIX);
+
+ //downloadToFile
+ from("direct:datalakeDownloadToFile")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=downloadToFile&fileName="
+ + FILE_NAME + "&fileDir=${header.tmpFolder}" +
CLIENT_SUFFIX);
+
+ //downloadLink
+ from("direct:datalakeDownloadLink")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=downloadLink&fileName="
+ + FILE_NAME + CLIENT_SUFFIX);
+
+ //appendToFile
+ from("direct:datalakeAppendToFile")
+ .process(exchange -> {
+ final String data = exchange.getIn().getHeader("append",
String.class);
+ final InputStream inputStream = new
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+ exchange.getIn().setBody(inputStream);
+ })
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=appendToFile&fileName="
+ + FILE_NAME + CLIENT_SUFFIX);
+
+ //flushToFile
+ from("direct:datalakeFlushToFile")
+ .process(exchange -> {
+ exchange.getIn().setHeader(DataLakeConstants.POSITION, 8);
+ })
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=flushToFile&fileName="
+ + FILE_NAME + CLIENT_SUFFIX);
+
+ //openQueryInputStream
+ from("direct:openQueryInputStream")
+ .process(exchange -> {
+ exchange.getIn().setHeader(DataLakeConstants.QUERY_OPTIONS,
+ new FileQueryOptions("SELECT * from BlobStorage"));
+ })
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=openQueryInputStream&fileName="
+ + FILE_NAME + CLIENT_SUFFIX);
+
+ //upload
+ from("direct:datalakeUpload")
+ .process(exchange -> {
+ String fileContent =
exchange.getIn().getHeader("fileContent", String.class);
+ final InputStream inputStream = new
ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8));
+ exchange.getIn().setBody(inputStream);
+ })
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=upload&fileName="
+ + FILE_NAME + CLIENT_SUFFIX);
+
+ // uploadFromFile
+ from("direct:datalakeUploadFromFile")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=uploadFromFile&fileName="
+ + FILE_NAME2 + CLIENT_SUFFIX);
+
+ // createFile
+ from("direct:datalakeCreateFile")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=createFile&fileName=${header.fileName}"
+ + CLIENT_SUFFIX);
+
+ //deleteDirectory
+ from("direct:datalakeDeleteDirectory")
+
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=deleteDirectory"
+ + CLIENT_SUFFIX);
+ }
+}
diff --git
a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java
b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java
similarity index 100%
rename from
integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java
rename to
integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java
diff --git
a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java
b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java
index 9bf9b0c66f..978331a9ff 100644
---
a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java
+++
b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java
@@ -16,16 +16,30 @@
*/
package org.apache.camel.quarkus.component.azure.storage.datalake.it;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
import org.apache.commons.lang3.RandomStringUtils;
+import org.eclipse.microprofile.config.ConfigProvider;
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
//Disable tests dynamically in beforeEach method, to reflect preferred env
name in case they are used (see README.adoc)
@QuarkusTest
@@ -43,12 +57,13 @@ class AzureStorageDatalakeTest {
public void beforeEach() {
Assumptions.assumeTrue(AzureStorageDatalakeUtil.isRalAccountProvided(),
"Azure Data Lake credentials were not provided");
+
}
@Test
public void crud() {
- final String filesystem = "cqfs" + RandomStringUtils.randomNumeric(16);
- final String filename = "file" + RandomStringUtils.randomNumeric(16);
+ final String filesystem = "cqfscrud" +
RandomStringUtils.randomNumeric(16);
+ final String filename = "file.txt";
/* The filesystem does not exist initially */
RestAssured.get("/azure-storage-datalake/filesystem/" + filesystem)
@@ -122,4 +137,296 @@ class AzureStorageDatalakeTest {
}
}
+
+ @Test
+ public void consumerRoutes() throws IOException {
+ final String filename = AzureStorageDatalakeRoutes.CONSUMER_FILE_NAME;
+ final String filename2 =
AzureStorageDatalakeRoutes.CONSUMER_FILE_NAME2;
+ String filesystem =
ConfigProvider.getConfig().getValue("cqCDatalakeConsumerFilesystem",
String.class);
+ final String content = "Hello from download test! " +
RandomStringUtils.randomNumeric(16);
+ final String tmpFolder =
ConfigProvider.getConfig().getValue("cqDatalakeTmpFolder", String.class);
+
+ /* The filesystem does not exist initially */
+ RestAssured.get("/azure-storage-datalake/filesystem/" + filesystem)
+ .then()
+ .statusCode(200)
+ .body("", Matchers.not(Matchers.hasItem(filesystem)));
+
+ try {
+ /* Create the filesystem */
+ RestAssured.given()
+ .post("/azure-storage-datalake/filesystem/" + filesystem)
+ .then()
+ .statusCode(201);
+
+ /* Upload */
+ RestAssured.given()
+ .body(content)
+ .post("/azure-storage-datalake/filesystem/" + filesystem +
"/path/" + filename)
+ .then()
+ .statusCode(201);
+
+ LOG.info("Consume a file from the storage datalake into a file
using the file component");
+
RestAssured.get("/azure-storage-datalake/start/consumeWithFileComponent")
+ .then()
+ .statusCode(200);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(
+ () -> {
+ Path downloadedFilePath = Path.of(tmpFolder,
"consumer-files").resolve(filename);
+
Assertions.assertTrue(downloadedFilePath.toFile().exists());
+ Assertions.assertEquals(content,
Files.readString(downloadedFilePath));
+ });
+
+ LOG.info("write to a file without using the file component");
+
+ /* Upload */
+ RestAssured.given()
+ .body(content)
+ .post("/azure-storage-datalake/filesystem/" + filesystem +
"/path/" + filename2)
+ .then()
+ .statusCode(201);
+
RestAssured.get("/azure-storage-datalake/start/consumeWithoutFileComponent")
+ .then()
+ .statusCode(200);
+ Awaitility.await().pollInterval(5, TimeUnit.SECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(
+ () -> {
+ Path downloadedFilePath = Path.of(tmpFolder,
"consumer-files", filename2);
+
Assertions.assertTrue(downloadedFilePath.toFile().exists());
+ Assertions.assertEquals(content,
Files.readString(downloadedFilePath));
+ });
+
+ LOG.info("batch consumer");
+ Assertions.assertTrue(Path.of(tmpFolder, "consumer-files",
"batch").toFile().mkdir(),
+ "Folder for batch consumer has to exist");
+
+ RestAssured.get("/azure-storage-datalake/start/consumeBatch")
+ .then()
+ .statusCode(200);
+ Awaitility.await().pollInterval(5, TimeUnit.SECONDS).atMost(10,
TimeUnit.SECONDS).untilAsserted(
+ () -> {
+ Path downloadedFilePath = Path.of(tmpFolder,
"consumer-files", "batch").resolve(filename);
+
Assertions.assertTrue(downloadedFilePath.toFile().exists());
+ Assertions.assertEquals(content,
Files.readString(downloadedFilePath));
+ Path downloadedFilePath2 = Path.of(tmpFolder,
"consumer-files", "batch", filename2);
+
Assertions.assertTrue(downloadedFilePath2.toFile().exists());
+ Assertions.assertEquals(content,
Files.readString(downloadedFilePath2));
+ });
+
+ } finally {
+ /* Clean up */
+ RestAssured.given()
+ .delete("/azure-storage-datalake/filesystem/" + filesystem)
+ .then()
+ .statusCode(204);
+ }
+
+ }
+
+ @Test
+ public void producerRoutes() throws IOException {
+ final String filesystem = "cqfsops" +
RandomStringUtils.randomNumeric(16);
+ final String filename = AzureStorageDatalakeRoutes.FILE_NAME;
+ final String tmpFolder =
ConfigProvider.getConfig().getValue("cqDatalakeTmpFolder", String.class);
+
+ RestAssured.get("/azure-storage-datalake/filesystem/" + filesystem)
+ .then()
+ .statusCode(200)
+ .body("", Matchers.not(Matchers.hasItem(filesystem)));
+
+ try {
+ LOG.info("step - createFileSystem");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of(DataLakeConstants.FILESYSTEM_NAME,
filesystem))
+
.post("/azure-storage-datalake/route/datalakeCreateFilesystem/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200);
+
+ LOG.info("step - listFileSystem");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+
.post("/azure-storage-datalake/route/datalakeListFileSystem/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body("", Matchers.hasItem(filesystem));
+
+ LOG.info("step - upload");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of("fileContent", "Hello World from Camel!"))
+
.post("/azure-storage-datalake/route/datalakeUpload/filesystem/" + filesystem)
+ .then()
+ .statusCode(200);
+
+ LOG.info("step - listPaths");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body("", Matchers.hasItem(filename));
+
+ LOG.info("step - getFile - via OutputStream");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .queryParam("useOutputStream", true)
+ .body(Map.of("fileName",
AzureStorageDatalakeRoutes.FILE_NAME))
+
.post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is("Hello World from Camel!"));
+
+ LOG.info("step - getFile - via InputStream");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of("fileName",
AzureStorageDatalakeRoutes.FILE_NAME))
+
.post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is("Hello World from Camel!"));
+
+ LOG.info("step - downloadToFile");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of("tmpFolder", tmpFolder))
+
.post("/azure-storage-datalake/route/datalakeDownloadToFile/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is("Hello World from Camel!"));
+
+ Path path = Path.of(tmpFolder, filename);
+ Assertions.assertTrue(Files.exists(path));
+ Assertions.assertEquals("Hello World from Camel!",
Files.readString(path));
+
+ LOG.info("step - downloadLink");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/datalakeDownloadLink/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.startsWith(
+ "https://" +
ConfigProvider.getConfig().getValue("azure.storage.account-name",
String.class)));
+
+ LOG.info("step - appendToFile");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of("append", "appended"))
+
.post("/azure-storage-datalake/route/datalakeAppendToFile/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200);
+ //append does not happen without flush
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of("fileName",
AzureStorageDatalakeRoutes.FILE_NAME))
+
.post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is("Hello World from Camel!"));
+
+ LOG.info("step - datalakeFlushToFile");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/datalakeFlushToFile/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200);
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .queryParam("useOutputStream", true)
+ .body(Map.of("fileName",
AzureStorageDatalakeRoutes.FILE_NAME))
+
.post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is("Hello World from Camel!appended"));
+
+ LOG.info("step - openQueryInputStream");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/openQueryInputStream/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is("Hello World from Camel!appended\n"));
+
+ LOG.info("step - deleteFile");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/datalakeDeleteFile/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is("true"));
+
+ LOG.info("step - listPaths");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body("", Matchers.not(Matchers.hasItem(filename)));
+
+ LOG.info("step - uploadFromFile");
+ File f = File.createTempFile("uploadFromFile", ".txt", new
File(tmpFolder));
+ String content2 = UUID.randomUUID().toString();
+ Files.writeString(f.toPath(), content2);
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of(DataLakeConstants.PATH, f.getAbsolutePath()))
+
.post("/azure-storage-datalake/route/datalakeUploadFromFile/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200);
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .queryParam("useOutputStream", true)
+ .body(Map.of("fileName",
AzureStorageDatalakeRoutes.FILE_NAME2))
+
.post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem)
+ .then()
+ .statusCode(200)
+ .body(Matchers.is(content2));
+
+ LOG.info("step - createFile");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of("fileName", "emptyFile.txt",
DataLakeConstants.DIRECTORY_NAME, "emptyTest"))
+
.post("/azure-storage-datalake/route/datalakeCreateFile/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200);
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body("", Matchers.hasItem("test"))
+ .body("", Matchers.hasItem("emptyTest"));
+
+ LOG.info("step - deleteDirectory");
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Map.of(DataLakeConstants.DIRECTORY_NAME,
"emptyTest", "CamelAzureStorageDataLakeRecursive", true))
+
.post("/azure-storage-datalake/route/datalakeDeleteDirectory/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200);
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(Collections.emptyMap())
+
.post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" +
filesystem)
+ .then()
+ .statusCode(200)
+ .body("", Matchers.hasItem("test"))
+ .body("", Matchers.not(Matchers.hasItem("emptyTest")));
+
+ } finally {
+ /* Clean up */
+ RestAssured.given()
+ .delete("/azure-storage-datalake/filesystem/" + filesystem)
+ .then()
+ .statusCode(204);
+ }
+
+ }
}
diff --git
a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java
b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java
index 27a09589b0..0417f14e54 100644
---
a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java
+++
b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java
@@ -17,16 +17,24 @@
package org.apache.camel.quarkus.component.azure.storage.datalake.it;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
import java.util.Map;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.camel.quarkus.test.mock.backend.MockBackendUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AzureStorageDatalakeTestResource implements
QuarkusTestResourceLifecycleManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(AzureStorageDatalakeTestResource.class);
+ private Path tmpFolder;
+
@Override
public Map<String, String> start() {
String realAzureStorageAccountName =
AzureStorageDatalakeUtil.getRealAccountNameFromEnv();
@@ -41,13 +49,34 @@ public class AzureStorageDatalakeTestResource implements
QuarkusTestResourceLife
MockBackendUtils.logRealBackendUsed();
}
+ //create tmp folder (for routes)
+ String tmpFolderPath = null;
+ try {
+ tmpFolder =
Files.createTempDirectory("CqAzureDatalakeTestTmpFolder");
+ tmpFolderPath = tmpFolder.toFile().getAbsolutePath();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
return Map.of(
"azure.datalake.service.url",
- "https://" + realAzureStorageAccountName +
".dfs.core.windows.net");
+ "https://" + realAzureStorageAccountName +
".dfs.core.windows.net",
+ "cqDatalakeTmpFolder", tmpFolderPath,
+ "cqCDatalakeConsumerFilesystem", "cqfsconsumer" +
RandomStringUtils.randomNumeric(16));
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void stop() {
- //nothing
+ if (tmpFolder != null && tmpFolder.toFile().exists()) {
+ try (var dirStream = Files.walk(tmpFolder)) {
+ dirStream
+ .map(Path::toFile)
+ .sorted(Comparator.reverseOrder())
+ .forEach(File::delete);
+ } catch (IOException e) {
+ //nothing
+ }
+ }
}
}
diff --git a/integration-tests/azure-grouped/pom.xml
b/integration-tests/azure-grouped/pom.xml
index ecb4596d0a..caf025aaab 100644
--- a/integration-tests/azure-grouped/pom.xml
+++ b/integration-tests/azure-grouped/pom.xml
@@ -76,6 +76,10 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-file</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-test-support</artifactId>
@@ -271,6 +275,19 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-file-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock-deployment</artifactId>