This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3efb0763da NIFI-13427 Added FlowFileSource interface for Python Processors
3efb0763da is described below

commit 3efb0763da4b2cb123011c46d2dccb1b1786e8af
Author: Peter Gyori <pgy...@apache.org>
AuthorDate: Wed Jun 19 10:29:14 2024 +0200

    NIFI-13427 Added FlowFileSource interface for Python Processors
    
    This closes #9000
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../src/main/asciidoc/python-developer-guide.adoc  | 58 ++++++++++++--
 .../org/apache/nifi/py4j/StandardPythonBridge.java |  5 ++
 .../nifi/python/processor/FlowFileSource.java      | 24 ++++++
 .../nifi/python/processor/FlowFileSourceProxy.java | 92 ++++++++++++++++++++++
 .../python/processor/FlowFileSourceResult.java     | 31 ++++++++
 .../PythonControllerInteractionIT.java             | 45 +++++++++++
 .../src/main/python/src/nifiapi/flowfilesource.py  | 65 +++++++++++++++
 .../src/main/python/framework/ExtensionManager.py  |  4 +-
 .../main/python/framework/ProcessorInspection.py   |  4 +-
 .../main/resources/extensions/CreateFlowFile.py    | 57 ++++++++++++++
 .../tests/system/python/PythonProcessorIT.java     | 36 ++++++++-
 11 files changed, 413 insertions(+), 8 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
index 3adbec6eaa..e96ea33867 100644
--- a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
@@ -122,7 +122,7 @@ Processors such as ExecuteScript) tend to be around data manipulation and/or com
     - Calls from Python to Java (and vice versa) are far more expensive than native method calls. Having APIs that are more tailored toward
 specific use cases allows for fewer interactions between the two processes, which greatly improves performance.
 
-As a result, the Python API consists of two different Processor classes that can be implemented: `FlowFileTransform` and `RecordTransform`.
+As a result, the Python API consists of three different Processor classes that can be implemented: `FlowFileTransform`, `RecordTransform` and `FlowFileSource`.
 Others may emerge in the future.
 
 
@@ -283,6 +283,48 @@ If the partition has more than one field in the dictionary, all fields in the di
 the Records to be written to the same output FlowFile.
 
 
+[[flowfile-source]]
+=== FlowFileSource
+
+The `FlowFileSource` API provides a mechanism for creating a FlowFile and routing it based on its textual or binary contents.
+
+In order to implement the `FlowFileSource` API, a Python class must extend from the `nifiapi.FlowFileSource` class
+and implement the `create(ProcessContext)` method, which returns a `FlowFileSourceResult`. Notice, that the difference between
+`FlowFileSource's create(ProcessContext)` and `FlowFileTransform's transform(ProcessContext, InputFlowFile)` methods is
+that the former does not expect an InputFlowFile object. That is because processors based on the `FlowFileSource` API
+are "source" processors that do not accept incoming connections but are capable of creating FlowFiles themselves.
+
+Implementing a Processor based on `FlowFileSource` is very similar to implementing one based on `FlowFileTransform`.
+A simple implementation looks like this:
+
+----
+from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
+
+class CreateFlowFile(FlowFileSource):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileSource']
+
+    class ProcessorDetails:
+        version = '0.0.1-SNAPSHOT'
+        description = '''A Python processor that creates FlowFiles.'''
+
+    def __init__(self, **kwargs):
+        pass
+
+    def create(self, context):
+        return FlowFileSourceResult(relationship = 'success', attributes = {'greeting': 'hello'}, contents = 'Hello World!')
+----
+
+As mentioned above, the `create` method only takes one argument: the context (of type `nifiapi.properties.ProcessContext`).
+
+The return type is a `FlowFileSourceResult` that indicates which Relationship the FlowFile should be transferred to,
+any attributes that should be added to the FlowFile and the contents of the FlowFile. The `relationship` is a required argument.
+Each processor based on the `FlowFileSource` API has a `success` relationship and additional relationships can be
+created in the Processor's Python code. `attributes` and `contents` are both optional. If `attributes` is not provided,
+the FlowFile will still have the usual `filename`, `path` and `uuid` attributes, but no additional ones.
+If `contents` is not provided, a FlowFile with no contents (only attributes) will be created.
+
+
 
 [[property-descriptors]]
 === PropertyDescriptors
@@ -295,7 +337,7 @@ A `PropertyDescriptor` is created using the `nifiapi.properties.PropertyDescript
 arguments: `name` and `description`. All other arguments are optional.
 
 Typically, a Processor will have multiple Property Descriptors. These descriptors are then returned to the NiFi framework by implementing the following
-method in the Processor (regardless of whether it is a `FlowFileTransform` or a `RecordTransform`):
+method in the Processor (regardless of whether it is a `FlowFileTransform`, a `RecordTransform` or a `FlowFileSource`):
 ----
 def getPropertyDescriptors(self)
 ----
@@ -351,7 +393,7 @@ invalid. Of course, we might also specify explicit validators that can be used,
 Each Processor in NiFi must route its outgoing data to some destination. In NiFi, those destinations are called "Relationships."
 Each Processor is responsible for declaring its Relationships.
 
-Both the FlowFileTransform an RecordTransform Processors already have a Relationship named `original` and one named `failure.`
+Both the FlowFileTransform and RecordTransform Processors already have a Relationship named `original` and one named `failure.`
 The `original` relationship should not be used by implementations. This is used only by the framework and allows the input FlowFile
 to be passed on without modification. If the Processor cannot transform its input (because the data is not valid, for example),
 the Processor may route the data to the `failure` relationship.
@@ -365,6 +407,11 @@ This method returns a list or a set of `nifiapi.relationship.Relationship` objec
 Relationship will not automatically be made available. It will need to be created and returned within this list, if it is to be used.
 Regardless of which Relationships are exposed by the implementation, the `failure` and `original` will always be made available.
 
+Unlike FlowFileTransform and RecordTransform Processors, FlowFileSource Processors only have a `success` relationship by default.
+Implementations can use this relationship to route the created FlowFiles. Additional relationships can be exposed by implementing
+the `getRelationships` method. In the case of FlowFileSource implementing `getRelationships` does not remove the `success` relationship.
+Any relationship returned by `getRelationships` appears besides the `success` relationship.
+
 
 [[inner-classes]]
 === ProcessorDetails and Java inner classes
@@ -630,8 +677,9 @@ For example, by default, `nifi.python.extensions.source.directory.default` is se
 in the property name with some other value.
 
 Any `.py` file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor.
-In order to be found, the Processor must have a valid parent (`FlowFileTransform` or `RecordTransform`) and must have an inner class named `Java`
-with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`.
+In order to be found, the Processor must have a valid parent (`FlowFileTransform`, `RecordTransform` or `FlowFileSource`) and must have an inner class named `Java`
+with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`
+or `implements = ['org.apache.nifi.python.processor.FlowFileSource']`.
 This will allow NiFi to automatically discover the Processor.
 
 Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order
diff --git 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
index dc9c0451b7..7fa70c2a6d 100644
--- 
a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
+++ 
b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
@@ -27,6 +27,8 @@ import org.apache.nifi.python.PythonBridge;
 import org.apache.nifi.python.PythonBridgeInitializationContext;
 import org.apache.nifi.python.PythonProcessConfig;
 import org.apache.nifi.python.PythonProcessorDetails;
+import org.apache.nifi.python.processor.FlowFileSource;
+import org.apache.nifi.python.processor.FlowFileSourceProxy;
 import org.apache.nifi.python.processor.FlowFileTransform;
 import org.apache.nifi.python.processor.FlowFileTransformProxy;
 import org.apache.nifi.python.processor.PythonProcessorBridge;
@@ -145,6 +147,9 @@ public class StandardPythonBridge implements PythonBridge {
         if (RecordTransform.class.getName().equals(implementedInterface)) {
             return new RecordTransformProxy(type, processorBridgeFactory, initialize);
         }
+        if (FlowFileSource.class.getName().equals(implementedInterface)) {
+            return new FlowFileSourceProxy(type, processorBridgeFactory, initialize);
+        }
         return null;
     }
 
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSource.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSource.java
new file mode 100644
index 0000000000..c79af4ea8c
--- /dev/null
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.python.processor;
+
+public interface FlowFileSource extends PythonProcessor {
+
+    FlowFileSourceResult createFlowFile();
+
+}
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceProxy.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceProxy.java
new file mode 100644
index 0000000000..f21c5c69b2
--- /dev/null
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceProxy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.python.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import py4j.Py4JNetworkException;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+public class FlowFileSourceProxy extends PythonProcessorProxy<FlowFileSource> {
+
+    protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles created by this processor can be routed to this relationship.")
+            .build();
+
+    private static final Set<Relationship> implicitRelationships = Set.of(REL_SUCCESS);
+
+    public FlowFileSourceProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory, final boolean initialize) {
+        super(processorType, bridgeFactory, initialize);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFileSourceResult result;
+        try {
+            result = getTransform().createFlowFile();
+        } catch (final Py4JNetworkException e) {
+            throw new ProcessException("Failed to communicate with Python Process", e);
+        } catch (final Exception e) {
+            getLogger().error("Failed to create FlowFile {}", e);
+            return;
+        }
+
+        try {
+            final String relationshipName = result.getRelationship();
+            final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
+            final Map<String, String> attributes = result.getAttributes();
+            final byte[] contents = result.getContents();
+
+            FlowFile output = createFlowFile(session, attributes, contents);
+
+            if (REL_SUCCESS.getName().equals(relationshipName)) {
+                session.transfer(output, REL_SUCCESS);
+            } else {
+                session.transfer(output, relationship);
+            }
+        } finally {
+            result.free();
+        }
+    }
+
+    protected FlowFile createFlowFile(final ProcessSession session, final Map<String, String> attributes, final byte[] contents) {
+        FlowFile flowFile = session.create();
+        if (attributes != null) {
+            flowFile = session.putAllAttributes(flowFile, attributes);
+        }
+        if (contents != null) {
+            flowFile = session.write(flowFile, out -> out.write(contents));
+        }
+        return flowFile;
+    }
+
+    @Override
+    protected Set<Relationship> getImplicitRelationships() {
+        return implicitRelationships;
+    }
+}
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceResult.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceResult.java
new file mode 100644
index 0000000000..9600bf6d86
--- /dev/null
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceResult.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.python.processor;
+
+import org.apache.nifi.python.PythonObjectProxy;
+
+import java.util.Map;
+
+public interface FlowFileSourceResult extends PythonObjectProxy {
+
+    String getRelationship();
+
+    byte[] getContents();
+
+    Map<String, String> getAttributes();
+}
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index f84ad13b20..19688e99d2 100644
--- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -567,6 +567,28 @@ public class PythonControllerInteractionIT {
         out.assertAttributeEquals("failureReason", "Intentional failure of unit test");
     }
 
+    @Test
+    public void testCreateFlowFile() throws IOException {
+        final String processorName = "CreateFlowFile";
+        final String propertyName = "FlowFile Contents";
+        final String relationshipSuccess = "success";
+        final String relationshipMultiline = "multiline";
+
+        final String singleLineContent = "Hello World!";
+        testSourceProcessor(processorName,
+                Map.of(propertyName, singleLineContent),
+                Map.of(relationshipSuccess, 1, relationshipMultiline, 0),
+                relationshipSuccess,
+                singleLineContent.getBytes(StandardCharsets.UTF_8));
+
+        final String multiLineContent = "Hello\nWorld!";
+        testSourceProcessor(processorName,
+                Map.of(propertyName, multiLineContent),
+                Map.of(relationshipSuccess, 0, relationshipMultiline, 1),
+                relationshipMultiline,
+                multiLineContent.getBytes(StandardCharsets.UTF_8));
+    }
+
     public interface StringLookupService extends ControllerService {
         Optional<String> lookup(Map<String, String> coordinates);
     }
@@ -624,4 +646,27 @@ public class PythonControllerInteractionIT {
         return runner;
     }
 
+    private void testSourceProcessor(final String processorName,
+                                     final Map<String, String> propertiesWithValues,
+                                     final Map<String, Integer> relationshipsWithFlowFileCounts,
+                                     final String expectedOuputRelationship,
+                                     final byte[] expectedContent) throws IOException {
+
+        final TestRunner runner = createProcessor(processorName);
+
+        propertiesWithValues.forEach((propertyName, propertyValue) -> {
+            runner.setProperty(propertyName, propertyValue);
+        });
+
+        waitForValid(runner);
+        runner.run();
+
+        relationshipsWithFlowFileCounts.forEach((relationship, count) -> {
+            runner.assertTransferCount(relationship, count);
+        });
+
+        final MockFlowFile output = runner.getFlowFilesForRelationship(expectedOuputRelationship).get(0);
+        output.assertContentEquals(expectedContent);
+    }
+
 }
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfilesource.py b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfilesource.py
new file mode 100644
index 0000000000..d5d88de21e
--- /dev/null
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfilesource.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from nifiapi.__jvm__ import JvmHolder
+from nifiapi.properties import ProcessContext
+
+
+class FlowFileSource(ABC):
+    # These will be set by the PythonProcessorAdapter when the component is created
+    identifier = None
+    logger = None
+
+    def __init__(self):
+        self.arrayList = JvmHolder.jvm.java.util.ArrayList
+
+    def setContext(self, context):
+        self.process_context = ProcessContext(context)
+
+    def createFlowFile(self):
+        return self.create(self.process_context)
+
+    @abstractmethod
+    def create(self, context):
+        pass
+
+class FlowFileSourceResult:
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileSourceResult']
+
+    def __init__(self, relationship, attributes = None, contents = None):
+        self.relationship = relationship
+        self.attributes = attributes
+        if contents is not None and isinstance(contents, str):
+            self.contents = str.encode(contents)
+        else:
+            self.contents = contents
+
+    def getRelationship(self):
+        return self.relationship
+
+    def getContents(self):
+        return self.contents
+
+    def getAttributes(self):
+        if self.attributes is None:
+            return None
+
+        map = JvmHolder.jvm.java.util.HashMap()
+        for key, value in self.attributes.items():
+            map.put(key, value)
+
+        return map
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py
index ee2e28ef7d..c5a1e1f0a5 100644
--- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py
@@ -53,7 +53,9 @@ class ExtensionManager:
     third-party dependencies have been imported.
     """
 
-    processor_interfaces = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform']
+    processor_interfaces = ['org.apache.nifi.python.processor.FlowFileTransform',
+                            'org.apache.nifi.python.processor.RecordTransform',
+                            'org.apache.nifi.python.processor.FlowFileSource']
     processor_details = {}
     processor_class_by_name = {}
     module_files_by_extension_type = {}
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py
index 2b0b9d56cb..c396ffe785 100644
--- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py
@@ -20,7 +20,9 @@ from nifiapi.documentation import UseCaseDetails, MultiProcessorUseCaseDetails,
 
 import ExtensionDetails
 
-PROCESSOR_INTERFACES = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform']
+PROCESSOR_INTERFACES = ['org.apache.nifi.python.processor.FlowFileTransform',
+                        'org.apache.nifi.python.processor.RecordTransform',
+                        'org.apache.nifi.python.processor.FlowFileSource']
 
 logger = logging.getLogger("python.ProcessorInspection")
 
diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/CreateFlowFile.py b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/CreateFlowFile.py
new file mode 100644
index 0000000000..5eec97a8a0
--- /dev/null
+++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/CreateFlowFile.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency
+from nifiapi.relationship import Relationship
+
+class CreateFlowFile(FlowFileSource):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileSource']
+
+    class ProcessorDetails:
+        version = '0.0.1-SNAPSHOT'
+        description = '''A Python processor that creates FlowFiles with given contents.'''
+        tags = ['text', 'test', 'python', 'source']
+
+    FF_CONTENTS = PropertyDescriptor(
+        name='FlowFile Contents',
+        description='''The contents of the FlowFile.''',
+        required=True,
+        default_value='Hello World!'
+    )
+
+    property_descriptors = [FF_CONTENTS]
+
+    REL_MULTILINE = Relationship(name='multiline', description='FlowFiles that contain multiline text.')
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.property_descriptors
+
+    def getRelationships(self):
+        return [self.REL_MULTILINE]
+
+    def create(self, context):
+        contents = context.getProperty(self.FF_CONTENTS).getValue()
+
+        if contents is not None and isinstance(contents, str):
+            contents_str = str.encode(contents)
+            if b'\n' in contents_str:
+                return FlowFileSourceResult(relationship='multiline', attributes=None, contents=contents_str)
+
+        return FlowFileSourceResult(relationship='success', attributes=None, contents=contents)
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java
index 244ee9f7d8..9e9064ea11 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java
@@ -84,7 +84,6 @@ public class PythonProcessorIT extends NiFiSystemIT {
         assertEquals(messageContents, contents);
     }
 
-
     @Test
     public void testRecordTransform() throws NiFiClientException, IOException, InterruptedException {
         final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
@@ -171,4 +170,39 @@ public class PythonProcessorIT extends NiFiSystemIT {
         assertEquals("Ball", secondRecordValues.get( headerIndices.get("sport") ));
         assertEquals("HELLO", secondRecordValues.get( headerIndices.get("greeting") ));
     }
+
+    @Test
+    public void testFlowFileSource() throws NiFiClientException, IOException, InterruptedException {
+        final String messageContents = "Hello World";
+
+        final ProcessorEntity createFlowFilePython = getClientUtil().createPythonProcessor("CreateFlowFile");
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+
+        // Config CreateFlowFile with "Hello World" as the value of "FlowFile Contents" attribute
+        final ProcessorConfigDTO generateConfig = createFlowFilePython.getComponent().getConfig();
+        generateConfig.setProperties(Collections.singletonMap("FlowFile Contents", messageContents));
+        getClientUtil().updateProcessorConfig(createFlowFilePython, generateConfig);
+
+        // Connect the processors
+        final ConnectionEntity outputConnection = getClientUtil().createConnection(createFlowFilePython, terminate, "success");
+        getClientUtil().setAutoTerminatedRelationships(createFlowFilePython, "multiline");
+
+        // Wait for processor validation to complete
+        getClientUtil().waitForValidProcessor(createFlowFilePython.getId());
+
+        // Run the flow
+        runProcessorOnce(createFlowFilePython);
+
+        // Wait for output to be queued up
+        waitForQueueCount(outputConnection.getId(), 1);
+
+        // Validate the output
+        final String contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 0);
+        assertEquals(messageContents, contents);
+    }
+
+    private void runProcessorOnce(final ProcessorEntity processorEntity) throws NiFiClientException, IOException, InterruptedException {
+        getNifiClient().getProcessorClient().runProcessorOnce(processorEntity);
+        getClientUtil().waitForStoppedProcessor(processorEntity.getId());
+    }
 }
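
Illustrative note (not part of the commit): the routing and content handling that this change introduces can be tried outside NiFi with a small stand-in. The sketch below is an assumption-laden approximation: `SketchResult` and the module-level `create` function are hypothetical names that only mirror the constructor logic of `FlowFileSourceResult` in flowfilesource.py and the branching in CreateFlowFile.py, and they leave out the py4j/JVM pieces (`JvmHolder`, the `Java` inner class, `ProcessContext`) entirely.

```python
class SketchResult:
    """Hypothetical stand-in for nifiapi's FlowFileSourceResult (no JVM needed)."""

    def __init__(self, relationship, attributes=None, contents=None):
        self.relationship = relationship
        self.attributes = attributes
        # As in the commit, str contents are encoded to bytes; str.encode()
        # defaults to UTF-8. Bytes and None are passed through unchanged.
        if isinstance(contents, str):
            self.contents = contents.encode()
        else:
            self.contents = contents


def create(contents):
    """Route the way CreateFlowFile.create does: multiline text goes to
    'multiline', everything else to the implicit 'success' relationship."""
    if isinstance(contents, str) and '\n' in contents:
        return SketchResult('multiline', contents=contents)
    return SketchResult('success', contents=contents)


if __name__ == '__main__':
    for text in ('Hello World!', 'Hello\nWorld!'):
        result = create(text)
        print(result.relationship, result.contents)
```

The two inputs in the `__main__` block correspond to the single-line and multi-line cases exercised by testCreateFlowFile in PythonControllerInteractionIT.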
