This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.28 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 8e641723b428bffd902b65650fdb95997227afa3 Author: Peter Turcsanyi <[email protected]> AuthorDate: Mon Oct 28 21:22:57 2024 +0100 NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event (#9458) Signed-off-by: David Handermann <[email protected]> --- .../azure/storage/PutAzureDataLakeStorage.java | 25 +++++++++++----------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index c15c737f58..0fc360597a 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -16,12 +16,14 @@ */ package org.apache.nifi.processors.azure.storage; +import com.azure.core.util.Context; import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.models.DataLakeRequestConditions; import com.azure.storage.file.datalake.models.DataLakeStorageException; +import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions; import org.apache.commons.io.input.BoundedInputStream; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -218,16 +220,14 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound, final long transferSize, final DataLakeFileClient fileClient) throws Exception { - if (transferSize > 0) { - try (final InputStream inputStream = new BufferedInputStream( - fileResourceFound.map(FileResource::getInputStream) - .orElseGet(() -> session.read(flowFile))) - ) { - uploadContent(fileClient, inputStream, transferSize); - } catch (final Exception e) { - removeFile(fileClient); - throw e; - } + try (final InputStream inputStream = new BufferedInputStream( + fileResourceFound.map(FileResource::getInputStream) + .orElseGet(() -> session.read(flowFile))) + ) { + uploadContent(fileClient, inputStream, transferSize); + } catch (final Exception e) { + removeFile(fileClient); + throw e; } } @@ -248,8 +248,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess chunkStart += chunkSize; } - // use overwrite mode due to https://github.com/Azure/azure-sdk-for-java/issues/31248 - fileClient.flush(length, true); + fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE); } /** @@ -262,7 +261,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors */ - DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) { + private DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) { final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName); try {
