This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new e93586df11 NIFI-12038 This closes #7760. introduce PackageFlowFile processor e93586df11 is described below commit e93586df11dd555d264a6c879cbb13c07ace7cf9 Author: Mike Moser <mose...@apache.org> AuthorDate: Thu Sep 14 19:48:09 2023 +0000 NIFI-12038 This closes #7760. introduce PackageFlowFile processor Signed-off-by: Joseph Witt <joew...@apache.org> --- .../org/apache/nifi/util/FlowFilePackagerV3.java | 2 +- .../apache/nifi/util/TestPackageUnpackageV3.java | 2 + .../nifi/processors/standard/PackageFlowFile.java | 134 ++++++++++++++++++++ .../services/org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestPackageFlowFile.java | 139 +++++++++++++++++++++ 5 files changed, 277 insertions(+), 1 deletion(-) diff --git a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java index 181f3e3079..4487ed69fb 100644 --- a/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java +++ b/nifi-commons/nifi-flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java @@ -37,7 +37,7 @@ public class FlowFilePackagerV3 implements FlowFilePackager { writeFieldLength(out, attributes.size()); //write out the number of attributes for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair writeString(entry.getKey(), out); - writeString(entry.getValue(), out); + writeString(entry.getValue() == null ? "" : entry.getValue(), out); } } diff --git a/nifi-commons/nifi-flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java b/nifi-commons/nifi-flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java index 1bd8f09821..cd6a1a523d 100644 --- a/nifi-commons/nifi-flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java +++ b/nifi-commons/nifi-flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java @@ -38,6 +38,7 @@ public class TestPackageUnpackageV3 { final byte[] data = "Hello, World!".getBytes(StandardCharsets.UTF_8); final Map<String, String> map = new HashMap<>(); map.put("abc", "cba"); + map.put("123", null); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayInputStream in = new ByteArrayInputStream(data); @@ -49,6 +50,7 @@ public class TestPackageUnpackageV3 { final Map<String, String> unpackagedAttributes = unpackager.unpackageFlowFile(encodedIn, decodedOut); final byte[] decoded = decodedOut.toByteArray(); + map.put("123", ""); // replace null attribute for verification, because it is packaged as empty string assertEquals(map, unpackagedAttributes); assertArrayEquals(data, decoded); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java new file mode 100644 index 0000000000..3e589a7104 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.FlowFilePackager; +import org.apache.nifi.util.FlowFilePackagerV3; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +@SideEffectFree +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"flowfile", "flowfile-stream", "flowfile-stream-v3", "package", "attributes"}) +@CapabilityDescription("This processor will package FlowFile attributes and content into an output FlowFile that can be exported from NiFi" + + " and imported back into NiFi, preserving the original attributes and content.") +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "The mime.type will be changed to application/flowfile-v3") +}) +@SeeAlso({UnpackContent.class, MergeContent.class}) +public class PackageFlowFile extends AbstractProcessor { + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("max-batch-size") + .displayName("Maximum Batch Size") + .description("Maximum number of FlowFiles to package into one output FlowFile using a best effort, non guaranteed approach." + + " Multiple input queues can produce unexpected batching behavior.") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.createLongValidator(1, 10_000, true)) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The packaged FlowFile is sent to this relationship") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The FlowFiles that were used to create the package are sent to this relationship") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( + new LinkedHashSet<>(Arrays.asList( + REL_SUCCESS, + REL_ORIGINAL + ))); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList( + Arrays.asList( + BATCH_SIZE + )); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger()); + if (flowFiles.isEmpty()) { + return; + } + + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), StandardFlowFileMediaType.VERSION_3.getMediaType()); + + final FlowFilePackager packager = new FlowFilePackagerV3(); + + FlowFile packagedFlowFile = session.create(flowFiles); + packagedFlowFile = session.write(packagedFlowFile, out -> { + try (final OutputStream bufferedOut = new BufferedOutputStream(out)) { + for (final FlowFile flowFile : flowFiles) { + session.read(flowFile, in -> { + try (final InputStream bufferedIn = new BufferedInputStream(in)) { + packager.packageFlowFile(bufferedIn, bufferedOut, flowFile.getAttributes(), flowFile.getSize()); + } + }); + } + } + }); + + packagedFlowFile = session.putAllAttributes(packagedFlowFile, attributes); + session.transfer(packagedFlowFile, REL_SUCCESS); + session.transfer(flowFiles, REL_ORIGINAL); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 6c0949ba8e..f3baf43949 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -79,6 +79,7 @@ org.apache.nifi.processors.standard.MergeRecord org.apache.nifi.processors.standard.ModifyBytes org.apache.nifi.processors.standard.MonitorActivity org.apache.nifi.processors.standard.Notify +org.apache.nifi.processors.standard.PackageFlowFile org.apache.nifi.processors.standard.ParseCEF org.apache.nifi.processors.standard.ParseSyslog org.apache.nifi.processors.standard.ParseSyslog5424 diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPackageFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPackageFlowFile.java new file mode 100644 index 0000000000..7a2992b3ae --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPackageFlowFile.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType; +import org.apache.nifi.util.FlowFileUnpackager; +import org.apache.nifi.util.FlowFileUnpackagerV3; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class TestPackageFlowFile { + + private static final String SAMPLE_CONTENT = "Hello, World!"; + private static final String SAMPLE_ATTR_FILENAME = "test.txt"; + private static final String SAMPLE_ATTR_MIME_TYPE = "text/plain"; + private static final String EXTRA_ATTR_KEY = "myAttribute"; + private static final String EXTRA_ATTR_VALUE = "my value"; + + @Test + public void testOne() throws IOException { + TestRunner runner = TestRunners.newTestRunner(new PackageFlowFile()); + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), SAMPLE_ATTR_FILENAME); + attributes.put(CoreAttributes.MIME_TYPE.key(), SAMPLE_ATTR_MIME_TYPE); + attributes.put(EXTRA_ATTR_KEY, EXTRA_ATTR_VALUE); + + runner.enqueue(SAMPLE_CONTENT, attributes); + runner.run(); + + runner.assertTransferCount(PackageFlowFile.REL_SUCCESS, 1); + runner.assertTransferCount(PackageFlowFile.REL_ORIGINAL, 1); + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PackageFlowFile.REL_SUCCESS).get(0); + + // mime.type has changed + Assertions.assertEquals(StandardFlowFileMediaType.VERSION_3.getMediaType(), + outputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + // content can be unpacked with FlowFileUnpackagerV3 + FlowFileUnpackager unpackager = new FlowFileUnpackagerV3(); + try (ByteArrayInputStream bais = new ByteArrayInputStream(outputFlowFile.toByteArray()); + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + Map<String, String> unpackedAttributes = unpackager.unpackageFlowFile(bais, baos); + // verify attributes in package + Assertions.assertEquals(5, unpackedAttributes.size()); + Assertions.assertNotNull(unpackedAttributes.get(CoreAttributes.UUID.key())); + Assertions.assertNotNull(unpackedAttributes.get(CoreAttributes.PATH.key())); + Assertions.assertEquals(SAMPLE_ATTR_FILENAME, unpackedAttributes.get(CoreAttributes.FILENAME.key())); + Assertions.assertEquals(SAMPLE_ATTR_MIME_TYPE, unpackedAttributes.get(CoreAttributes.MIME_TYPE.key())); + Assertions.assertEquals(EXTRA_ATTR_VALUE, unpackedAttributes.get(EXTRA_ATTR_KEY)); + // verify content in package + Assertions.assertArrayEquals(SAMPLE_CONTENT.getBytes(), baos.toByteArray()); + } + } + + @Test + public void testMany() throws IOException { + int FILE_COUNT = 10; + TestRunner runner = TestRunners.newTestRunner(new PackageFlowFile()); + runner.setProperty(PackageFlowFile.BATCH_SIZE, Integer.toString(FILE_COUNT)); + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), SAMPLE_ATTR_MIME_TYPE); + + for (int i = 0; i < FILE_COUNT; i++) { + attributes.put(CoreAttributes.FILENAME.key(), i + SAMPLE_ATTR_FILENAME); + runner.enqueue(SAMPLE_CONTENT, attributes); + } + runner.run(); + + runner.assertTransferCount(PackageFlowFile.REL_SUCCESS, 1); + runner.assertTransferCount(PackageFlowFile.REL_ORIGINAL, FILE_COUNT); + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PackageFlowFile.REL_SUCCESS).get(0); + + // mime.type has changed + Assertions.assertEquals(StandardFlowFileMediaType.VERSION_3.getMediaType(), + outputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + // content can be unpacked with FlowFileUnpackagerV3 + FlowFileUnpackager unpackager = new FlowFileUnpackagerV3(); + try (ByteArrayInputStream bais = new ByteArrayInputStream(outputFlowFile.toByteArray()); + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + for (int i = 0; i < FILE_COUNT; i++) { + Map<String, String> unpackedAttributes = unpackager.unpackageFlowFile(bais, baos); + // verify attributes in package + Assertions.assertEquals(4, unpackedAttributes.size()); + Assertions.assertNotNull(unpackedAttributes.get(CoreAttributes.UUID.key())); + Assertions.assertNotNull(unpackedAttributes.get(CoreAttributes.PATH.key())); + Assertions.assertEquals(i + SAMPLE_ATTR_FILENAME, unpackedAttributes.get(CoreAttributes.FILENAME.key())); + Assertions.assertEquals(SAMPLE_ATTR_MIME_TYPE, unpackedAttributes.get(CoreAttributes.MIME_TYPE.key())); + // verify content in package + Assertions.assertArrayEquals(SAMPLE_CONTENT.getBytes(), baos.toByteArray()); + baos.reset(); + } + } + } + + @Test + public void testBatchSize() throws IOException { + int FILE_COUNT = 10; + int BATCH_SIZE = 2; + TestRunner runner = TestRunners.newTestRunner(new PackageFlowFile()); + runner.setProperty(PackageFlowFile.BATCH_SIZE, Integer.toString(BATCH_SIZE)); + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), SAMPLE_ATTR_MIME_TYPE); + + for (int i = 0; i < FILE_COUNT; i++) { + attributes.put(CoreAttributes.FILENAME.key(), i + SAMPLE_ATTR_FILENAME); + runner.enqueue(SAMPLE_CONTENT, attributes); + } + runner.run(); + + runner.assertTransferCount(PackageFlowFile.REL_SUCCESS, 1); + runner.assertTransferCount(PackageFlowFile.REL_ORIGINAL, BATCH_SIZE); + runner.assertQueueNotEmpty(); + } +}