This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 9bc10ec CAMEL-16307 azure-storage-datalake:...?operation=getFile
appends newline to the file content
9bc10ec is described below
commit 9bc10ec094d16d76c864ba719998b9ca0104e837
Author: Peter Palaga <[email protected]>
AuthorDate: Thu Mar 4 21:32:16 2021 +0100
CAMEL-16307 azure-storage-datalake:...?operation=getFile appends newline
to the file content
---
components/camel-azure-storage-datalake/pom.xml | 7 +-
.../datalake/client/DataLakeFileClientWrapper.java | 5 +-
.../datalake/integration/DataLakeProducerIT.java | 113 +++++++++++++++++++++
.../apache/camel/util/SkipLastByteInputStream.java | 108 ++++++++++++++++++++
.../camel/util/SkipLastNewlineInputStreamTest.java | 72 +++++++++++++
5 files changed, 303 insertions(+), 2 deletions(-)
diff --git a/components/camel-azure-storage-datalake/pom.xml
b/components/camel-azure-storage-datalake/pom.xml
index c9a66ce..0c8c1da 100644
--- a/components/camel-azure-storage-datalake/pom.xml
+++ b/components/camel-azure-storage-datalake/pom.xml
@@ -34,7 +34,7 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-support</artifactId>
</dependency>
-
+
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
@@ -85,6 +85,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-infra-common</artifactId>
<version>${project.version}</version>
diff --git
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
index 3d9f7bd..36bb300 100644
---
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
+++
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
@@ -37,6 +37,7 @@ import com.azure.storage.file.datalake.models.PathProperties;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
+import org.apache.camel.util.SkipLastByteInputStream;
public class DataLakeFileClientWrapper {
private final DataLakeFileClient client;
@@ -63,7 +64,9 @@ public class DataLakeFileClientWrapper {
public InputStream openInputStream() {
String query = "SELECT * from BlobStorage";
- return client.openQueryInputStream(query);
+ final InputStream sourceInputStream =
client.openQueryInputStream(query);
+ /* Workaround for
https://github.com/Azure/azure-sdk-for-java/issues/19612 - per the commit
subject, getFile appends a newline to the file content, so the query stream is
wrapped to drop a trailing '\n' byte */
+ return new SkipLastByteInputStream(sourceInputStream, (byte) '\n');
}
public Response<InputStream> openQueryInputStreamWithResponse(final
FileQueryOptions queryOptions) {
diff --git
a/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java
b/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java
new file mode 100644
index 0000000..d306d77
--- /dev/null
+++
b/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.storage.datalake.integration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+
+import com.azure.storage.file.datalake.models.FileSystemItem;
+import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
+import
org.apache.camel.component.azure.storage.datalake.DataLakeOperationsDefinition;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataLakeProducerIT extends BaseIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataLakeProducerIT.class);
+
+    private String fileName;
+    private byte[] fileContent;
+
+    /**
+     * Derives the test file name and its payload from a random, lower-cased five letter suffix.
+     */
+    @BeforeAll
+    public void setup() {
+        final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(Locale.ROOT);
+        fileName = "file" + suffix + ".txt";
+        fileContent = ("Hello " + suffix).getBytes(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Creates the file system, uploads a file, reads it back through the getFile operation and checks that the
+     * bytes match what was uploaded; the file system is deleted again in any case.
+     */
+    @Test
+    void testConsumer() throws Exception {
+
+        /* The file system is expected to be absent before it is created */
+        Assertions.assertThat(listFileSystems().stream().map(FileSystemItem::getName)).doesNotContain(fileSystemName);
+
+        template.sendBody(
+                componentUri(fileSystemName, DataLakeOperationsDefinition.createFileSystem),
+                null);
+
+        Assertions.assertThat(listFileSystems().stream().map(FileSystemItem::getName)).contains(fileSystemName);
+
+        try {
+            final String uploadUri = componentUri(fileSystemName, DataLakeOperationsDefinition.upload);
+            template.sendBodyAndHeader(uploadUri, fileContent, DataLakeConstants.FILE_NAME, fileName);
+
+            final String getFileUri = componentUri(fileSystemName, DataLakeOperationsDefinition.getFile);
+            final byte[] actual = template.requestBodyAndHeader(
+                    getFileUri,
+                    null,
+                    DataLakeConstants.FILE_NAME,
+                    fileName,
+                    byte[].class);
+
+            Assertions.assertThat(actual).containsExactly(fileContent);
+        } finally {
+            /* Cleanup */
+            template.sendBody(
+                    componentUri(fileSystemName, DataLakeOperationsDefinition.deleteFileSystem),
+                    null);
+
+            Assertions.assertThat(listFileSystems().stream().map(FileSystemItem::getName))
+                    .doesNotContain(fileSystemName);
+        }
+
+    }
+
+    /**
+     * Requests the listFileSystem operation and returns its body as a list of {@link FileSystemItem}s.
+     */
+    @SuppressWarnings("unchecked")
+    private List<FileSystemItem> listFileSystems() {
+        return template.requestBody(
+                componentUri(fileSystemName, DataLakeOperationsDefinition.listFileSystem),
+                null,
+                List.class);
+    }
+
+    /**
+     * Builds the endpoint URI for the given operation; the file system path segment is left out when
+     * {@code filesystem} is {@code null}.
+     */
+    private String componentUri(final String filesystem, final DataLakeOperationsDefinition operation) {
+        final String pathSegment;
+        if (filesystem == null) {
+            pathSegment = "";
+        } else {
+            pathSegment = "/" + filesystem;
+        }
+        return String.format("azure-storage-datalake://%s%s?operation=%s",
+                service.azureCredentials().accountName(),
+                pathSegment,
+                operation.name());
+    }
+
+}
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java
b/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java
new file mode 100644
index 0000000..c68233c
--- /dev/null
+++
b/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} that skips the last byte of the underlying delegate {@link InputStream} if the last byte is
+ * equal to the given {@code matchLast} value.
+ * <p>
+ * The end of the delegate is detected with a one byte look-ahead that relies on the mark/reset facility of
+ * {@link BufferedInputStream}. For that reason {@link #mark(int)}, {@link #reset()} and {@link #skip(long)} throw
+ * {@link UnsupportedOperationException} and {@link #markSupported()} returns {@code false}.
+ */
+public class SkipLastByteInputStream extends BufferedInputStream {
+
+    private final byte matchLast;
+
+    /**
+     * @param delegate  the stream to read from
+     * @param matchLast the byte value to drop if it is the very last byte of {@code delegate}
+     */
+    public SkipLastByteInputStream(InputStream delegate, byte matchLast) {
+        super(delegate);
+        this.matchLast = matchLast;
+    }
+
+    /**
+     * @param delegate  the stream to read from
+     * @param size      the buffer size passed to {@link BufferedInputStream}
+     * @param matchLast the byte value to drop if it is the very last byte of {@code delegate}
+     */
+    public SkipLastByteInputStream(InputStream delegate, int size, byte matchLast) {
+        super(delegate, size);
+        this.matchLast = matchLast;
+    }
+
+    /**
+     * Reads a single byte.
+     *
+     * @return the next byte as a value in the range 0..255, or {@code -1} at the end of the stream or when the only
+     *         remaining byte is the trailing {@code matchLast}
+     */
+    @Override
+    public int read() throws IOException {
+        final int c = super.read();
+        if (c < 0) {
+            return -1;
+        }
+        /* read() yields 0..255; narrow to byte before comparing, otherwise values >= 0x80 would never match */
+        if ((byte) c == matchLast && isAtEndOfStream()) {
+            /* matchLast is the last byte */
+            return -1;
+        }
+        return c;
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code buffer}, leaving out a trailing {@code matchLast} byte.
+     *
+     * @return the number of bytes stored in {@code buffer}, {@code 0} only for a zero length request, or {@code -1}
+     *         if there is nothing left to deliver
+     */
+    @Override
+    public int read(byte[] buffer, int off, int len) throws IOException {
+        final int count = super.read(buffer, off, len);
+        if (count <= 0) {
+            /* end of stream or a zero length request: no freshly read byte to inspect */
+            return count;
+        }
+        if (buffer[off + count - 1] == matchLast && isAtEndOfStream()) {
+            /*
+             * matchLast is the last byte - cut it away. If it was the only byte read, report the end of the stream
+             * because returning 0 for a non-zero len would break the InputStream.read() contract.
+             */
+            return count == 1 ? -1 : count - 1;
+        }
+        return count;
+    }
+
+    /**
+     * Looks one byte ahead. The position is restored if a byte is available; at the end of the stream nothing is
+     * reset because there is nothing left to read.
+     */
+    private boolean isAtEndOfStream() throws IOException {
+        super.mark(1);
+        if (super.read() < 0) {
+            return true;
+        }
+        super.reset();
+        return false;
+    }
+
+    @Override
+    public boolean markSupported() {
+        /* we do not want callers to mess with mark() and reset() because we use it ourselves */
+        return false;
+    }
+
+    @Override
+    public synchronized long skip(long n) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void reset() {
+        throw new UnsupportedOperationException();
+    }
+
+}
diff --git
a/core/camel-util/src/test/java/org/apache/camel/util/SkipLastNewlineInputStreamTest.java
b/core/camel-util/src/test/java/org/apache/camel/util/SkipLastNewlineInputStreamTest.java
new file mode 100644
index 0000000..94e3f1a
--- /dev/null
+++
b/core/camel-util/src/test/java/org/apache/camel/util/SkipLastNewlineInputStreamTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SkipLastNewlineInputStreamTest {
+
+    /** Pairs of { input, expected output } checked in the given order. */
+    private static final String[][] CASES = {
+            { "foo bar\n", "foo bar" },
+            { "foo bar\n\n", "foo bar\n" },
+            { "foo\nbar\n", "foo\nbar" },
+            { "foo\n\nbar\n", "foo\n\nbar" },
+            { "", "" },
+            { "foo bar", "foo bar" },
+            { "\n", "" },
+            { "f\n", "f" },
+            { "fo\n", "fo" },
+            { "foo\n", "foo" },
+            { "foo \n", "foo " },
+    };
+
+    @Test
+    void read() throws IOException {
+        for (String[] testCase : CASES) {
+            assertRead(testCase[0], testCase[1]);
+        }
+    }
+
+    /**
+     * Reads {@code input} through a {@link SkipLastByteInputStream} matching {@code '\n'}, first byte by byte and
+     * then in chunks of three bytes, and compares each result with {@code expected}.
+     */
+    private void assertRead(String input, String expected) throws IOException {
+        Assertions.assertEquals(expected, readByteWise(input));
+        Assertions.assertEquals(expected, readChunkWise(input));
+    }
+
+    /** Drains the stream with the single-byte {@code read()}. */
+    private String readByteWise(String input) throws IOException {
+        try (InputStream in = open(input);
+             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            for (int c = in.read(); c >= 0; c = in.read()) {
+                out.write(c);
+            }
+            return new String(out.toByteArray(), StandardCharsets.UTF_8);
+        }
+    }
+
+    /** Drains the stream with {@code read(byte[])} using a three byte buffer. */
+    private String readChunkWise(String input) throws IOException {
+        try (InputStream in = open(input);
+             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            final byte[] chunk = new byte[3];
+            for (int len = in.read(chunk); len >= 0; len = in.read(chunk)) {
+                out.write(chunk, 0, len);
+            }
+            return new String(out.toByteArray(), StandardCharsets.UTF_8);
+        }
+    }
+
+    private static InputStream open(String input) {
+        return new SkipLastByteInputStream(
+                new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), (byte) '\n');
+    }
+
+}