Repository: nifi
Updated Branches:
  refs/heads/master 50010fb34 -> c13cfa6ea
NIFI-1322 fixed breaking changes introduced in previous commit

This closes #1181

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c13cfa6e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c13cfa6e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c13cfa6e

Branch: refs/heads/master
Commit: c13cfa6ea6009070db74d7dd9be9c66703d56942
Parents: a7d0641
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Fri Nov 4 08:22:24 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Fri Nov 4 08:23:10 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/PutHDFS.java | 38 +++++++++++---------
 1 file changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/c13cfa6e/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index cb49d59..90b25e0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -75,15 +75,19 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso(GetHDFS.class)
 public class PutHDFS extends AbstractHadoopProcessor {
 
-    public static final String REPLACE = "replace";
-    public static final String IGNORE = "ignore";
-    public static final String FAIL = "fail";
-    public static final String APPEND = "append";
-
-    public static final AllowableValue REPLACE_RESOLUTION = new AllowableValue(REPLACE, REPLACE, "Replaces the existing file if any.");
-    public static final AllowableValue IGNORE_RESOLUTION = new AllowableValue(IGNORE, IGNORE, "Ignores the flow file and routes it to success.");
-    public static final AllowableValue FAIL_RESOLUTION = new AllowableValue(FAIL, FAIL, "Penalizes the flow file and routes it to failure.");
-    public static final AllowableValue APPEND_RESOLUTION = new AllowableValue(APPEND, APPEND, "Appends to the existing file if any, creates a new file otherwise.");
+    public static final String REPLACE_RESOLUTION = "replace";
+    public static final String IGNORE_RESOLUTION = "ignore";
+    public static final String FAIL_RESOLUTION = "fail";
+    public static final String APPEND_RESOLUTION = "append";
+
+    public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+            REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+            "Ignores the flow file and routes it to success.");
+    public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+            "Penalizes the flow file and routes it to failure.");
+    public static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+            "Appends to the existing file if any, creates a new file otherwise.");
 
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
     public static final int BUFFER_SIZE_DEFAULT = 4096;
@@ -108,8 +112,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .name("Conflict Resolution Strategy")
             .description("Indicates what should happen when a file with the same name already exists in the output directory")
             .required(true)
-            .defaultValue(FAIL_RESOLUTION.getValue())
-            .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION, APPEND_RESOLUTION)
+            .defaultValue(FAIL_RESOLUTION_AV.getValue())
+            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
             .build();
 
     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
@@ -258,18 +262,18 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 // If destination file already exists, resolve that based on processor configuration
                 if (destinationExists) {
                     switch (conflictResponse) {
-                        case REPLACE:
+                        case REPLACE_RESOLUTION:
                             if (hdfs.delete(copyFile, false)) {
                                 getLogger().info("deleted {} in order to replace with the contents of {}",
                                         new Object[]{copyFile, flowFile});
                             }
                             break;
-                        case IGNORE:
+                        case IGNORE_RESOLUTION:
                             session.transfer(flowFile, REL_SUCCESS);
                             getLogger().info("transferring {} to success because file with same name already exists",
                                     new Object[]{flowFile});
                             return;
-                        case FAIL:
+                        case FAIL_RESOLUTION:
                             flowFile = session.penalize(flowFile);
                             session.transfer(flowFile, REL_FAILURE);
                             getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
@@ -289,7 +293,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 OutputStream fos = null;
                 Path createdFile = null;
                 try {
-                    if(conflictResponse.equals(APPEND_RESOLUTION.getValue()) && destinationExists) {
+                    if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
                         fos = hdfs.append(copyFile, bufferSize);
                     } else {
                         fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
@@ -328,8 +332,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                 tempDotCopyFile = tempCopyFile;
 
-                if(!conflictResponse.equals(APPEND_RESOLUTION.getValue())
-                        || (conflictResponse.equals(APPEND_RESOLUTION.getValue()) && !destinationExists)) {
+                if (!conflictResponse.equals(APPEND_RESOLUTION_AV.getValue())
+                        || (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && !destinationExists)) {
                     boolean renamed = false;
                     for (int i = 0; i < 10; i++) { // try to rename multiple times.
                         if (hdfs.rename(tempCopyFile, copyFile)) {
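----------------------------------------------------------------------

Note on the first hunk: the previous commit had turned the long-standing
public String constants REPLACE_RESOLUTION, IGNORE_RESOLUTION,
FAIL_RESOLUTION and APPEND_RESOLUTION into AllowableValue objects, which
broke source compatibility for any code that referenced them as Strings
(Strings are also what the switch above requires, since Java case labels
must be compile-time constants). This commit restores the String constants
and moves the AllowableValue wrappers to new *_AV names. A minimal sketch
of the kind of downstream caller this un-breaks; the helper is
hypothetical, and the CONFLICT_RESOLUTION field name is assumed from the
descriptor built in the second hunk:

    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processors.hadoop.PutHDFS;

    class ConflictResolutionCheck {

        // Hypothetical downstream helper. While PutHDFS.FAIL_RESOLUTION was an
        // AllowableValue, a String comparison like this no longer compiled;
        // restoring the String constant makes such callers compile again.
        static boolean failsOnConflict(final ProcessContext context) {
            final String strategy = context.getProperty(PutHDFS.CONFLICT_RESOLUTION).getValue();
            return PutHDFS.FAIL_RESOLUTION.equals(strategy);
        }
    }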
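The remaining hunks are pure renames, but they sit on the processor's write
path: append to the destination when the strategy is "append" and the file
already exists, otherwise write to a temporary dot-file and rename it into
place, retrying the rename up to ten times. A standalone sketch of that
pattern against the Hadoop FileSystem API, with simplified parameters and
PutHDFS's logging and cleanup elided; comparing against the String constant
is equivalent to the diff's APPEND_RESOLUTION_AV.getValue(), which wraps
the same value:

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.hadoop.PutHDFS;

    class HdfsWriteSketch {

        // Mirrors the diff's logic: append for the "append" strategy when the
        // destination exists; otherwise create a temp copy and rename it into
        // place. The diff's compound condition reduces to "not (append && exists)".
        static void writeAndCommit(final FileSystem hdfs, final Path copyFile, final Path tempCopyFile,
                final String conflictResponse, final boolean destinationExists, final byte[] content,
                final int bufferSize, final short replication, final long blockSize) throws IOException {

            final boolean appending = conflictResponse.equals(PutHDFS.APPEND_RESOLUTION) && destinationExists;
            try (OutputStream fos = appending
                    ? hdfs.append(copyFile, bufferSize)
                    : hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize)) {
                fos.write(content);
            }

            if (!appending) {
                boolean renamed = false;
                for (int i = 0; i < 10 && !renamed; i++) { // bounded retry, as in the diff
                    renamed = hdfs.rename(tempCopyFile, copyFile);
                }
                if (!renamed) {
                    throw new IOException("Could not rename " + tempCopyFile + " to " + copyFile);
                }
            }
        }
    }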