This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 21dcd4d09838ffe50733e1d79b6cdeadefe50c63 Author: Gabor Gyimesi <gamezb...@gmail.com> AuthorDate: Thu Feb 1 17:43:42 2024 +0100 MINIFICPP-2278 Add custom relationship support for python processors Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com> This closes #1722 --- docker/test/integration/cluster/ImageStore.py | 13 +++--- docker/test/integration/features/python.feature | 20 +++++++++ .../minifi/processors/RotatingForwarder.py | 26 ++++++++++++ .../resources/python/RotatingForwarder.py | 33 +++++++++++++++ extensions/python/PYTHON.md | 1 - .../pythonprocessors/nifiapi/flowfiletransform.py | 5 ++- extensions/python/types/PyProcessSession.cpp | 48 ++++++++++++++++++++++ extensions/python/types/PyProcessSession.h | 2 + libminifi/include/core/ProcessSession.h | 1 + libminifi/src/core/ProcessSession.cpp | 4 ++ 10 files changed, 145 insertions(+), 8 deletions(-) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 355eb32e0..e915c1b27 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -107,6 +107,7 @@ class ImageStore: {pip3_install_command} RUN pip3 install langchain USER minificpp + COPY RotatingForwarder.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, @@ -114,7 +115,7 @@ class ImageStore: parse_document_url=parse_document_url, chunk_document_url=chunk_document_url)) - return self.__build_image(dockerfile) + return self.__build_image(dockerfile, [os.path.join(self.test_dir, "resources", "python", "RotatingForwarder.py")]) def __build_http_proxy_image(self): dockerfile = dedent("""\ @@ -178,11 +179,11 @@ class ImageStore: docker_context.addfile(dockerfile_info, fileobj=conf_dockerfile_buffer) - for context_file in context_files: - file_info = tarfile.TarInfo(context_file['name']) - file_info.size = context_file['size'] - docker_context.addfile(file_info, - fileobj=context_file['file_obj']) + for context_file_path in context_files: + with open(context_file_path, 'rb') as file: + file_info = tarfile.TarInfo(os.path.basename(context_file_path)) + file_info.size = os.path.getsize(context_file_path) + docker_context.addfile(file_info, file) docker_context_buffer.seek(0) logging.info('Creating configured image...') diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 2f2b042fe..90c9c3f3e 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -82,3 +82,23 @@ Feature: MiNiFi can use python processors in its flows When all instances start up Then at least one flowfile's content match the following regex: '{"text": "test_", "metadata": {"filename": "test_file.log", "uuid": "", "chunk_index": 0, "chunk_count": 3}}' in less than 30 seconds And the Minifi logs contain the following message: "key:document.count value:3" in less than 10 seconds + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: MiNiFi C++ can use custom relationships in NiFi native python processors + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with filename "test_file.log" and content "test_data_one" is present in "/tmp/input" + And a file with filename "test_file2.log" and content "test_data_two" is present in "/tmp/input" + And a file with filename "test_file3.log" and content "test_data_three" is present in "/tmp/input" + And a file with filename "test_file4.log" and content "test_data_four" is present in "/tmp/input" + And a RotatingForwarder processor + And a PutFile processor with the "Directory" property set to "/tmp/output" + + And the "success" relationship of the GetFile processor is connected to the RotatingForwarder + And the "first" relationship of the RotatingForwarder processor is connected to the PutFile + And the "second" relationship of the RotatingForwarder processor is connected to the PutFile + And the "third" relationship of the RotatingForwarder processor is connected to the PutFile + And the "fourth" relationship of the RotatingForwarder processor is connected to the PutFile + + When all instances start up + + Then flowfiles with these contents are placed in the monitored directory in less than 10 seconds: "test_data_one,test_data_two,test_data_three,test_data_four" diff --git a/docker/test/integration/minifi/processors/RotatingForwarder.py b/docker/test/integration/minifi/processors/RotatingForwarder.py new file mode 100644 index 000000000..4571538e6 --- /dev/null +++ b/docker/test/integration/minifi/processors/RotatingForwarder.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class RotatingForwarder(Processor): + def __init__(self, context): + super(RotatingForwarder, self).__init__( + context=context, + clazz='RotatingForwarder', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=[]) diff --git a/docker/test/integration/resources/python/RotatingForwarder.py b/docker/test/integration/resources/python/RotatingForwarder.py new file mode 100644 index 000000000..09df09fbf --- /dev/null +++ b/docker/test/integration/resources/python/RotatingForwarder.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult + + +class RotatingForwarder(FlowFileTransform): + """ + Forwards flow files to a different relationship each time it is called in a round robin manner. + """ + def __init__(self, **kwargs): + self.relationship_index = 0 + self.relationships = ["first", "second", "third", "fourth"] + + def transform(self, context, flowFile): + content = flowFile.getContentsAsBytes().decode() + + relationship = self.relationships[self.relationship_index % len(self.relationships)] + self.relationship_index += 1 + self.relationship_index %= len(self.relationships) + return FlowFileTransformResult(relationship, contents=content) diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md index 788321c60..bb496daf8 100644 --- a/extensions/python/PYTHON.md +++ b/extensions/python/PYTHON.md @@ -159,7 +159,6 @@ Due to some differences between the NiFi and MiNiFi C++ processors and implement - Controller properties are not supported at the moment. - There are some validators in NiFi that are not present in MiNiFi C++, so some property validations will be missing using the NiFi Python processors. - Allowable values specified in NiFi Python processors are ignored in MiNiFi C++ (due to MiNiFi C++ requiring them to be specified at compile time), so the property values are not pre-verified. -- MiNiFi C++ does not support custom relationship names in Python processors, the only available relationships are "success", "failure" and "original". - MiNiFi C++ only supports expression language with flow file attributes, so only FLOWFILE_ATTRIBUTES expression language scope is supported, otherwise the expression language will not be evaluated. - MiNiFi C++ does not support property dependencies, so the property dependencies will be ignored. If a property depends on another property, the property will not be required. - MiNiFi C++ does not support the use of self.jvm member in Python processors that provides JVM bindings in NiFi, it is set to None in MiNiFi C++. diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py index 125cf54ae..a1edcd76f 100644 --- a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py +++ b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py @@ -108,7 +108,10 @@ class FlowFileTransform(ABC): if result_content is not None: session.write(flow_file, WriteCallback(result_content)) - session.transfer(flow_file, self.REL_SUCCESS) + if result.getRelationship() == "success": + session.transfer(flow_file, self.REL_SUCCESS) + else: + session.transferToCustomRelationship(flow_file, result.getRelationship()) session.transfer(original_flow_file, self.REL_ORIGINAL) @abstractmethod diff --git a/extensions/python/types/PyProcessSession.cpp b/extensions/python/types/PyProcessSession.cpp index 02540c70b..def6817cc 100644 --- a/extensions/python/types/PyProcessSession.cpp +++ b/extensions/python/types/PyProcessSession.cpp @@ -63,6 +63,18 @@ void PyProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow_file session_->transfer(flow_file, relationship); } +void PyProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& relationship_name) { + if (!session_) { + throw std::runtime_error("Access of ProcessSession after it has been released"); + } + + if (!flow_file) { + throw std::runtime_error("Access of FlowFile after it has been released"); + } + + session_->transferToCustomRelationship(flow_file, relationship_name); +} + void PyProcessSession::read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject input_stream_callback) { if (!session_) { throw std::runtime_error("Access of ProcessSession after it has been released"); @@ -153,6 +165,7 @@ static PyMethodDef PyProcessSessionObject_methods[] = { // NOLINT(cppcoreguidel {"read", (PyCFunction) PyProcessSessionObject::read, METH_VARARGS, nullptr}, {"write", (PyCFunction) PyProcessSessionObject::write, METH_VARARGS, nullptr}, {"transfer", (PyCFunction) PyProcessSessionObject::transfer, METH_VARARGS, nullptr}, + {"transferToCustomRelationship", (PyCFunction) PyProcessSessionObject::transferToCustomRelationship, METH_VARARGS, nullptr}, {"remove", (PyCFunction) PyProcessSessionObject::remove, METH_VARARGS, nullptr}, {"getContentsAsBytes", (PyCFunction) PyProcessSessionObject::getContentsAsBytes, METH_VARARGS, nullptr}, {} /* Sentinel */ @@ -308,6 +321,41 @@ PyObject* PyProcessSessionObject::transfer(PyProcessSessionObject* self, PyObjec Py_RETURN_NONE; } +PyObject* PyProcessSessionObject::transferToCustomRelationship(PyProcessSessionObject* self, PyObject* args) { + auto session = self->process_session_.lock(); + if (!session) { + PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'"); + Py_RETURN_NONE; + } + + PyObject* script_flow_file = nullptr; + const char* relationship_name = nullptr; + if (!PyArg_ParseTuple(args, "O!s", PyScriptFlowFile::typeObject(), &script_flow_file, &relationship_name)) { + throw PyException(); + } + + if (!relationship_name) { + PyErr_SetString(PyExc_AttributeError, "Custom relationship name is invalid!"); + return nullptr; + } + + std::string relationship_name_str(relationship_name); + if (relationship_name_str.empty()) { + PyErr_SetString(PyExc_AttributeError, "Custom relationship name is empty!"); + return nullptr; + } + + const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock(); + if (!flow_file) { + PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'"); + Py_RETURN_NONE; + } + + BorrowedStr name = BorrowedStr::fromTuple(args, 0); + session->transferToCustomRelationship(flow_file, relationship_name_str); + Py_RETURN_NONE; +} + PyObject* PyProcessSessionObject::getContentsAsBytes(PyProcessSessionObject* self, PyObject* args) { auto session = self->process_session_.lock(); if (!session) { diff --git a/extensions/python/types/PyProcessSession.h b/extensions/python/types/PyProcessSession.h index 60a788d5b..14866a820 100644 --- a/extensions/python/types/PyProcessSession.h +++ b/extensions/python/types/PyProcessSession.h @@ -33,6 +33,7 @@ class PyProcessSession { std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile>& flow_file = nullptr); std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile>& flow_file); void transfer(const std::shared_ptr<core::FlowFile>& flow_file, const core::Relationship& relationship); + void transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& relationship_name); void read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject input_stream_callback); void write(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject output_stream_callback); void remove(const std::shared_ptr<core::FlowFile>& flow_file); @@ -59,6 +60,7 @@ struct PyProcessSessionObject { static PyObject* read(PyProcessSessionObject* self, PyObject* args); static PyObject* write(PyProcessSessionObject* self, PyObject* args); static PyObject* transfer(PyProcessSessionObject* self, PyObject* args); + static PyObject* transferToCustomRelationship(PyProcessSessionObject* self, PyObject* args); static PyObject* remove(PyProcessSessionObject* self, PyObject* args); static PyObject* getContentsAsBytes(PyProcessSessionObject* self, PyObject* args); diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 54c188719..87da7d974 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -82,6 +82,7 @@ class ProcessSession : public ReferenceContainer { std::shared_ptr<core::FlowFile> clone(const core::FlowFile& parent, int64_t offset, int64_t size); // Transfer the FlowFile to the relationship virtual void transfer(const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship); + void transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name); void putAttribute(core::FlowFile& flow, std::string_view key, const std::string& value); void removeAttribute(core::FlowFile& flow, std::string_view key); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 49f41cfd8..c8d28a263 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -230,6 +230,10 @@ void ProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow, const flow->setDeleted(false); } +void ProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name) { + transfer(flow, Relationship{relationship_name, relationship_name}); +} + void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) { gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID()) || added_flowfiles_.contains(flow->getUUID())