This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 21dcd4d09838ffe50733e1d79b6cdeadefe50c63
Author: Gabor Gyimesi <gamezb...@gmail.com>
AuthorDate: Thu Feb 1 17:43:42 2024 +0100

    MINIFICPP-2278 Add custom relationship support for python processors
    
    Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com>
    This closes #1722
---
 docker/test/integration/cluster/ImageStore.py      | 13 +++---
 docker/test/integration/features/python.feature    | 20 +++++++++
 .../minifi/processors/RotatingForwarder.py         | 26 ++++++++++++
 .../resources/python/RotatingForwarder.py          | 33 +++++++++++++++
 extensions/python/PYTHON.md                        |  1 -
 .../pythonprocessors/nifiapi/flowfiletransform.py  |  5 ++-
 extensions/python/types/PyProcessSession.cpp       | 48 ++++++++++++++++++++++
 extensions/python/types/PyProcessSession.h         |  2 +
 libminifi/include/core/ProcessSession.h            |  1 +
 libminifi/src/core/ProcessSession.cpp              |  4 ++
 10 files changed, 145 insertions(+), 8 deletions(-)

diff --git a/docker/test/integration/cluster/ImageStore.py 
b/docker/test/integration/cluster/ImageStore.py
index 355eb32e0..e915c1b27 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -107,6 +107,7 @@ class ImageStore:
                 {pip3_install_command}
                 RUN pip3 install langchain
                 USER minificpp
