This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new cb72cfde8a NIFI-13324 Set FlowFile attributes for Python Processors on 
failure
cb72cfde8a is described below

commit cb72cfde8a2640c1beb1270f96073928b817e497
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Mon Jun 10 10:59:41 2024 -0400

    NIFI-13324 Set FlowFile attributes for Python Processors on failure
    
    In case a Python Processor routes a FlowFile to failure and returns 
attributes, add those attributes to the 'original' FlowFile before routing to 
'failure'
    
    This closes #8943
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../python/processor/FlowFileTransformProxy.java   |  7 +++++-
 .../python/processor/RecordTransformProxy.java     |  6 ++++-
 .../PythonControllerInteractionIT.java             | 23 ++++++++---------
 .../resources/extensions/FailWithAttributes.py     | 29 ++++++++++++++++++++++
 4 files changed, 50 insertions(+), 15 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
index 073158f59b..4d654d7c4e 100644
--- 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
+++ 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
@@ -62,13 +62,18 @@ public class FlowFileTransformProxy extends 
PythonProcessorProxy<FlowFileTransfo
         try {
             final String relationshipName = result.getRelationship();
             final Relationship relationship = new 
Relationship.Builder().name(relationshipName).build();
+            final Map<String, String> attributes = result.getAttributes();
+
             if (REL_FAILURE.getName().equals(relationshipName)) {
                 session.remove(transformed);
+                if (attributes != null) {
+                    original = session.putAllAttributes(original, attributes);
+                }
+
                 session.transfer(original, REL_FAILURE);
                 return;
             }
 
-            final Map<String, String> attributes = result.getAttributes();
             if (attributes != null) {
                 transformed = session.putAllAttributes(transformed, 
attributes);
             }
diff --git 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
index 00389f5b61..91d774e130 100644
--- 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
+++ 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.python.processor;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.NullSuppression;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -264,13 +265,16 @@ public class RecordTransformProxy extends 
PythonProcessorProxy<RecordTransform>
             final FlowFile destinationFlowFile = 
session.create(originalFlowFile);
 
             final RecordSetWriter writer;
+            OutputStream out = null;
             try {
-                final OutputStream out = session.write(destinationFlowFile);
+                out = session.write(destinationFlowFile);
                 final Map<String, String> originalAttributes = 
originalFlowFile.getAttributes();
                 final RecordSchema writeSchema = 
writerFactory.getSchema(originalAttributes, transformed.getSchema());
                 writer = writerFactory.createWriter(getLogger(), writeSchema, 
out, originalAttributes);
                 writer.beginRecordSet();
             } catch (final Exception e) {
+                // If we failed to create the RecordSetWriter, ensure that we 
close the Output Stream
+                IOUtils.closeQuietly(out);
                 session.remove(destinationFlowFile);
                 throw e;
             }
diff --git 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index 1303b98ac9..f84ad13b20 100644
--- 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++ 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -30,10 +30,6 @@ import 
org.apache.nifi.python.PythonBridgeInitializationContext;
 import org.apache.nifi.python.PythonProcessConfig;
 import org.apache.nifi.python.PythonProcessorDetails;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -52,7 +48,6 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -559,16 +554,18 @@ public class PythonControllerInteractionIT {
         runner.assertTransferCount("success", 7);
     }
 
-    private RecordSchema createSimpleRecordSchema(final List<String> 
fieldNames) {
-        final List<RecordField> recordFields = new ArrayList<>();
-        for (final String fieldName : fieldNames) {
-            recordFields.add(new RecordField(fieldName, 
RecordFieldType.STRING.getDataType(), true));
-        }
 
-        final RecordSchema schema = new SimpleRecordSchema(recordFields);
-        return schema;
-    }
+    @Test
+    public void testRouteToFailureWithAttributes() {
+        final TestRunner runner = 
createFlowFileTransform("FailWithAttributes");
+        runner.enqueue("Hello, World");
+        runner.run();
 
+        runner.assertAllFlowFilesTransferred("failure", 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship("failure").getFirst();
+        out.assertAttributeEquals("number", "1");
+        out.assertAttributeEquals("failureReason", "Intentional failure of 
unit test");
+    }
 
     public interface StringLookupService extends ControllerService {
         Optional<String> lookup(Map<String, String> coordinates);
diff --git 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/FailWithAttributes.py
 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/FailWithAttributes.py
new file mode 100644
index 0000000000..3ea1afab5b
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/FailWithAttributes.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+
+class FailWithAttributes(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+    class ProcessorDetails:
+        version = '0.0.1-SNAPSHOT'
+        description = 'Routes a FlowFile to failure and adds attributes to it.'
+
+    def __init__(self, **kwargs):
+        pass
+
+    def transform(self, context, flowFile):
+        return FlowFileTransformResult(relationship="failure", 
attributes={"number": "1", "failureReason": "Intentional failure of unit test"})

Reply via email to