This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new cb72cfde8a NIFI-13324 Set FlowFile attributes for Python Processors on failure cb72cfde8a is described below commit cb72cfde8a2640c1beb1270f96073928b817e497 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Mon Jun 10 10:59:41 2024 -0400 NIFI-13324 Set FlowFile attributes for Python Processors on failure In case a Python Processor routes a FlowFile to failure and returns attributes, add those attributes to the 'original' FlowFile before routing to 'failure' This closes #8943 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../python/processor/FlowFileTransformProxy.java | 7 +++++- .../python/processor/RecordTransformProxy.java | 6 ++++- .../PythonControllerInteractionIT.java | 23 ++++++++--------- .../resources/extensions/FailWithAttributes.py | 29 ++++++++++++++++++++++ 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java index 073158f59b..4d654d7c4e 100644 --- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java +++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java @@ -62,13 +62,18 @@ public class FlowFileTransformProxy extends PythonProcessorProxy<FlowFileTransfo try { final String relationshipName = result.getRelationship(); final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + final Map<String, String> attributes = result.getAttributes(); + if (REL_FAILURE.getName().equals(relationshipName)) { session.remove(transformed); + if (attributes != null) { + original = session.putAllAttributes(original, attributes); + } + session.transfer(original, REL_FAILURE); return; } - final Map<String, String> attributes = result.getAttributes(); if (attributes != null) { transformed = session.putAllAttributes(transformed, attributes); } diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index 00389f5b61..91d774e130 100644 --- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -17,6 +17,7 @@ package org.apache.nifi.python.processor; +import org.apache.commons.io.IOUtils; import org.apache.nifi.NullSuppression; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -264,13 +265,16 @@ public class RecordTransformProxy extends PythonProcessorProxy<RecordTransform> final FlowFile destinationFlowFile = session.create(originalFlowFile); final RecordSetWriter writer; + OutputStream out = null; try { - final OutputStream out = session.write(destinationFlowFile); + out = session.write(destinationFlowFile); final Map<String, String> originalAttributes = originalFlowFile.getAttributes(); final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, transformed.getSchema()); writer = writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes); writer.beginRecordSet(); } catch (final Exception e) { + // If we failed to create the RecordSetWriter, ensure that we close the Output Stream + IOUtils.closeQuietly(out); session.remove(destinationFlowFile); throw e; } diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java index 1303b98ac9..f84ad13b20 100644 --- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java +++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java @@ -30,10 +30,6 @@ import org.apache.nifi.python.PythonBridgeInitializationContext; import org.apache.nifi.python.PythonProcessConfig; import org.apache.nifi.python.PythonProcessorDetails; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -52,7 +48,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -559,16 +554,18 @@ public class PythonControllerInteractionIT { runner.assertTransferCount("success", 7); } - private RecordSchema createSimpleRecordSchema(final List<String> fieldNames) { - final List<RecordField> recordFields = new ArrayList<>(); - for (final String fieldName : fieldNames) { - recordFields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType(), true)); - } - final RecordSchema schema = new SimpleRecordSchema(recordFields); - return schema; - } + @Test + public void testRouteToFailureWithAttributes() { + final TestRunner runner = createFlowFileTransform("FailWithAttributes"); + runner.enqueue("Hello, World"); + runner.run(); + runner.assertAllFlowFilesTransferred("failure", 1); + final MockFlowFile out = runner.getFlowFilesForRelationship("failure").getFirst(); + out.assertAttributeEquals("number", "1"); + out.assertAttributeEquals("failureReason", "Intentional failure of unit test"); + } public interface StringLookupService extends ControllerService { Optional<String> lookup(Map<String, String> coordinates); diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/FailWithAttributes.py b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/FailWithAttributes.py new file mode 100644 index 0000000000..3ea1afab5b --- /dev/null +++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/FailWithAttributes.py @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult + +class FailWithAttributes(FlowFileTransform): + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileTransform'] + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + description = 'Routes a FlowFile to failure and adds attributes to it.' + + def __init__(self, **kwargs): + pass + + def transform(self, context, flowFile): + return FlowFileTransformResult(relationship="failure", attributes={"number": "1", "failureReason": "Intentional failure of unit test"})