This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 205c58230b NIFI-9879 Add min/max age and min/max size properties to ListAzure processors 205c58230b is described below commit 205c58230b03131f15b6d0bc41cc82d9f3806016 Author: Timea Barna <timea.ba...@gmail.com> AuthorDate: Wed Apr 6 12:38:14 2022 +0200 NIFI-9879 Add min/max age and min/max size properties to ListAzure processors This closes #5933. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../azure/storage/AbstractListAzureProcessor.java | 85 ++++++++++++++++++++++ .../azure/storage/ListAzureBlobStorage.java | 18 +++-- .../azure/storage/ListAzureBlobStorage_v12.java | 11 ++- .../azure/storage/ListAzureDataLakeStorage.java | 11 ++- .../azure/storage/AbstractAzureBlobStorageIT.java | 9 ++- .../storage/AbstractAzureBlobStorage_v12IT.java | 3 +- .../storage/AbstractAzureDataLakeStorageIT.java | 5 +- .../azure/storage/ITListAzureBlobStorage.java | 68 ++++++++++++++++- .../azure/storage/ITListAzureBlobStorage_v12.java | 57 +++++++++++++-- .../azure/storage/ITListAzureDataLakeStorage.java | 45 +++++++++++- .../azure/storage/ITMoveAzureDataLakeStorage.java | 11 +-- .../azure/storage/ITPutAzureBlobStorage.java | 19 ++--- .../azure/storage/ITPutAzureDataLakeStorage.java | 5 +- 13 files changed, 300 insertions(+), 47 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/AbstractListAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/AbstractListAzureProcessor.java new file mode 100644 index 0000000000..a58a2e96f3 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/AbstractListAzureProcessor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListableEntity; + +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +public abstract class AbstractListAzureProcessor<T extends ListableEntity> extends AbstractListProcessor<T> { + public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() + .name("Minimum File Age") + .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("0 sec") + .build(); + + public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() + .name("Maximum File Age") + .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .build(); + + public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + .name("Minimum File Size") + .description("The minimum size that a file must be in order to be pulled") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("0 B") + .build(); + + public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + .name("Maximum File Size") + .description("The maximum size that a file can be in order to be pulled") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + protected boolean isFileInfoMatchesWithAgeAndSize(final ProcessContext context, final long minimumTimestamp, final long lastModified, final long size) { + final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue(); + final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B); + final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + + if (lastModified < minimumTimestamp) { + return false; + } + final long fileAge = System.currentTimeMillis() - lastModified; + if (minAge > fileAge) { + return false; + } + if (maxAge != null && maxAge < fileAge) { + return false; + } + if (minSize > size) { + return false; + } + if (maxSize != null && maxSize < size) { + return false; + } + return true; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index 3e08e481f7..d84f9b8cfa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -87,7 +87,7 @@ import java.util.Optional; "This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. State is " + "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " + "where the previous node left off, without duplicating the data.") -public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { +public class ListAzureBlobStorage extends AbstractListAzureProcessor<BlobInfo> { private static final PropertyDescriptor PROP_PREFIX = new PropertyDescriptor.Builder() .name("prefix") @@ -111,8 +111,12 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, ListedEntityTracker.TRACKING_STATE_CACHE, ListedEntityTracker.TRACKING_TIME_WINDOW, - ListedEntityTracker.INITIAL_LISTING_TARGET - )); + ListedEntityTracker.INITIAL_LISTING_TARGET, + MIN_AGE, + MAX_AGE, + MIN_SIZE, + MAX_SIZE + )); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -157,9 +161,9 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { protected boolean isListingResetNecessary(final PropertyDescriptor property) { // re-list if configuration changed, but not when security keys are rolled (not included in the condition) return PROP_PREFIX.equals(property) - || AzureStorageUtils.ACCOUNT_NAME.equals(property) - || AzureStorageUtils.CONTAINER.equals(property) - || AzureStorageUtils.PROP_SAS_TOKEN.equals(property); + || AzureStorageUtils.ACCOUNT_NAME.equals(property) + || AzureStorageUtils.CONTAINER.equals(property) + || AzureStorageUtils.PROP_SAS_TOKEN.equals(property); } @Override @@ -205,7 +209,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { final CloudBlob cloudBlob = (CloudBlob) blob; final BlobProperties properties = cloudBlob.getProperties(); - if (properties.getLastModified().getTime() >= minimumTimestamp) { + if (isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, properties.getLastModified().getTime(), properties.getLength())) { final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri(); final Builder builder = new BlobInfo.Builder() diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java index 82bb821774..483a3c9314 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java @@ -41,7 +41,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processor.util.list.ListedEntityTracker; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.BlobInfo; @@ -99,7 +98,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR "(by default). This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. State is " + "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " + "where the previous node left off, without duplicating the data.") -public class ListAzureBlobStorage_v12 extends AbstractListProcessor<BlobInfo> { +public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInfo> { public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() .fromPropertyDescriptor(AzureStorageUtils.CONTAINER) @@ -138,7 +137,11 @@ public class ListAzureBlobStorage_v12 extends AbstractListProcessor<BlobInfo> { LISTING_STRATEGY, TRACKING_STATE_CACHE, TRACKING_TIME_WINDOW, - INITIAL_LISTING_TARGET + INITIAL_LISTING_TARGET, + MIN_AGE, + MAX_AGE, + MIN_SIZE, + MAX_SIZE )); private BlobServiceClient storageClient; @@ -219,7 +222,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListProcessor<BlobInfo> { final BlobItem blob = result.next(); final BlobItemProperties properties = blob.getProperties(); - if (properties.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp) { + if (isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, properties.getLastModified().toInstant().toEpochMilli(), properties.getContentLength())) { final Builder builder = new Builder() .containerName(containerName) .blobName(blob.getName()) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java index 1de2ee9b99..6bb3b0e17d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java @@ -41,7 +41,6 @@ import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo; import org.apache.nifi.serialization.record.RecordSchema; @@ -100,7 +99,7 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. State is " + "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " + "where the previous node left off, without duplicating the data.") -public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo> { +public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFileInfo> { public static final PropertyDescriptor RECURSE_SUBDIRECTORIES = new PropertyDescriptor.Builder() .name("recurse-subdirectories") @@ -140,7 +139,11 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo LISTING_STRATEGY, TRACKING_STATE_CACHE, TRACKING_TIME_WINDOW, - INITIAL_LISTING_TARGET)); + INITIAL_LISTING_TARGET, + MIN_AGE, + MAX_AGE, + MIN_SIZE, + MAX_SIZE)); private static final Set<PropertyDescriptor> LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( ADLS_CREDENTIALS_SERVICE, @@ -260,7 +263,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo final List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream() .filter(pathItem -> !pathItem.isDirectory()) - .filter(pathItem -> pathItem.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp) + .filter(pathItem -> isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, pathItem.getLastModified().toInstant().toEpochMilli(), pathItem.getContentLength())) .map(pathItem -> new ADLSFileInfo.Builder() .fileSystem(fileSystem) .filePath(pathItem.getName()) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java index 93cd8bd206..afd6a97502 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.BeforeEach; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.UUID; import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX; @@ -59,8 +60,12 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT } protected void uploadTestBlob() throws Exception { - CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME); - byte[] buf = TEST_FILE_CONTENT.getBytes(); + uploadTestBlob(TEST_BLOB_NAME, TEST_FILE_CONTENT); + } + + protected void uploadTestBlob(final String blobName, final String fileContent) throws Exception { + CloudBlob blob = container.getBlockBlobReference(blobName); + byte[] buf = fileContent.getBytes(StandardCharsets.UTF_8); InputStream in = new ByteArrayInputStream(buf); blob.upload(in, buf.length); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java index 608a49221b..b95d18a2c7 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -43,7 +44,7 @@ import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLO public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorageIT { protected static final String BLOB_NAME = "blob1"; - protected static final byte[] BLOB_DATA = "0123456789".getBytes(); + protected static final byte[] BLOB_DATA = "0123456789".getBytes(StandardCharsets.UTF_8); protected static final String EL_CONTAINER_NAME = "az.containername"; protected static final String EL_BLOB_NAME = "az.blobname"; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java index 6240a5cce7..e683e6f697 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; import java.util.UUID; import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_ADLS_ENDPOINT_SUFFIX; @@ -87,7 +88,7 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag } protected void uploadFile(String directory, String filename, String fileContent) { - uploadFile(directory, filename, fileContent.getBytes()); + uploadFile(directory, filename, fileContent.getBytes(StandardCharsets.UTF_8)); } protected void uploadFile(TestFile testFile) { @@ -102,7 +103,7 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag } protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) { - createDirectoryAndUploadFile(directory, filename, fileContent.getBytes()); + createDirectoryAndUploadFile(directory, filename, fileContent.getBytes(StandardCharsets.UTF_8)); } protected void createDirectoryAndUploadFile(String directory, String filename, byte[] fileData) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java index 484d4953a4..ef1670bdd9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java @@ -21,7 +21,11 @@ import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; + +import static org.junit.jupiter.api.Assertions.assertEquals; public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT { @@ -33,8 +37,7 @@ public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT { @BeforeEach public void setUp() throws Exception { uploadTestBlob(); - - Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2); + waitForUpload(); } @Test @@ -55,13 +58,72 @@ public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT { assertResult(); } + @Test + public void testListWithMinAge() throws Exception { + runner.setProperty(ListAzureBlobStorage.MIN_AGE, "1 hour"); + + runner.assertValid(); + runner.run(1); + + runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 0); + } + + @Test + public void testListWithMaxAge() throws Exception { + runner.setProperty(ListAzureBlobStorage.MAX_AGE, "1 hour"); + + runner.assertValid(); + runner.run(1); + + assertResult(TEST_FILE_CONTENT); + } + + @Test + public void testListWithMinSize() throws Exception { + uploadTestBlob("nifi-test-blob2", "Test"); + waitForUpload(); + assertListCount(); + runner.setProperty(ListAzureBlobStorage.MIN_SIZE, "5 B"); + + runner.assertValid(); + runner.run(1); + + assertResult(TEST_FILE_CONTENT); + } + + @Test + public void testListWithMaxSize() throws Exception { + uploadTestBlob("nifi-test-blob2", "Test"); + waitForUpload(); + assertListCount(); + runner.setProperty(ListAzureBlobStorage.MAX_SIZE, "5 B"); + + runner.assertValid(); + runner.run(1); + + assertResult("Test"); + } + + private void waitForUpload() throws InterruptedException { + Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2); + } + private void assertResult() { + assertResult(TEST_FILE_CONTENT); + } + + private void assertResult(final String content) { runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1); for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) { - entry.assertAttributeEquals("azure.length", "36"); + entry.assertAttributeEquals("azure.length", String.valueOf(content.getBytes(StandardCharsets.UTF_8).length)); entry.assertAttributeEquals("mime.type", "application/octet-stream"); } } + + private void assertListCount() { + final long listCount = StreamSupport.stream(container.listBlobs().spliterator(), false).count(); + assertEquals(2, listCount, "There should be 2 uploaded files but found only " + listCount); + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java index 307dd6c2ad..942e834074 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java @@ -23,6 +23,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -112,15 +113,59 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { MockRecordWriter recordWriter = new MockRecordWriter(null, false); runner.addControllerService("record-writer", recordWriter); runner.enableControllerService(recordWriter); - runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer"); + runner.setProperty(ListAzureBlobStorage_v12.RECORD_WRITER, "record-writer"); runner.run(); - runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0); + runner.assertAllFlowFilesTransferred(ListAzureBlobStorage_v12.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureBlobStorage_v12.REL_SUCCESS).get(0); flowFile.assertAttributeEquals("record.count", "4"); } + @Test + public void testListWithMinAge() throws Exception { + uploadBlobs(); + runner.setProperty(ListAzureBlobStorage_v12.MIN_AGE, "1 hour"); + + runProcessor(); + + runner.assertTransferCount(ListAzureBlobStorage_v12.REL_SUCCESS, 0); + } + + @Test + public void testListWithMaxAge() throws Exception { + uploadBlobs(); + runner.setProperty(ListAzureBlobStorage_v12.MAX_AGE, "1 hour"); + + runProcessor(); + + assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4); + } + + @Test + public void testListWithMinSize() throws Exception { + uploadBlobs(); + runner.setProperty(ListAzureBlobStorage_v12.MIN_SIZE, "5 B"); + uploadBlob("blob5", "Test".getBytes(StandardCharsets.UTF_8)); + + runProcessor(); + + assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4); + } + + @Test + public void testListWithMaxSize() throws Exception { + uploadBlobs(); + runner.setProperty(ListAzureBlobStorage_v12.MAX_SIZE, "5 B"); + uploadBlob("blob5", "Test".getBytes(StandardCharsets.UTF_8)); + + runProcessor(); + + runner.assertAllFlowFilesTransferred(ListAzureBlobStorage_v12.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureBlobStorage_v12.REL_SUCCESS).get(0); + assertFlowFileBlobAttributes(flowFile, getContainerName(), "blob5", "Test".length()); + } + private void uploadBlobs() throws Exception { uploadBlob(BLOB_NAME_1, BLOB_DATA); uploadBlob(BLOB_NAME_2, BLOB_DATA); @@ -134,9 +179,9 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { } private void assertSuccess(String... blobNames) throws Exception { - runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, blobNames.length); + runner.assertTransferCount(ListAzureBlobStorage_v12.REL_SUCCESS, blobNames.length); - List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListAzureBlobStorage_v12.REL_SUCCESS); Set<String> expectedBlobNames = new HashSet<>(Arrays.asList(blobNames)); @@ -160,6 +205,6 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { private void assertFailure() { assertFalse(runner.getLogger().getErrorMessages().isEmpty()); - runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0); + runner.assertTransferCount(ListAzureBlobStorage_v12.REL_SUCCESS, 0); } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java index a60403b48f..09ca9b2cfe 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java @@ -24,6 +24,7 @@ import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -72,7 +73,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { createDirectoryAndUploadFile(testFile111); testFiles.put(testFile111.getFilePath(), testFile111); - TestFile testFile21 = new TestFile("dir 2", "file 21"); + TestFile testFile21 = new TestFile("dir 2", "file 21", "Test"); createDirectoryAndUploadFile(testFile21); testFiles.put(testFile21.getFilePath(), testFile21); @@ -225,6 +226,46 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { flowFile.assertAttributeEquals("record.count", "3"); } + @Test + public void testListWithMinAge() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour"); + + runProcessor(); + + runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0); + } + + @Test + public void testListWithMaxAge() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour"); + + runProcessor(); + + assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21"); + } + + @Test + public void testListWithMinSize() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B"); + + runProcessor(); + + assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111"); + } + + @Test + public void testListWithMaxSize() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B"); + + runProcessor(); + + assertSuccess("dir 2/file 21"); + } + private void runProcessor() { runner.assertValid(); runner.run(); @@ -251,7 +292,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath()); flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory()); flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, testFile.getFilename()); - flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, String.valueOf(testFile.getFileContent().length())); + flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, String.valueOf(testFile.getFileContent().getBytes(StandardCharsets.UTF_8).length)); flowFile.assertAttributeExists(ATTR_NAME_LAST_MODIFIED); flowFile.assertAttributeExists(ATTR_NAME_ETAG); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java index d049d9db5c..b98317f733 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java @@ -25,9 +25,10 @@ import org.apache.nifi.processor.Processor; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -50,7 +51,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { private static final String SOURCE_DIRECTORY = "sourceDir1"; private static final String DESTINATION_DIRECTORY = "destDir1"; private static final String FILE_NAME = "file1"; - private static final byte[] FILE_DATA = "0123456789".getBytes(); + private static final byte[] FILE_DATA = "0123456789".getBytes(StandardCharsets.UTF_8); private static final String EL_FILESYSTEM = "az.filesystem"; private static final String EL_DIRECTORY = "az.directory"; @@ -61,7 +62,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { return MoveAzureDataLakeStorage.class; } - @Before + @BeforeEach public void setUp() throws InterruptedException { runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, fileSystemName); runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, SOURCE_DIRECTORY); @@ -234,7 +235,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runProcessor(FILE_DATA); - assertSuccessWithIgnoreResolution(DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA, fileContent.getBytes()); + assertSuccessWithIgnoreResolution(DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA, fileContent.getBytes(StandardCharsets.UTF_8)); } @Test diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java index 8a1a2190c8..212b48d8b1 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java @@ -25,6 +25,7 @@ import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; import java.util.List; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -54,7 +55,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { @Test public void testPutBlob() throws Exception { runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); assertResult(); @@ -74,7 +75,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_128B_VALUE); runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); assertResult(); @@ -86,7 +87,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_192B_VALUE); runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); assertResult(); @@ -98,7 +99,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_256B_VALUE); runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); assertResult(); @@ -110,7 +111,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_384B_VALUE); runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); assertResult(); @@ -122,7 +123,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_512B_VALUE); runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); assertResult(); @@ -133,7 +134,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { configureCredentialsService(); runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); assertResult(); @@ -144,7 +145,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid"); runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "aW52YWxpZGludmFsaWQ="); runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes()); + runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); runner.run(); runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1); @@ -154,7 +155,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS); for (MockFlowFile flowFile : flowFilesForRelationship) { - flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes()); + flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); flowFile.assertAttributeEquals("azure.length", "10"); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 81d7245dd5..18f4341e8b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -51,7 +52,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { private static final String DIRECTORY = "dir1"; private static final String FILE_NAME = "file1"; - private static final byte[] FILE_DATA = "0123456789".getBytes(); + private static final byte[] FILE_DATA = "0123456789".getBytes(StandardCharsets.UTF_8); private static final String EL_FILESYSTEM = "az.filesystem"; private static final String EL_DIRECTORY = "az.directory"; @@ -207,7 +208,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runProcessor(FILE_DATA); - assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes()); + assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes(StandardCharsets.UTF_8)); } @Test