Repository: nifi Updated Branches: refs/heads/0.x 58868abad -> fd7462b65
NIFI-2611: Fixing bugs in UnpackContent with flow file unpackers Signed-off-by: Mike Moser <[email protected]> This closes #905 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd7462b6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd7462b6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd7462b6 Branch: refs/heads/0.x Commit: fd7462b6532728373071314d44171be92762dc43 Parents: 58868ab Author: Joe Gresock <[email protected]> Authored: Sun Aug 21 16:35:39 2016 +0000 Committer: Mike Moser <[email protected]> Committed: Mon Aug 22 14:44:42 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/UnpackContent.java | 52 +++++++++--------- .../processors/standard/TestUnpackContent.java | 56 +++++++++++--------- 2 files changed, 57 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fd7462b6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index c9c8191..45e17c1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -102,6 +102,13 @@ public class UnpackContent extends AbstractProcessor { public static final String FRAGMENT_COUNT = "fragment.count"; public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute"; + public static final String TAR_FORMAT_NAME = "tar"; + public static final String ZIP_FORMAT_NAME = "zip"; + public static final String FLOWFILE_STREAM_FORMAT_V3_NAME = "flowfile-stream-v3"; + public static final String FLOWFILE_STREAM_FORMAT_V2_NAME = "flowfile-stream-v2"; + public static final String FLOWFILE_TAR_FORMAT_NAME = "flowfile-tar-v1"; + public static final String OCTET_STREAM = "application/octet-stream"; public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder() @@ -144,9 +151,6 @@ public class UnpackContent extends AbstractProcessor { private Unpacker tarUnpacker; private Unpacker zipUnpacker; - private Unpacker flowFileStreamV3Unpacker; - private Unpacker flowFileStreamV2Unpacker; - private Unpacker flowFileTarUnpacker; @Override protected void init(final ProcessorInitializationContext context) { @@ -184,14 +188,6 @@ public class UnpackContent extends AbstractProcessor { fileFilter = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); tarUnpacker = new TarUnpacker(fileFilter); zipUnpacker = new ZipUnpacker(fileFilter); - flowFileStreamV3Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); - flowFileStreamV2Unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); - flowFileTarUnpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); - } - - PackageFormat format = PackageFormat.getFormat(context.getProperty(PACKAGING_FORMAT).getValue()); - if (format != PackageFormat.AUTO_DETECT_FORMAT && unpacker == null) { - initUnpacker(format); } } @@ -207,15 +203,15 @@ public class UnpackContent extends AbstractProcessor { addFragmentAttrs = true; break; case FLOWFILE_STREAM_FORMAT_V2: - unpacker = flowFileStreamV2Unpacker; + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); addFragmentAttrs = false; break; case FLOWFILE_STREAM_FORMAT_V3: - unpacker = flowFileStreamV3Unpacker; + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); addFragmentAttrs = false; break; case FLOWFILE_TAR_FORMAT: - unpacker = flowFileTarUnpacker; + unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); addFragmentAttrs = false; break; case AUTO_DETECT_FORMAT: @@ -254,6 +250,8 @@ public class UnpackContent extends AbstractProcessor { } else { initUnpacker(packagingFormat); } + } else { + initUnpacker(packagingFormat); } final List<FlowFile> unpacked = new ArrayList<>(); @@ -495,13 +493,13 @@ public class UnpackContent extends AbstractProcessor { } protected enum PackageFormat { - AUTO_DETECT_FORMAT("use mime.type attribute"), - TAR_FORMAT("tar", "application/tar"), - X_TAR_FORMAT("tar", "application/x-tar"), - ZIP_FORMAT("zip", "application/zip"), - FLOWFILE_STREAM_FORMAT_V3("flowfile-stream-v3", "application/flowfile-v3"), - FLOWFILE_STREAM_FORMAT_V2("flowfile-stream-v2", "application/flowfile-v2"), - FLOWFILE_TAR_FORMAT("flowfile-tar-v1", "application/flowfile-v1"); + AUTO_DETECT_FORMAT(AUTO_DETECT_FORMAT_NAME), + TAR_FORMAT(TAR_FORMAT_NAME, "application/tar"), + X_TAR_FORMAT(TAR_FORMAT_NAME, "application/x-tar"), + ZIP_FORMAT(ZIP_FORMAT_NAME, "application/zip"), + FLOWFILE_STREAM_FORMAT_V3(FLOWFILE_STREAM_FORMAT_V3_NAME, "application/flowfile-v3"), + FLOWFILE_STREAM_FORMAT_V2(FLOWFILE_STREAM_FORMAT_V2_NAME, "application/flowfile-v2"), + FLOWFILE_TAR_FORMAT(FLOWFILE_TAR_FORMAT_NAME, "application/flowfile-v1"); private final String textValue; @@ -526,17 +524,17 @@ public class UnpackContent extends AbstractProcessor { public static PackageFormat getFormat(String textValue) { switch (textValue) { - case "use mime.type attribute": + case AUTO_DETECT_FORMAT_NAME: return AUTO_DETECT_FORMAT; - case "tar": + case TAR_FORMAT_NAME: return TAR_FORMAT; - case "zip": + case ZIP_FORMAT_NAME: return ZIP_FORMAT; - case "flowfile-stream-v3": + case FLOWFILE_STREAM_FORMAT_V3_NAME: return FLOWFILE_STREAM_FORMAT_V3; - case "flowfile-stream-v2": + case FLOWFILE_STREAM_FORMAT_V2_NAME: return FLOWFILE_STREAM_FORMAT_V2; - case "flowfile-stream-v1": + case FLOWFILE_TAR_FORMAT_NAME: return FLOWFILE_TAR_FORMAT; } return null; http://git-wip-us.apache.org/repos/asf/nifi/blob/fd7462b6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index acebdea..c107e95 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -45,17 +45,18 @@ public class TestUnpackContent { unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.TAR_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.tar")); + unpackRunner.enqueue(dataPath.resolve("data.tar")); Map<String, String> attributes = new HashMap<>(1); Map<String, String> attributes2 = new HashMap<>(1); attributes.put("mime.type", UnpackContent.PackageFormat.TAR_FORMAT.getMimeType()); attributes2.put("mime.type", UnpackContent.PackageFormat.X_TAR_FORMAT.getMimeType()); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); - unpackRunner.run(); + unpackRunner.run(2); autoUnpackRunner.run(2); - unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); - unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); @@ -82,17 +83,18 @@ public class TestUnpackContent { autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); unpackRunner.enqueue(dataPath.resolve("data.tar")); + unpackRunner.enqueue(dataPath.resolve("data.tar")); Map<String, String> attributes = new HashMap<>(1); Map<String, String> attributes2 = new HashMap<>(1); attributes.put("mime.type", "application/x-tar"); attributes2.put("mime.type", "application/tar"); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes); autoUnpackRunner.enqueue(dataPath.resolve("data.tar"), attributes2); - unpackRunner.run(); + unpackRunner.run(2); autoUnpackRunner.run(2); - unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); - unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); @@ -126,18 +128,20 @@ public class TestUnpackContent { unpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.ZIP_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); unpackRunner.enqueue(dataPath.resolve("data.zip")); + unpackRunner.enqueue(dataPath.resolve("data.zip")); Map<String, String> attributes = new HashMap<>(1); attributes.put("mime.type", "application/zip"); autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); - unpackRunner.run(); - autoUnpackRunner.run(); + autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); + unpackRunner.run(2); + autoUnpackRunner.run(2); - unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); - unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); - autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); - autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -160,18 +164,20 @@ public class TestUnpackContent { autoUnpackRunner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.AUTO_DETECT_FORMAT.toString()); autoUnpackRunner.setProperty(UnpackContent.FILE_FILTER, "^folder/cal.txt$"); unpackRunner.enqueue(dataPath.resolve("data.zip")); + unpackRunner.enqueue(dataPath.resolve("data.zip")); Map<String, String> attributes = new HashMap<>(1); attributes.put("mime.type", "application/zip"); autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); - unpackRunner.run(); - autoUnpackRunner.run(); + autoUnpackRunner.enqueue(dataPath.resolve("data.zip"), attributes); + unpackRunner.run(2); + autoUnpackRunner.run(2); - unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); - unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); + unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); - autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 1); - autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); + autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -199,11 +205,12 @@ public class TestUnpackContent { final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V3.toString()); runner.enqueue(dataPath.resolve("data.flowfilev3")); + runner.enqueue(dataPath.resolve("data.flowfilev3")); - runner.run(); + runner.run(2); - runner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); - runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); + runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); runner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -222,11 +229,12 @@ public class TestUnpackContent { final TestRunner runner = TestRunners.newTestRunner(new UnpackContent()); runner.setProperty(UnpackContent.PACKAGING_FORMAT, UnpackContent.PackageFormat.FLOWFILE_STREAM_FORMAT_V2.toString()); runner.enqueue(dataPath.resolve("data.flowfilev2")); + runner.enqueue(dataPath.resolve("data.flowfilev2")); - runner.run(); + runner.run(2); - runner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); - runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); + runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); runner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