+                COPY RotatingForwarder.py 
/opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py
                 RUN wget {parse_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
 && \\
                     wget {chunk_document_url} 
--directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors
                 """.format(base_image='apacheminificpp:' + 
MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
@@ -114,7 +115,7 @@ class ImageStore:
                            parse_document_url=parse_document_url,
                            chunk_document_url=chunk_document_url))
 
-        return self.__build_image(dockerfile)
+        return self.__build_image(dockerfile, [os.path.join(self.test_dir, 
"resources", "python", "RotatingForwarder.py")])
 
     def __build_http_proxy_image(self):
         dockerfile = dedent("""\
@@ -178,11 +179,11 @@ class ImageStore:
                 docker_context.addfile(dockerfile_info,
                                        fileobj=conf_dockerfile_buffer)
 
-                for context_file in context_files:
-                    file_info = tarfile.TarInfo(context_file['name'])
-                    file_info.size = context_file['size']
-                    docker_context.addfile(file_info,
-                                           fileobj=context_file['file_obj'])
+                for context_file_path in context_files:
+                    with open(context_file_path, 'rb') as file:
+                        file_info = 
tarfile.TarInfo(os.path.basename(context_file_path))
+                        file_info.size = os.path.getsize(context_file_path)
+                        docker_context.addfile(file_info, file)
             docker_context_buffer.seek(0)
 
             logging.info('Creating configured image...')
diff --git a/docker/test/integration/features/python.feature 
b/docker/test/integration/features/python.feature
index 2f2b042fe..90c9c3f3e 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -82,3 +82,23 @@ Feature: MiNiFi can use python processors in its flows
     When all instances start up
     Then at least one flowfile's content match the following regex: '{"text": 
"test_", "metadata": {"filename": "test_file.log", "uuid": "", "chunk_index": 
0, "chunk_count": 3}}' in less than 30 seconds
     And the Minifi logs contain the following message: "key:document.count 
value:3" in less than 10 seconds
+
+  @USE_NIFI_PYTHON_PROCESSORS
+  Scenario: MiNiFi C++ can use custom relationships in NiFi native python 
processors
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with filename "test_file.log" and content "test_data_one" is 
present in "/tmp/input"
+    And a file with filename "test_file2.log" and content "test_data_two" is 
present in "/tmp/input"
+    And a file with filename "test_file3.log" and content "test_data_three" is 
present in "/tmp/input"
+    And a file with filename "test_file4.log" and content "test_data_four" is 
present in "/tmp/input"
+    And a RotatingForwarder processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+
+    And the "success" relationship of the GetFile processor is connected to 
the RotatingForwarder
+    And the "first" relationship of the RotatingForwarder processor is 
connected to the PutFile
+    And the "second" relationship of the RotatingForwarder processor is 
connected to the PutFile
+    And the "third" relationship of the RotatingForwarder processor is 
connected to the PutFile
+    And the "fourth" relationship of the RotatingForwarder processor is 
connected to the PutFile
+
+    When all instances start up
+
+    Then flowfiles with these contents are placed in the monitored directory 
in less than 10 seconds: 
"test_data_one,test_data_two,test_data_three,test_data_four"
diff --git a/docker/test/integration/minifi/processors/RotatingForwarder.py 
b/docker/test/integration/minifi/processors/RotatingForwarder.py
new file mode 100644
index 000000000..4571538e6
--- /dev/null
+++ b/docker/test/integration/minifi/processors/RotatingForwarder.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class RotatingForwarder(Processor):
+    def __init__(self, context):
+        super(RotatingForwarder, self).__init__(
+            context=context,
+            clazz='RotatingForwarder',
+            
class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
+            properties={},
+            schedule={'scheduling strategy': 'EVENT_DRIVEN'},
+            auto_terminate=[])
diff --git a/docker/test/integration/resources/python/RotatingForwarder.py 
b/docker/test/integration/resources/python/RotatingForwarder.py
new file mode 100644
index 000000000..09df09fbf
--- /dev/null
+++ b/docker/test/integration/resources/python/RotatingForwarder.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+
+
+class RotatingForwarder(FlowFileTransform):
+    """
+    Forwards flow files to a different relationship each time it is called in 
a round robin manner.
+    """
+    def __init__(self, **kwargs):
+        self.relationship_index = 0
+        self.relationships = ["first", "second", "third", "fourth"]
+
+    def transform(self, context, flowFile):
+        content = flowFile.getContentsAsBytes().decode()
+
+        relationship = self.relationships[self.relationship_index % 
len(self.relationships)]
+        self.relationship_index += 1
+        self.relationship_index %= len(self.relationships)
+        return FlowFileTransformResult(relationship, contents=content)
diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md
index 788321c60..bb496daf8 100644
--- a/extensions/python/PYTHON.md
+++ b/extensions/python/PYTHON.md
@@ -159,7 +159,6 @@ Due to some differences between the NiFi and MiNiFi C++ 
processors and implement
 - Controller properties are not supported at the moment.
 - There are some validators in NiFi that are not present in MiNiFi C++, so 
some property validations will be missing using the NiFi Python processors.
 - Allowable values specified in NiFi Python processors are ignored in MiNiFi 
C++ (due to MiNiFi C++ requiring them to be specified at compile time), so the 
property values are not pre-verified.
-- MiNiFi C++ does not support custom relationship names in Python processors, 
the only available relationships are "success", "failure" and "original".
 - MiNiFi C++ only supports expression language with flow file attributes, so 
only FLOWFILE_ATTRIBUTES expression language scope is supported, otherwise the 
expression language will not be evaluated.
 - MiNiFi C++ does not support property dependencies, so the property 
dependencies will be ignored. If a property depends on another property, the 
property will not be required.
 - MiNiFi C++ does not support the use of self.jvm member in Python processors 
that provides JVM bindings in NiFi, it is set to None in MiNiFi C++.
diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py 
b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
index 125cf54ae..a1edcd76f 100644
--- a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
+++ b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
@@ -108,7 +108,10 @@ class FlowFileTransform(ABC):
         if result_content is not None:
             session.write(flow_file, WriteCallback(result_content))
 
-        session.transfer(flow_file, self.REL_SUCCESS)
+        if result.getRelationship() == "success":
+            session.transfer(flow_file, self.REL_SUCCESS)
+        else:
+            session.transferToCustomRelationship(flow_file, 
result.getRelationship())
         session.transfer(original_flow_file, self.REL_ORIGINAL)
 
     @abstractmethod
diff --git a/extensions/python/types/PyProcessSession.cpp 
b/extensions/python/types/PyProcessSession.cpp
index 02540c70b..def6817cc 100644
--- a/extensions/python/types/PyProcessSession.cpp
+++ b/extensions/python/types/PyProcessSession.cpp
@@ -63,6 +63,18 @@ void PyProcessSession::transfer(const 
std::shared_ptr<core::FlowFile>& flow_file
   session_->transfer(flow_file, relationship);
 }
 
+// Routes flow_file to the relationship called relationship_name via the wrapped
+// ProcessSession. Throws std::runtime_error if either the session or the flow file
+// handle is null (i.e. has already been released).
+void PyProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& relationship_name) {
+  if (session_ == nullptr) {
+    throw std::runtime_error("Access of ProcessSession after it has been released");
+  }
+  if (flow_file == nullptr) {
+    throw std::runtime_error("Access of FlowFile after it has been released");
+  }
+  session_->transferToCustomRelationship(flow_file, relationship_name);
+}
+
 void PyProcessSession::read(const std::shared_ptr<core::FlowFile>& flow_file, 
BorrowedObject input_stream_callback) {
   if (!session_) {
     throw std::runtime_error("Access of ProcessSession after it has been 
released");
@@ -153,6 +165,7 @@ static PyMethodDef PyProcessSessionObject_methods[] = {  // 
NOLINT(cppcoreguidel
     {"read", (PyCFunction) PyProcessSessionObject::read, METH_VARARGS, 
nullptr},
     {"write", (PyCFunction) PyProcessSessionObject::write, METH_VARARGS, 
nullptr},
     {"transfer", (PyCFunction) PyProcessSessionObject::transfer, METH_VARARGS, 
nullptr},
+    {"transferToCustomRelationship", (PyCFunction) 
PyProcessSessionObject::transferToCustomRelationship, METH_VARARGS, nullptr},
     {"remove", (PyCFunction) PyProcessSessionObject::remove, METH_VARARGS, 
nullptr},
     {"getContentsAsBytes", (PyCFunction) 
PyProcessSessionObject::getContentsAsBytes, METH_VARARGS, nullptr},
     {}  /* Sentinel */
@@ -308,6 +321,41 @@ PyObject* 
PyProcessSessionObject::transfer(PyProcessSessionObject* self, PyObjec
   Py_RETURN_NONE;
 }
 
+// Python binding: session.transferToCustomRelationship(flow_file, relationship_name).
+// Expects a ScriptFlowFile object and a non-empty str naming the target relationship
+// (not limited to the predefined "success"/"failure"/"original").
+// Returns None on success. On failure a Python exception is set and NULL is returned,
+// as the CPython calling convention requires; returning a value while an exception is
+// set would surface as a SystemError instead of the intended AttributeError.
+PyObject* PyProcessSessionObject::transferToCustomRelationship(PyProcessSessionObject* self, PyObject* args) {
+  auto session = self->process_session_.lock();
+  if (!session) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
+    return nullptr;
+  }
+
+  PyObject* script_flow_file = nullptr;
+  const char* relationship_name = nullptr;
+  // "O!s": a ScriptFlowFile instance followed by a str. On mismatch PyArg_ParseTuple
+  // has already set a Python exception.
+  if (!PyArg_ParseTuple(args, "O!s", PyScriptFlowFile::typeObject(), &script_flow_file, &relationship_name)) {
+    throw PyException();
+  }
+
+  // The "s" converter never yields a null pointer on success; kept as a defensive check.
+  if (!relationship_name) {
+    PyErr_SetString(PyExc_AttributeError, "Custom relationship name is invalid!");
+    return nullptr;
+  }
+
+  std::string relationship_name_str(relationship_name);
+  if (relationship_name_str.empty()) {
+    PyErr_SetString(PyExc_AttributeError, "Custom relationship name is empty!");
+    return nullptr;
+  }
+
+  const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
+  if (!flow_file) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'");
+    return nullptr;
+  }
+
+  session->transferToCustomRelationship(flow_file, relationship_name_str);
+  Py_RETURN_NONE;
+}
+
 PyObject* PyProcessSessionObject::getContentsAsBytes(PyProcessSessionObject* 
self, PyObject* args) {
   auto session = self->process_session_.lock();
   if (!session) {
diff --git a/extensions/python/types/PyProcessSession.h 
b/extensions/python/types/PyProcessSession.h
index 60a788d5b..14866a820 100644
--- a/extensions/python/types/PyProcessSession.h
+++ b/extensions/python/types/PyProcessSession.h
@@ -33,6 +33,7 @@ class PyProcessSession {
   std::shared_ptr<core::FlowFile> create(const 
std::shared_ptr<core::FlowFile>& flow_file = nullptr);
   std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile>& 
flow_file);
   void transfer(const std::shared_ptr<core::FlowFile>& flow_file, const 
core::Relationship& relationship);
+  void transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& 
flow_file, const std::string& relationship_name);
   void read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject 
input_stream_callback);
   void write(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject 
output_stream_callback);
   void remove(const std::shared_ptr<core::FlowFile>& flow_file);
@@ -59,6 +60,7 @@ struct PyProcessSessionObject {
   static PyObject* read(PyProcessSessionObject* self, PyObject* args);
   static PyObject* write(PyProcessSessionObject* self, PyObject* args);
   static PyObject* transfer(PyProcessSessionObject* self, PyObject* args);
+  static PyObject* transferToCustomRelationship(PyProcessSessionObject* self, 
PyObject* args);
   static PyObject* remove(PyProcessSessionObject* self, PyObject* args);
   static PyObject* getContentsAsBytes(PyProcessSessionObject* self, PyObject* 
args);
 
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 54c188719..87da7d974 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -82,6 +82,7 @@ class ProcessSession : public ReferenceContainer {
   std::shared_ptr<core::FlowFile> clone(const core::FlowFile& parent, int64_t 
offset, int64_t size);
   // Transfer the FlowFile to the relationship
   virtual void transfer(const std::shared_ptr<core::FlowFile>& flow, const 
Relationship& relationship);
+  void transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& 
flow, const std::string& relationship_name);
 
   void putAttribute(core::FlowFile& flow, std::string_view key, const 
std::string& value);
   void removeAttribute(core::FlowFile& flow, std::string_view key);
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 49f41cfd8..c8d28a263 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -230,6 +230,10 @@ void ProcessSession::transfer(const 
std::shared_ptr<core::FlowFile>& flow, const
   flow->setDeleted(false);
 }
 
+// Transfers flow to a relationship identified only by its name: a Relationship is
+// built on the fly (the name doubles as its description) and handed to transfer().
+void ProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name) {
+  const Relationship custom_relationship{relationship_name, relationship_name};
+  transfer(flow, custom_relationship);
+}
+
 void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const 
io::OutputStreamCallback& callback) {
   gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID())
       || added_flowfiles_.contains(flow->getUUID())

Reply via email to