turcsanyip commented on code in PR #8540: URL: https://github.com/apache/nifi/pull/8540#discussion_r1535144271
########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java: ########## @@ -229,38 +259,65 @@ static void uploadContent(DataLakeFileClient fileClient, InputStream in, long le } /** - * This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it. - * Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in - * case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important. + * Creates the file on Azure for 'Simple Write' strategy. Upon upload, a 0-byte file is created, then the payload is appended to it. + * Because of that, a work-in-progress file is available for readers before the upload is complete. + * + * @param directoryClient directory client of the uploaded file's parent directory + * @param fileName name of the uploaded file + * @param conflictResolution conflict resolution strategy + * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' + * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors + */ + DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) { + final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName); + + try { + final boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION); + return directoryClient.createFile(fileName, overwrite); + } catch (DataLakeStorageException dataLakeStorageException) { + return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution); + } + } + + /** + * This method serves as a "commit" for the upload process in case 
of 'Write and Rename' strategy. In order to prevent work-in-progress files from being available for readers, + * a temporary file is written first, and then renamed/moved to its final destination. It is not an efficient approach in case of conflicts because FlowFiles are uploaded unnecessarily, + * but it is a calculated risk because consistency is more important for 'Write and Rename' strategy. * <p> * Visible for testing * - * @param sourceFileClient client of the temporary file + * @param sourceFileClient file client of the temporary file * @param destinationDirectory final location of the uploaded file * @param destinationFileName final name of the uploaded file * @param conflictResolution conflict resolution strategy - * @return URL of the uploaded file + * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' + * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors */ - String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { + DataLakeFileClient renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { final String destinationPath = createPath(destinationDirectory, destinationFileName); try { final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions(); if (!conflictResolution.equals(REPLACE_RESOLUTION)) { destinationCondition.setIfNoneMatch("*"); } - return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl(); + return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue(); } catch (DataLakeStorageException dataLakeStorageException) { - 
removeTempFile(sourceFileClient); - if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) { - getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.", - destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); - return null; - } else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) { - throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException); - } else { - throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException); - } + removeFile(sourceFileClient); + + return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution); + } + } + + private DataLakeFileClient handleDataLakeStorageException(final DataLakeStorageException dataLakeStorageException, final String destinationPath, final String conflictResolution) { + if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) { Review Comment: `BlobErrorCode` comes from the Blob client library (`azure-storage-blob`), and it seems the ADLS service returns a different code: `PathAlreadyExists`. I did not find a similar error code class in the ADLS client library (`azure-storage-file-datalake`). In general, I did not want to change the logic here because this code section was extracted from an existing method, but introducing a local variable makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org