This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.15 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 534e6eafe7b8622e6d141802a40d7c6e01a09361 Author: Bence Simon <simonbence....@gmail.com> AuthorDate: Thu Sep 30 15:57:41 2021 +0200 NIFI-9260 Making the 'write and rename' behaviour optional for PutHDFS This closes #5423. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../org/apache/nifi/processors/hadoop/PutHDFS.java | 35 ++++++++-- .../apache/nifi/processors/hadoop/PutHDFSTest.java | 74 +++++++++++++++++----- 2 files changed, 90 insertions(+), 19 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 62b7996..d5d85fc 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -125,6 +125,9 @@ public class PutHDFS extends AbstractHadoopProcessor { protected static final String FAIL_RESOLUTION = "fail"; protected static final String APPEND_RESOLUTION = "append"; + protected static final String WRITE_AND_RENAME = "writeAndRename"; + protected static final String SIMPLE_WRITE = "simpleWrite"; + protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION, REPLACE_RESOLUTION, "Replaces the existing file if any."); protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION, @@ -134,6 +137,11 @@ public class PutHDFS extends AbstractHadoopProcessor { protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION, "Appends to the existing file if any, creates a new file otherwise."); + protected static final AllowableValue WRITE_AND_RENAME_AV = new AllowableValue(WRITE_AND_RENAME, "Write and rename", + 
"The processor writes FlowFile data into a temporary file and renames it after completion. This prevents other processes from reading partially written files."); + protected static final AllowableValue SIMPLE_WRITE_AV = new AllowableValue(SIMPLE_WRITE, "Simple write", + "The processor writes FlowFile data directly to the destination file. In some cases this might cause reading partially written files."); + protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() .name("Conflict Resolution Strategy") .description("Indicates what should happen when a file with the same name already exists in the output directory") @@ -142,6 +150,15 @@ public class PutHDFS extends AbstractHadoopProcessor { .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV) .build(); + protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder() + .name("writing-strategy") + .displayName("Writing Strategy") + .description("Defines the approach for writing the FlowFile data.") + .required(true) + .defaultValue(WRITE_AND_RENAME_AV.getValue()) + .allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV) + .build(); + public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder() .name("Block Size") .description("Size of each block as written to HDFS. This overrides the Hadoop Configuration") @@ -219,6 +236,7 @@ public class PutHDFS extends AbstractHadoopProcessor { .description("The parent HDFS directory to which files should be written. 
The directory will be created if it doesn't exist.") .build()); props.add(CONFLICT_RESOLUTION); + props.add(WRITING_STRATEGY); props.add(BLOCK_SIZE); props.add(BUFFER_SIZE); props.add(REPLICATION_FACTOR); @@ -280,6 +298,7 @@ public class PutHDFS extends AbstractHadoopProcessor { Path tempDotCopyFile = null; FlowFile putFlowFile = flowFile; try { + final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue(); final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile); final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); final long blockSize = getBlockSize(context, session, putFlowFile, dirPath); @@ -295,6 +314,11 @@ public class PutHDFS extends AbstractHadoopProcessor { final Path tempCopyFile = new Path(dirPath, "." + filename); final Path copyFile = new Path(dirPath, filename); + // Depending on the writing strategy, we might need a temporary file + final Path actualCopyFile = (writingStrategy.equals(WRITE_AND_RENAME)) + ? 
tempCopyFile + : copyFile; + // Create destination directory if it does not exist boolean targetDirCreated = false; try { @@ -361,7 +385,7 @@ public class PutHDFS extends AbstractHadoopProcessor { cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY); } - fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(), + fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(), FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize, null, null); } @@ -369,7 +393,7 @@ public class PutHDFS extends AbstractHadoopProcessor { if (codec != null) { fos = codec.createOutputStream(fos); } - createdFile = tempCopyFile; + createdFile = actualCopyFile; BufferedInputStream bis = new BufferedInputStream(in); StreamUtils.copy(bis, fos); bis = null; @@ -399,9 +423,12 @@ public class PutHDFS extends AbstractHadoopProcessor { final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); tempDotCopyFile = tempCopyFile; - if (!conflictResponse.equals(APPEND_RESOLUTION) - || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) { + if ( + writingStrategy.equals(WRITE_AND_RENAME) + && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) + ) { boolean renamed = false; + for (int i = 0; i < 10; i++) { // try to rename multiple times. 
if (hdfs.rename(tempCopyFile, copyFile)) { renamed = true; diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index c41114b..22b3ec2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -71,6 +71,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; public class PutHDFSTest { + private final static String TARGET_DIRECTORY = "target/test-classes"; + private final static String SOURCE_DIRECTORY = "src/test/resources/testdata"; + private final static String FILE_NAME = "randombytes-1"; private KerberosProperties kerberosProperties; private FileSystem mockFileSystem; @@ -197,27 +200,32 @@ public class PutHDFSTest { @Test public void testPutFile() throws IOException { - PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); - runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); - try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + // given + final FileSystem spyFileSystem = Mockito.spy(mockFileSystem); + final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION); + + // when + try (final FileInputStream fis = 
new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME); runner.enqueue(fis, attributes); runner.run(); } - List<MockFlowFile> failedFlowFiles = runner - .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); + // then + final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE); assertTrue(failedFlowFiles.isEmpty()); - List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); assertEquals(1, flowFiles.size()); - MockFlowFile flowFile = flowFiles.get(0); - assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1"))); - assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key())); - assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); + + final MockFlowFile flowFile = flowFiles.get(0); + assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME))); + assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key())); + assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE)); final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents(); @@ -225,7 +233,43 @@ public class PutHDFSTest { final ProvenanceEventRecord sendEvent = provenanceEvents.get(0); assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType()); // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename. 
- assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1")); + assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + FILE_NAME)); + + Mockito.verify(spyFileSystem, Mockito.times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class)); + } + + @Test + public void testPutFileWithSimpleWrite() throws IOException { + // given + final FileSystem spyFileSystem = Mockito.spy(mockFileSystem); + final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION); + runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE); + + // when + try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME); + runner.enqueue(fis, attributes); + runner.run(); + } + + // then + final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE); + assertTrue(failedFlowFiles.isEmpty()); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + + final MockFlowFile flowFile = flowFiles.get(0); + assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME))); + assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key())); + assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); + assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE)); + + Mockito.verify(spyFileSystem, Mockito.never()).rename(Mockito.any(Path.class), Mockito.any(Path.class)); } @Test