turcsanyip commented on a change in pull request #4287: URL: https://github.com/apache/nifi/pull/4287#discussion_r430546794
########## File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java ########## @@ -76,27 +109,45 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); - final DataLakeFileClient fileClient = directoryClient.createFile(fileName); + final DataLakeFileClient fileClient; + + final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); + boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION); + + try { + fileClient = directoryClient.createFile(fileName, overwrite); + + final long length = flowFile.getSize(); + if (length > 0) { + try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) { + fileClient.append(in, 0, length); + } + } + fileClient.flush(length); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("azure.filesystem", fileSystem); + attributes.put("azure.directory", directory); + attributes.put("azure.filename", fileName); + attributes.put("azure.primaryUri", fileClient.getFileUrl()); + attributes.put("azure.length", String.valueOf(length)); + flowFile = session.putAllAttributes(flowFile, attributes); - final long length = flowFile.getSize(); - if (length > 0) { - try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) { - fileClient.append(in, 0, length); + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis); + } 
catch (DataLakeStorageException dlsException) { + if (dlsException.getStatusCode() == 409) { + if (conflictResolution.equals(IGNORE_RESOLUTION)) { + session.transfer(flowFile, REL_SUCCESS); + getLogger().warn("Transferring {} to success because file with same name already exists", new Object[]{flowFile}); Review comment: The warning message does not properly describe the cause and the effect: it implies "file exists => transfer to success". Rather, the reason for transferring to success is the 'Ignore' resolution policy. It should also be mentioned that the file has not been overwritten in Azure. ########## File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java ########## @@ -253,6 +300,14 @@ private void assertFlowFile(String directory, String fileName, byte[] fileData) flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length)); } + private void assertSimpleFlowFile(byte[] fileData) throws Exception { Review comment: It could be called from `assertFlowFile` because the first section of that method is the same. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org