This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 1975101 NIFI-6361: Add fix to PutFile processor 1975101 is described below commit 1975101292fcd0fceb0d30594da54e5257bc7700 Author: Andres Garagiola <andresgaragi...@gmail.com> AuthorDate: Fri Jun 7 09:54:44 2019 -0300 NIFI-6361: Add fix to PutFile processor When PutFile uses the 'replace' conflict resolution strategy together with a maximum number of files, there is an issue when the destination folder already contains X files and the limit is also X: the processor fails instead of replacing the existing file of the same name, even though the replacement would still leave exactly X files. This commit fixes that issue. This closes #3524. Signed-off-by: Andres Garagiola <andresgaragi...@gmail.com> Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../apache/nifi/processors/standard/PutFile.java | 15 +- .../nifi/processors/standard/TestPutFile.java | 210 +++++++++++++++++++++ 2 files changed, 222 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java index 0c95c75..8784c75 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java @@ -49,6 +49,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.UserPrincipalLookupService; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -203,8 +204,9 @@ public class PutFile extends AbstractProcessor { Path tempDotCopyFile = null; try { final Path rootDirPath = configuredRootDirPath; - final Path tempCopyFile = rootDirPath.resolve("." 
+ flowFile.getAttribute(CoreAttributes.FILENAME.key())); - final Path copyFile = rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key())); + String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final Path tempCopyFile = rootDirPath.resolve("." + filename); + final Path copyFile = rootDirPath.resolve(filename); if (!Files.exists(rootDirPath)) { if (context.getProperty(CREATE_DIRS).asBoolean()) { @@ -224,7 +226,7 @@ public class PutFile extends AbstractProcessor { final Path finalCopyFileDir = finalCopyFile.getParent(); if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already - final int numFiles = finalCopyFileDir.toFile().list().length; + final long numFiles = getFilesNumberInFolder(finalCopyFileDir, filename); if (numFiles >= maxDestinationFiles) { flowFile = session.penalize(flowFile); @@ -336,6 +338,13 @@ public class PutFile extends AbstractProcessor { } } + private long getFilesNumberInFolder(Path folder, String filename) { + String[] filesInFolder = folder.toFile().list(); + return Arrays.stream(filesInFolder) + .filter(eachFilename -> !eachFilename.equals(filename)) + .count(); + } + protected String stringPermissions(String perms) { String permissions = ""; final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java new file mode 100644 index 0000000..51ad7c0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestPutFile { + + public static final String TARGET_DIRECTORY = "target/put-file"; + private File targetDir; + + @Before + public void prepDestDirectory() throws IOException { + targetDir = new File(TARGET_DIRECTORY); + if (!targetDir.exists()) { + Files.createDirectories(targetDir.toPath()); + return; + } + + targetDir.setReadable(true); + + deleteDirectoryContent(targetDir); + } + + private void deleteDirectoryContent(File directory) throws IOException { + for (final File file : directory.listFiles()) { + if (file.isDirectory()) { + deleteDirectoryContent(file); + } + Files.delete(file.toPath()); + } + } + + @Test + public void testCreateDirectory() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + String newDir = 
targetDir.getAbsolutePath()+"/new-folder"; + runner.setProperty(PutFile.DIRECTORY, newDir); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY + "/new-folder/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + } + + @Test + public void testReplaceConflictResolution() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + + Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(FetchFile.REL_SUCCESS, 2); + File dir = new File(TARGET_DIRECTORY); + assertEquals(1, dir.list().length); + targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + content = Files.readAllBytes(targetPath); + assertEquals("Another file", new String(content)); + } + + @Test + public void testIgnoreConflictResolution() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, 
targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.IGNORE_RESOLUTION); + + Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(FetchFile.REL_SUCCESS, 2); + File dir = new File(TARGET_DIRECTORY); + assertEquals(1, dir.list().length); + targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + } + + @Test + public void testFailConflictResolution() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.FAIL_RESOLUTION); + + Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(PutFile.REL_SUCCESS, 1); + runner.assertTransferCount(PutFile.REL_FAILURE, 1); + 
runner.assertPenalizeCount(1); + } + + @Test + public void testMaxFileLimitReach() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1"); + + Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "secondFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(PutFile.REL_FAILURE, 1); + runner.assertPenalizeCount(1); + } + + @Test + public void testReplaceAndMaxFileLimitReach() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1"); + + Map<String,String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + 
runner.run(); + runner.assertTransferCount(FetchFile.REL_SUCCESS, 2); + File dir = new File(TARGET_DIRECTORY); + assertEquals(1, dir.list().length); + targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + content = Files.readAllBytes(targetPath); + assertEquals("Another file", new String(content)); + } + +}