This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit b0767d8d474c53f88f4a99166ee623d5ae105aee Author: Gabor Gyimesi <gamezb...@gmail.com> AuthorDate: Wed Nov 29 12:57:35 2023 +0100 MINIFICPP-2276 Support FlowFileTransform NiFi Python processors Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com> This closes #1712 --- CMakeLists.txt | 8 + CONFIGURE.md | 3 + README.md | 2 +- cmake/VerifyPythonCompatibility.cmake | 8 +- docker/python-verify/conda.Dockerfile | 9 +- docker/python-verify/venv.Dockerfile | 2 +- docker/test/integration/cluster/ContainerStore.py | 3 + .../test/integration/cluster/DockerTestCluster.py | 3 + docker/test/integration/cluster/ImageStore.py | 51 +++- .../cluster/containers/MinifiC2ServerContainer.py | 6 +- .../cluster/containers/MinifiContainer.py | 3 + .../cluster/containers/NifiContainer.py | 2 +- .../features/MiNiFi_integration_test_driver.py | 3 + docker/test/integration/features/environment.py | 8 + .../integration/features/minifi_c2_server.feature | 2 +- docker/test/integration/features/python.feature | 19 ++ docker/test/integration/features/steps/steps.py | 3 +- .../integration/minifi/processors/ChunkDocument.py | 26 ++ .../integration/minifi/processors/ParseDocument.py | 26 ++ .../resources/minifi-c2-server/config-ssl.json | 53 ---- .../resources/minifi-c2-server/config.json | 41 --- .../resources/minifi-c2-server/config.yml | 31 +++ extensions/lua/LuaScriptExecutor.cpp | 1 + extensions/lua/LuaScriptExecutor.h | 1 + extensions/python/CMakeLists.txt | 19 ++ extensions/python/ExecutePythonProcessor.cpp | 54 +++- extensions/python/ExecutePythonProcessor.h | 29 ++- extensions/python/{README.md => PYTHON.md} | 62 ++++- extensions/python/PythonBindings.cpp | 9 +- extensions/python/PythonCreator.h | 42 ++- extensions/python/PythonObjectFactory.h | 28 +- extensions/python/PythonProcessor.cpp | 5 +- extensions/python/PythonProcessor.h | 4 +- extensions/python/PythonScriptEngine.cpp | 92 ++++++- extensions/python/PythonScriptEngine.h | 51 +++- extensions/python/PythonScriptExecutor.cpp | 5 +- extensions/python/PythonScriptExecutor.h | 1 + .../nifi_python_processors/__init__.py | 14 + .../python/pythonprocessors/nifiapi/__init__.py | 14 + .../pythonprocessors/nifiapi/documentation.py | 142 +++++++++++ .../pythonprocessors/nifiapi/flowfiletransform.py | 119 +++++++++ .../python/pythonprocessors/nifiapi/properties.py | 283 +++++++++++++++++++++ extensions/python/tests/PythonManifestTests.cpp | 4 +- .../PyDataConverter.cpp} | 33 +-- .../{PythonProcessor.h => types/PyDataConverter.h} | 28 +- extensions/python/types/PyProcessContext.cpp | 30 ++- extensions/python/types/PyProcessSession.cpp | 86 ++++++- extensions/python/types/PyProcessSession.h | 4 + extensions/python/types/PyProcessor.cpp | 29 ++- extensions/python/types/PyScriptFlowFile.cpp | 47 +++- extensions/python/types/PyScriptFlowFile.h | 2 + extensions/python/types/Types.h | 14 +- extensions/script/ExecuteScript.cpp | 2 +- extensions/script/ExecuteScript.h | 3 +- extensions/script/ScriptExecutor.h | 1 + libminifi/include/core/ConfigurableComponent.h | 11 +- libminifi/include/core/PropertyType.h | 14 + libminifi/include/core/TypedValues.h | 1 + libminifi/src/core/ConfigurableComponent.cpp | 52 ++-- libminifi/src/core/PropertyType.cpp | 25 ++ msi/WixWin.wsi | 69 ++++- msi/WixWinMergeModules.wsi | 85 +++++-- run_flake8.sh | 2 +- 63 files changed, 1563 insertions(+), 266 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9d710a7b7..99a3f3665 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -630,6 +630,14 @@ foreach(extension ${selected_extensions}) endif() cpack_add_component(${component-name} DISPLAY_NAME ${extension} ${maybe_disabled} GROUP extensions DEPENDS bin) endforeach() +if (NOT WIN32 AND "python_script_extension" IN_LIST CPACK_COMPONENTS_ALL) + list(APPEND CPACK_COMPONENTS_ALL minifi_python_native_module) + cpack_add_component(minifi_python_native_module + DISPLAY_NAME "MiNiFi Python native module" + FILES "${CMAKE_BINARY_DIR}/bin/minifi_native$<TARGET_FILE_SUFFIX:minifi-python-script-extension>" + GROUP extensions + DEPENDS bin) +endif() include(CPack) ### include modules diff --git a/CONFIGURE.md b/CONFIGURE.md index a7a3f9313..16469db58 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -634,6 +634,9 @@ To notify the agent which extensions it should load see [Loading extensions](Ext ### JNI Functionality Please see the [JNI Configuration Guide](JNI.md). +### Python processors +Please see the [Python Processors Readme](extensions/python/PYTHON.md). + ## Log configuration By default the application logs for Apache MiNiFi C++ can be found in the ${MINIFI_HOME}/logs/minifi-app.log file with default INFO level logging. The logger can be reconfigured in the ${MINIFI_HOME}/conf/minifi-log.properties file to use different output streams, log level, and output format. diff --git a/README.md b/README.md index 3e5986311..449591457 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension | USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera) [...] | Windows Event Log (Windows) | [CollectorInitiatedSubscription](PROCESSORS.md#collectorinitiatedsubscription)<br/>[ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)<br/>[TailEventLog](PROCESSORS.md#taileventlog) [...] - Please see our [Python guide](extensions/script/README.md) on how to write Python processors and use them within MiNiFi C++. + Please see our [Python guide](extensions/python/PYTHON.md) on how to write Python processors and use them within MiNiFi C++. ## Caveats * We follow semver with regards to API compatibility, but no ABI compatibility is provided. See [semver's website](https://semver.org/) for more information diff --git a/cmake/VerifyPythonCompatibility.cmake b/cmake/VerifyPythonCompatibility.cmake index 7923710b5..e537e2dd1 100644 --- a/cmake/VerifyPythonCompatibility.cmake +++ b/cmake/VerifyPythonCompatibility.cmake @@ -67,10 +67,10 @@ function(ADD_VENV_TO_DOCKER TAG_PREFIX) endfunction() -ADD_DOCKER_TARGET_FROM_CENTOS(debian:bullseye patched_bullseye "apt update \\&\\& apt install -y patchelf libpython3-dev python3-venv \\&\\& patchelf /opt/minifi/minifi-current/extensions/libminifi-python-script-extension.so --replace-needed libpython3.so libpython3.9.so") -ADD_DOCKER_TARGET_FROM_CENTOS(ubuntu:jammy patched_jammy "apt update \\&\\& apt install -y patchelf libpython3.10-dev python3.10-venv \\&\\& patchelf /opt/minifi/minifi-current/extensions/libminifi-python-script-extension.so --replace-needed libpython3.so libpython3.10.so.1.0") -ADD_DOCKER_TARGET_FROM_CENTOS(rockylinux:8 rocky8 "yum install -y python3-libs") -ADD_DOCKER_TARGET_FROM_CENTOS(rockylinux:9 rocky9 "yum install -y python3-libs") +ADD_DOCKER_TARGET_FROM_CENTOS(debian:bullseye patched_bullseye "apt update \\&\\& apt install -y patchelf libpython3-dev python3-venv python3-pip wget \\&\\& patchelf /opt/minifi/minifi-current/extensions/libminifi-python-script-extension.so --replace-needed libpython3.so libpython3.9.so") +ADD_DOCKER_TARGET_FROM_CENTOS(ubuntu:jammy patched_jammy "apt update \\&\\& apt install -y patchelf libpython3.10-dev python3.10-venv python3-pip wget \\&\\& patchelf /opt/minifi/minifi-current/extensions/libminifi-python-script-extension.so --replace-needed libpython3.so libpython3.10.so.1.0") +ADD_DOCKER_TARGET_FROM_CENTOS(rockylinux:8 rocky8 "yum install -y python3-libs python3-pip python3-devel gcc-c++ wget") +ADD_DOCKER_TARGET_FROM_CENTOS(rockylinux:9 rocky9 "yum install -y python3-libs python3-pip wget") ADD_DOCKER_TARGET_FROM_CENTOS(ubuntu:jammy jammy "apt update \\&\\& apt install -y wget") ADD_CONDA_TO_DOCKER(jammy) ADD_VENV_TO_DOCKER(rocky9) diff --git a/docker/python-verify/conda.Dockerfile b/docker/python-verify/conda.Dockerfile index 84f423e6c..911e2bd22 100644 --- a/docker/python-verify/conda.Dockerfile +++ b/docker/python-verify/conda.Dockerfile @@ -24,12 +24,13 @@ ENV CONDA_HOME /opt/conda ENV LD_LIBRARY_PATH /opt/conda/lib ENV MINIFI_BASE_DIR /opt/minifi ENV MINIFI_HOME ${MINIFI_BASE_DIR}/minifi-current +ENV PATH ${CONDA_HOME}/bin:${PATH} USER root -RUN wget https://repo.anaconda.com/archive/Anaconda3-2022.10-Linux-x86_64.sh -P /tmp \ - && echo "e7ecbccbc197ebd7e1f211c59df2e37bc6959d081f2235d387e08c9026666acd /tmp/Anaconda3-2022.10-Linux-x86_64.sh" | sha256sum -c \ - && bash /tmp/Anaconda3-2022.10-Linux-x86_64.sh -b -p /opt/conda \ +RUN wget https://repo.anaconda.com/archive/Anaconda3-2023.09-0-Linux-x86_64.sh -P /tmp \ + && echo "6c8a4abb36fbb711dc055b7049a23bbfd61d356de9468b41c5140f8a11abd851 /tmp/Anaconda3-2023.09-0-Linux-x86_64.sh" | sha256sum -c \ + && bash /tmp/Anaconda3-2023.09-0-Linux-x86_64.sh -b -p /opt/conda \ && chown -R ${USER}:${USER} /opt/conda \ && mkdir /home/${USER} \ && chown -R ${USER}:${USER} /home/${USER} @@ -37,6 +38,8 @@ RUN wget https://repo.anaconda.com/archive/Anaconda3-2022.10-Linux-x86_64.sh -P USER ${USER} RUN ${CONDA_HOME}/bin/conda init bash +RUN ${CONDA_HOME}/bin/conda install langchain -c conda-forge + WORKDIR ${MINIFI_HOME} # Start MiNiFi CPP in the foreground diff --git a/docker/python-verify/venv.Dockerfile b/docker/python-verify/venv.Dockerfile index 102926335..c00a0321c 100644 --- a/docker/python-verify/venv.Dockerfile +++ b/docker/python-verify/venv.Dockerfile @@ -26,7 +26,7 @@ USER ${USER} WORKDIR ${MINIFI_HOME} RUN python3 -m venv venv -RUN . ./venv/bin/activate && pip install --upgrade pip && pip install numpy +RUN . ./venv/bin/activate && pip install --upgrade pip && pip install numpy langchain # Start MiNiFi CPP in the foreground CMD ["/bin/bash", "-c", "source ./venv/bin/activate && ./bin/minifi.sh run"] diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 1f8fb52ab..ee30f944c 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -349,6 +349,9 @@ class ContainerStore: def enable_sql_in_minifi(self): self.minifi_options.enable_sql = True + def use_nifi_python_processors_in_minifi(self): + self.minifi_options.use_nifi_python_processors = True + def set_yaml_in_minifi(self): self.minifi_options.config_format = "yaml" diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 19c43a732..1363c81e8 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -107,6 +107,9 @@ class DockerTestCluster: def enable_multi_tenancy_in_grafana_loki(self): self.container_store.enable_multi_tenancy_in_grafana_loki() + def use_nifi_python_processors_in_minifi(self): + self.container_store.use_nifi_python_processors_in_minifi() + def set_yaml_in_minifi(self): self.container_store.set_yaml_in_minifi() diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index ab6fd5bd9..355eb32e0 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -15,6 +15,7 @@ from .containers.MinifiContainer import MinifiContainer +from .containers.NifiContainer import NifiContainer import logging import tarfile import docker @@ -41,6 +42,8 @@ class ImageStore: if container_engine == "minifi-cpp-sql": image = self.__build_minifi_cpp_sql_image() + elif container_engine == "minifi-cpp-nifi-python": + image = self.__build_minifi_cpp_image_with_nifi_python_processors() elif container_engine == "http-proxy": image = self.__build_http_proxy_image() elif container_engine == "postgresql-server": @@ -88,7 +91,28 @@ class ImageStore: echo "Password = password" >> /etc/odbc.ini && \ echo "Database = postgres" >> /etc/odbc.ini USER minificpp - """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION)) + """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION)) + + return self.__build_image(dockerfile) + + def __build_minifi_cpp_image_with_nifi_python_processors(self): + parse_document_url = "https://raw.githubusercontent.com/apache/nifi/rel/nifi-" + NifiContainer.NIFI_VERSION + "/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/ParseDocument.py" + chunk_document_url = "https://raw.githubusercontent.com/apache/nifi/rel/nifi-" + NifiContainer.NIFI_VERSION + "/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/ChunkDocument.py" + pip3_install_command = "" + if not MinifiContainer.MINIFI_TAG_PREFIX: + pip3_install_command = "RUN apk --update --no-cache add py3-pip" + dockerfile = dedent("""\ + FROM {base_image} + USER root + {pip3_install_command} + RUN pip3 install langchain + USER minificpp + RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ + wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors + """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, + pip3_install_command=pip3_install_command, + parse_document_url=parse_document_url, + chunk_document_url=chunk_document_url)) return self.__build_image(dockerfile) @@ -186,3 +210,28 @@ class ImageStore: except Exception as e: logging.info(e) raise + + def get_minifi_image_python_version(self): + result = self.client.containers.run( + image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, + command=['python3', '-c', 'import platform; print(platform.python_version())'], + remove=True + ) + + python_ver_str = result.decode('utf-8') + logging.info('MiNiFi python version: %s', python_ver_str) + return tuple(map(int, python_ver_str.split('.'))) + + def is_conda_available_in_minifi_image(self): + container = self.client.containers.create( + image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, + command=['conda', '--version'], + ) + try: + result = container.start() + except docker.errors.APIError: + container.remove() + return False + + container.remove() + return result.decode('utf-8').startswith('conda ') diff --git a/docker/test/integration/cluster/containers/MinifiC2ServerContainer.py b/docker/test/integration/cluster/containers/MinifiC2ServerContainer.py index eef4c446c..93a4f88ab 100644 --- a/docker/test/integration/cluster/containers/MinifiC2ServerContainer.py +++ b/docker/test/integration/cluster/containers/MinifiC2ServerContainer.py @@ -64,8 +64,8 @@ CN=minifi-cpp-flow-{feature_id}: test_dir = os.environ['TEST_DIRECTORY'] # Based on DockerVerify.sh mounts = [docker.types.Mount( type='bind', - source=os.path.join(test_dir, "resources/minifi-c2-server/config-ssl.json" if self.ssl else "resources/minifi-c2-server/config.json"), - target='/opt/minifi-c2/minifi-c2-current/files/minifi-test-class/config.application.json.v1' + source=os.path.join(test_dir, "resources/minifi-c2-server/config.yml"), + target='/opt/minifi-c2/minifi-c2-current/files/minifi-test-class/config.text.yml.v1' )] if self.ssl: @@ -95,7 +95,7 @@ CN=minifi-cpp-flow-{feature_id}: target='/opt/minifi-c2/minifi-c2-current/certs/minifi-c2-server-truststore.p12' )) self.docker_container = self.client.containers.run( - image='apache/nifi-minifi-c2:2.0.0-M1', + image='apache/nifi-minifi-c2:1.25.0', detach=True, name=self.name, network=self.network.name, diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index 6a0bc8301..05cc7ff52 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -33,6 +33,7 @@ class MinifiOptions: self.enable_prometheus = False self.enable_prometheus_with_ssl = False self.enable_sql = False + self.use_nifi_python_processors = False self.config_format = "json" self.use_flow_config_from_url = False self.set_ssl_context_properties = False @@ -165,6 +166,8 @@ class MinifiContainer(FlowContainer): if self.options.enable_sql: image = self.image_store.get_image('minifi-cpp-sql') + elif self.options.use_nifi_python_processors: + image = self.image_store.get_image('minifi-cpp-nifi-python') else: image = 'apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION diff --git a/docker/test/integration/cluster/containers/NifiContainer.py b/docker/test/integration/cluster/containers/NifiContainer.py index 192f8a00f..20cbd7282 100644 --- a/docker/test/integration/cluster/containers/NifiContainer.py +++ b/docker/test/integration/cluster/containers/NifiContainer.py @@ -23,7 +23,7 @@ import os class NifiContainer(FlowContainer): - NIFI_VERSION = '2.0.0-M1' + NIFI_VERSION = '2.0.0-M2' NIFI_ROOT = '/opt/nifi/nifi-' + NIFI_VERSION def __init__(self, feature_context, config_dir, name, vols, network, image_store, command=None): diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 5f2a6d166..1c1df5f2d 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -417,6 +417,9 @@ class MiNiFi_integration_test: def enable_multi_tenancy_in_grafana_loki(self): self.cluster.enable_multi_tenancy_in_grafana_loki() + def use_nifi_python_processors_in_minifi(self): + self.cluster.use_nifi_python_processors_in_minifi() + def set_yaml_in_minifi(self): self.cluster.set_yaml_in_minifi() diff --git a/docker/test/integration/features/environment.py b/docker/test/integration/features/environment.py index 489c25ada..12fc1403a 100644 --- a/docker/test/integration/features/environment.py +++ b/docker/test/integration/features/environment.py @@ -47,6 +47,13 @@ def before_scenario(context, scenario): logging.info("Integration test setup at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now())) context.test = MiNiFi_integration_test(context=context, feature_id=context.feature_id) + + if "USE_NIFI_PYTHON_PROCESSORS" in scenario.effective_tags: + if not context.image_store.is_conda_available_in_minifi_image() and context.image_store.get_minifi_image_python_version() < (3, 8, 1): + scenario.skip("NiFi Python processor tests use langchain library which requires Python 3.8.1 or later.") + return + context.test.use_nifi_python_processors_in_minifi() + for step in scenario.steps: inject_feature_id(context, step) @@ -74,6 +81,7 @@ def before_feature(context, feature): is_x86 = platform.machine() in ("i386", "AMD64", "x86_64") if not is_x86: feature.skip("This feature is only x86/x64 compatible") + feature_id = shortuuid.uuid() context.feature_id = feature_id context.directory_bindings = DockerTestDirectoryBindings(feature_id) diff --git a/docker/test/integration/features/minifi_c2_server.feature b/docker/test/integration/features/minifi_c2_server.feature index 1b847407a..1f11a0a00 100644 --- a/docker/test/integration/features/minifi_c2_server.feature +++ b/docker/test/integration/features/minifi_c2_server.feature @@ -48,7 +48,7 @@ Feature: MiNiFi can communicate with Apache NiFi MiNiFi C2 server Scenario: MiNiFi flow config is updated from MiNiFi C2 server through SSL with SSL properties Given a file with the content "test" is present in "/tmp/input" And a GenerateFlowFile processor - And a ssl properties are set up for MiNiFi C2 server + And ssl properties are set up for MiNiFi C2 server And a MiNiFi C2 server is set up with SSL When all instances start up Then the MiNiFi C2 SSL server logs contain the following message: "acknowledged with a state of FULLY_APPLIED(DONE)" in less than 60 seconds diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 29f054c95..2f2b042fe 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -63,3 +63,22 @@ Feature: MiNiFi can use python processors in its flows When all instances start up Then flowfiles with these contents are placed in the monitored directory in less than 5 seconds: "0,1,2,3,4,5" + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: MiNiFi C++ can use NiFi native python processors + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with filename "test_file.log" and content "test_data" is present in "/tmp/input" + And a ParseDocument processor + And a ChunkDocument processor with the "Chunk Size" property set to "5" + And the "Chunk Overlap" property of the ChunkDocument processor is set to "3" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + + And the "success" relationship of the GetFile processor is connected to the ParseDocument + And the "success" relationship of the ParseDocument processor is connected to the ChunkDocument + And the "success" relationship of the ChunkDocument processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When all instances start up + Then at least one flowfile's content match the following regex: '{"text": "test_", "metadata": {"filename": "test_file.log", "uuid": "", "chunk_index": 0, "chunk_count": 3}}' in less than 30 seconds + And the Minifi logs contain the following message: "key:document.count value:3" in less than 10 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 6cd01976f..149fe2b16 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -794,6 +794,7 @@ def step_impl(context, content, duration): @then("at least one flowfile's content match the following regex: \"{regex}\" in less than {duration}") +@then("at least one flowfile's content match the following regex: '{regex}' in less than {duration}") def step_impl(context, regex: str, duration: str): context.test.check_for_at_least_one_file_with_matching_content(regex, humanfriendly.parse_timespan(duration)) @@ -1047,7 +1048,7 @@ def step_impl(context): context.test.enable_c2_with_ssl_in_minifi() -@given("a ssl properties are set up for MiNiFi C2 server") +@given("ssl properties are set up for MiNiFi C2 server") def step_impl(context): context.test.enable_c2_with_ssl_in_minifi() context.test.set_ssl_context_properties_in_minifi() diff --git a/docker/test/integration/minifi/processors/ChunkDocument.py b/docker/test/integration/minifi/processors/ChunkDocument.py new file mode 100644 index 000000000..7dbe170d1 --- /dev/null +++ b/docker/test/integration/minifi/processors/ChunkDocument.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class ChunkDocument(Processor): + def __init__(self, context): + super(ChunkDocument, self).__init__( + context=context, + clazz='ChunkDocument', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=['success', 'original', 'failure']) diff --git a/docker/test/integration/minifi/processors/ParseDocument.py b/docker/test/integration/minifi/processors/ParseDocument.py new file mode 100644 index 000000000..a20167849 --- /dev/null +++ b/docker/test/integration/minifi/processors/ParseDocument.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class ParseDocument(Processor): + def __init__(self, context): + super(ParseDocument, self).__init__( + context=context, + clazz='ParseDocument', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=['success', 'original', 'failure']) diff --git a/docker/test/integration/resources/minifi-c2-server/config-ssl.json b/docker/test/integration/resources/minifi-c2-server/config-ssl.json deleted file mode 100644 index af05c8ff7..000000000 --- a/docker/test/integration/resources/minifi-c2-server/config-ssl.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - "MiNiFi Config Version": 3, - "Flow Controller": { - "name": "MiNiFi Flow" - }, - "Processors": [ - { - "name": "Get files from /tmp/input", - "id": "2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27", - "class": "org.apache.nifi.minifi.processors.GetFile", - "scheduling strategy": "TIMER_DRIVEN", - "scheduling period": "1000 ms", - "Properties": { - "Input Directory": "/tmp/input" - } - }, - { - "name": "Put files to /tmp/output", - "id": "e143601d-de4f-44ba-a6ec-d1f97d77ec94", - "class": "org.apache.nifi.minifi.processors.PutFile", - "scheduling strategy": "EVENT_DRIVEN", - "auto-terminated relationships list": ["failure", "success"], - "Properties": { - "Conflict Resolution Strategy": "fail", - "Create Missing Directories": "true", - "Directory": "/tmp/output" - } - } - ], - "Connections": [ - { - "name": "GetFile/success/PutFile", - "id": "098a56ba-f4bf-4323-a3f3-6f8a5e3586bf", - "source id": "2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27", - "source relationship names": ["success"], - "destination id": "e143601d-de4f-44ba-a6ec-d1f97d77ec94" - } - ], - "Controller Services": [ - { - "name": "SSLContextService", - "id": "2438e3c8-015a-1000-79ca-83af40ec1994", - "class": "SSLContextService", - "Properties": { - "Client Certificate": [{"value": "/tmp/resources/minifi-cpp-flow.crt"}], - "Private Key": [{"value": "/tmp/resources/minifi-cpp-flow.key"}], - "Passphrase": [{"value": "abcdefgh"}], - "CA Certificate": [{"value": "/tmp/resources/root_ca.crt"}] - } - } - ], - "Remote Process Groups": [] -} diff --git a/docker/test/integration/resources/minifi-c2-server/config.json b/docker/test/integration/resources/minifi-c2-server/config.json deleted file mode 100644 index ff488a8f4..000000000 --- a/docker/test/integration/resources/minifi-c2-server/config.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "MiNiFi Config Version": 3, - "Flow Controller": { - "name": "MiNiFi Flow" - }, - "Processors": [ - { - "name": "Get files from /tmp/input", - "id": "2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27", - "class": "org.apache.nifi.minifi.processors.GetFile", - "scheduling strategy": "TIMER_DRIVEN", - "scheduling period": "1000 ms", - "Properties": { - "Input Directory": "/tmp/input" - } - }, - { - "name": "Put files to /tmp/output", - "id": "e143601d-de4f-44ba-a6ec-d1f97d77ec94", - "class": "org.apache.nifi.minifi.processors.PutFile", - "scheduling strategy": "EVENT_DRIVEN", - "auto-terminated relationships list": ["failure", "success"], - "Properties": { - "Conflict Resolution Strategy": "fail", - "Create Missing Directories": "true", - "Directory": "/tmp/output" - } - } - ], - "Connections": [ - { - "name": "GetFile/success/PutFile", - "id": "098a56ba-f4bf-4323-a3f3-6f8a5e3586bf", - "source id": "2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27", - "source relationship names": ["success"], - "destination id": "e143601d-de4f-44ba-a6ec-d1f97d77ec94" - } - ], - "Controller Services": [], - "Remote Process Groups": [] -} diff --git a/docker/test/integration/resources/minifi-c2-server/config.yml b/docker/test/integration/resources/minifi-c2-server/config.yml new file mode 100644 index 000000000..ddac4cd77 --- /dev/null +++ b/docker/test/integration/resources/minifi-c2-server/config.yml @@ -0,0 +1,31 @@ +MiNiFi Config Version: 3 +Flow Controller: + name: MiNiFi Flow +Processors: +- name: Get files from /tmp/input + id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27 + class: org.apache.nifi.minifi.processors.GetFile + scheduling strategy: TIMER_DRIVEN + scheduling period: 1000 ms + Properties: + Input Directory: /tmp/input +- name: Put files to /tmp/output + id: e143601d-de4f-44ba-a6ec-d1f97d77ec94 + class: org.apache.nifi.minifi.processors.PutFile + scheduling strategy: EVENT_DRIVEN + auto-terminated relationships list: + - failure + - success + Properties: + Conflict Resolution Strategy: fail + Create Missing Directories: 'true' + Directory: /tmp/output +Connections: +- name: GetFile/success/PutFile + id: 098a56ba-f4bf-4323-a3f3-6f8a5e3586bf + source id: 2f2a3b47-f5ba-49f6-82b5-bc1c86b96e27 + source relationship names: + - success + destination id: e143601d-de4f-44ba-a6ec-d1f97d77ec94 +Controller Services: [] +Remote Process Groups: [] diff --git a/extensions/lua/LuaScriptExecutor.cpp b/extensions/lua/LuaScriptExecutor.cpp index 93e898269..22a40e391 100644 --- a/extensions/lua/LuaScriptExecutor.cpp +++ b/extensions/lua/LuaScriptExecutor.cpp @@ -48,6 +48,7 @@ void LuaScriptExecutor::initialize(std::filesystem::path script_file, size_t max_concurrent_engines, const core::Relationship& success, const core::Relationship& failure, + const core::Relationship& /*original*/, const std::shared_ptr<core::logging::Logger>& logger) { if (script_file.empty() == script_body.empty()) throw std::runtime_error("Exactly one of these must be non-empty: ScriptBody, ScriptFile"); diff --git a/extensions/lua/LuaScriptExecutor.h b/extensions/lua/LuaScriptExecutor.h index a150317a5..d001058c2 100644 --- a/extensions/lua/LuaScriptExecutor.h +++ b/extensions/lua/LuaScriptExecutor.h @@ -39,6 +39,7 @@ class LuaScriptExecutor : public script::ScriptExecutor { size_t max_concurrent_engines, const core::Relationship& success, const core::Relationship& failure, + const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger) override; diff --git a/extensions/python/CMakeLists.txt b/extensions/python/CMakeLists.txt index 174c32569..f9e8cf5cf 100644 --- a/extensions/python/CMakeLists.txt +++ b/extensions/python/CMakeLists.txt @@ -43,5 +43,24 @@ endif() target_include_directories(minifi-python-script-extension PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/python") target_include_directories(minifi-python-script-extension PUBLIC ${Python_INCLUDE_DIRS}) +get_target_property(TARGET_EXT minifi-python-script-extension SUFFIX) + +if (NOT WIN32) + add_custom_command( + TARGET minifi-python-script-extension + POST_BUILD + COMMAND ${CMAKE_COMMAND} -E create_symlink + "libminifi-python-script-extension$<TARGET_FILE_SUFFIX:minifi-python-script-extension>" + "${CMAKE_BINARY_DIR}/bin/minifi_native$<TARGET_FILE_SUFFIX:minifi-python-script-extension>" + COMMENT "Creating symlink for minifi-python-script-extension" + ) + + install( + FILES "${CMAKE_BINARY_DIR}/bin/minifi_native$<TARGET_FILE_SUFFIX:minifi-python-script-extension>" + DESTINATION extensions + COMPONENT minifi_python_native_module + ) +endif() + register_extension(minifi-python-script-extension "PYTHON SCRIPTING ENGINE" PYTHON-SCRIPTING-EXTENSIONS "This enables python script engine" "extensions/python/tests") register_extension_linter(minifi-python-script-extension-linter) diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index 870548299..9f9743df7 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -27,6 +27,7 @@ #include "utils/file/FileUtils.h" #include "core/Resource.h" #include "range/v3/range/conversion.hpp" +#include "range/v3/algorithm/find_if.hpp" namespace org::apache::nifi::minifi::extensions::python::processors { @@ -45,7 +46,6 @@ void ExecutePythonProcessor::initialize() { try { loadScript(); } catch(const std::runtime_error&) { - logger_->log_warn("Could not load python script while initializing. In case of non-native python processor this is normal and will be done in the schedule phase."); return; } @@ -57,13 +57,18 @@ void ExecutePythonProcessor::initialize() { void ExecutePythonProcessor::initalizeThroughScriptEngine() { appendPathForImportModules(); + python_script_engine_->appendModulePaths(python_paths_); python_script_engine_->eval(script_to_exec_); + if (python_class_name_) { + python_script_engine_->initializeProcessorObject(*python_class_name_); + } python_script_engine_->describe(this); python_script_engine_->onInitialize(this); processor_initialized_ = true; } void ExecutePythonProcessor::onScheduleSharedPtr(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) { + setAutoTerminatedRelationships(std::vector<core::Relationship>{Original}); if (!processor_initialized_) { loadScript(); python_script_engine_ = createScriptEngine(); @@ -95,7 +100,7 @@ void ExecutePythonProcessor::appendPathForImportModules() { std::string module_directory; getProperty(ModuleDirectory, module_directory); if (!module_directory.empty()) { - python_script_engine_->setModulePaths(utils::string::splitAndTrimRemovingEmpty(module_directory, ",") | ranges::to<std::vector<std::filesystem::path>>()); + python_script_engine_->appendModulePaths(utils::string::splitAndTrimRemovingEmpty(module_directory, ",") | ranges::to<std::vector<std::filesystem::path>>()); } } @@ -146,11 +151,54 @@ std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() auto engine = std::make_unique<PythonScriptEngine>(); python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName()); - engine->initialize(Success, Failure, python_logger_); + engine->initialize(Success, Failure, Original, python_logger_); return engine; } +void ExecutePythonProcessor::addProperty(const std::string &name, const std::string &description, const std::optional<std::string> &defaultvalue, bool required, bool el, + bool sensitive, const std::optional<int64_t>& property_type_code) { + auto property = core::PropertyDefinitionBuilder<>::createProperty(name).withDescription(description).isRequired(required).supportsExpressionLanguage(el).isSensitive(sensitive); + if (defaultvalue) { + property.withDefaultValue(*defaultvalue); + } + if (property_type_code) { + property.withPropertyType(core::StandardPropertyTypes::translateCodeToPropertyType(static_cast<core::StandardPropertyTypes::PropertyTypeCode>(*property_type_code))); + } + + std::lock_guard<std::mutex> lock(python_properties_mutex_); + python_properties_.emplace_back(property.build()); +} + +const core::Property* ExecutePythonProcessor::findProperty(const std::string& name) const { + if (auto prop_ptr = core::ConfigurableComponent::findProperty(name)) { + return prop_ptr; + } + + std::lock_guard<std::mutex> lock(python_properties_mutex_); + + auto it = ranges::find_if(python_properties_, [&name](const auto& item){ + return item.getName() == name; + }); + if (it != python_properties_.end()) { + return &*it; + } + + return nullptr; +} + +std::map<std::string, core::Property> ExecutePythonProcessor::getProperties() const { + auto result = ConfigurableComponent::getProperties(); + + std::lock_guard<std::mutex> lock(python_properties_mutex_); + + for (const auto &property : python_properties_) { + result.insert({ property.getName(), property }); + } + + return result; +} + REGISTER_RESOURCE(ExecutePythonProcessor, Processor); } // namespace org::apache::nifi::minifi::extensions::python::processors diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index a80a92cd8..aea941374 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -24,6 +24,7 @@ #include <string> #include <utility> #include <vector> +#include <filesystem> #include "concurrentqueue.h" #include "core/Processor.h" @@ -81,7 +82,8 @@ class ExecutePythonProcessor : public core::Processor { EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Script succeeds"}; EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Script fails"}; - EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; + EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "Original flow file"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure, Original}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; @@ -97,12 +99,11 @@ class ExecutePythonProcessor : public core::Processor { python_dynamic_ = true; } - void addProperty(const std::string &name, const std::string &description, const std::string &defaultvalue, bool required, bool el) { - python_properties_.emplace_back( - core::PropertyDefinitionBuilder<>::createProperty(name).withDefaultValue(defaultvalue).withDescription(description).isRequired(required).supportsExpressionLanguage(el).build()); - } + void addProperty(const std::string &name, const std::string &description, const std::optional<std::string> &defaultvalue, bool required, bool el, + bool sensitive, const std::optional<int64_t>& property_type_code); - const std::vector<core::Property> &getPythonProperties() const { + std::vector<core::Property> getPythonProperties() const { + std::lock_guard<std::mutex> lock(python_properties_mutex_); return python_properties_; } @@ -118,7 +119,21 @@ class ExecutePythonProcessor : public core::Processor { return description_; } + void setPythonClassName(const std::string& python_class_name) { + python_class_name_ = python_class_name; + } + + void setPythonPaths(const std::vector<std::filesystem::path>& python_paths) { + python_paths_ = python_paths; + } + + std::map<std::string, core::Property> getProperties() const override; + + protected: + const core::Property* findProperty(const std::string& name) const override; + private: + mutable std::mutex python_properties_mutex_; std::vector<core::Property> python_properties_; std::string description_; @@ -134,6 +149,8 @@ class ExecutePythonProcessor : public core::Processor { std::string script_file_path_; std::shared_ptr<core::logging::Logger> python_logger_; std::unique_ptr<PythonScriptEngine> python_script_engine_; + std::optional<std::string> python_class_name_; + std::vector<std::filesystem::path> python_paths_; void appendPathForImportModules(); void loadScriptFromFile(); diff --git a/extensions/python/README.md b/extensions/python/PYTHON.md similarity index 51% rename from extensions/python/README.md rename to extensions/python/PYTHON.md index ba75f4a35..788321c60 100644 --- a/extensions/python/README.md +++ b/extensions/python/PYTHON.md @@ -23,6 +23,8 @@ This readme defines the configuration parameters to use ExecutePythonProcessor t - [Requirements](#requirements) - [Description](#description) - [Configuration](#configuration) +- [Processors](#processors) +- [Using NiFi Python Processors](#using-nifi-python-processors) ## Requirements @@ -46,6 +48,15 @@ e.g. This will change the dependency from the generic libpython3.so to the speci patchelf extensions/libminifi-python-script-extension.so --replace-needed libpython3.so libpython3.9.so ``` +### Windows system python +When installing python on Windows, make sure to select the option to install python for all users. This prevents issues when running MiNiFi as a Windows service, as it makes sure +that the python libraries are available not just for the currently logged on user but for the user running the service too. + +If the python libraries are not available for the user running the service, MiNiFi starts with an error message similar to this: +``` +Failed to load extension 'minifi-python-script-extension' at 'C:\Program Files\ApacheNiFiMiNiFi\nifi-minifi-cpp\bin\..\extensions\minifi-python-script-extension.dll': The specified module could not be found. +``` + ### Anaconda Just make sure minifi finds the anaconda libraries. e.g.: ```shell @@ -64,9 +75,9 @@ export LD_LIBRARY_PATH="${PYENV_ROOT}/versions/${PY_VERSION}/lib${LD_LIBRARY_PAT Python native processors can be updated at any time by simply adding a new processor to the directory defined in the configuration options. The processor name, when provided to MiNiFi C++ and any C2 manifest will be that of the name of the python script. For example, "AttributePrinter.py" will be named and referenced in the flow -as "org.apache.nifi.minifi.processors.AttributePrinter" +as "org.apache.nifi.minifi.processors.AttributePrinter" -Methods that are enabled within the processor are describe, onSchedule, onInitialize, and onTrigger. +Methods that are enabled within the processor are describe, onSchedule, onInitialize, and onTrigger. Describe is passed the processor and is a required function. You must set the description like so: @@ -74,14 +85,24 @@ Describe is passed the processor and is a required function. You must set the de def describe(processor): processor.setDescription("Adds an attribute to your flow files") ``` - + onInitialize is also passed the processor reference and can be where you set properties. The first argument is the property display name, -followed by the description, and default value. The last two arguments are booleans describing if the property is required or requires EL. +followed by the description, and default value. The next three arguments are booleans describing if the property is required, support expression language, and if it is a sensitive property. +The last argument is the property type code. The property type code is an integer that represents the type of the property. The supported property type codes and their corresponding types: +``` +INTEGER = 0 +LONG = 1 +BOOLEAN = 2 +DATA_SIZE = 3 +TIME_PERIOD = 4 +NON_BLANK = 5 +PORT = 6 +``` ```python def onInitialize(processor): processor.setSupportsDynamicProperties() - processor.addProperty("property name","description","default value", True, False) + processor.addProperty("property name","description","default value", True /*required*/, False /*expression language supported*/, False /*sensitive*/, 1 /*property type code*/) ``` The onSchedule function is passed the context and session factory. This should be where your processor loads and reads properties via @@ -106,20 +127,39 @@ class VaderSentiment(object): To enable python Processor capabilities, the following options need to be provided in minifi.properties. The directory specified can contain processors. Note that the processor name will be the reference in your flow. Directories are treated like package names. Therefore if the nifi.python.processor.dir is /tmp/ and you have a subdirectory named packagedir with the file name file.py, it will -produce a processor with the name org.apache.nifi.minifi.processors.packagedir.file. Note that each subdirectory will append a package -to the reference class name. +produce a processor with the name org.apache.nifi.minifi.processors.packagedir.file. Note that each subdirectory will append a package +to the reference class name. in minifi.properties #directory where processors exist nifi.python.processor.dir=XXXX - - + + ## Processors The python directory (extensions/pythonprocessors) contains implementations that will be available for flows if the required dependencies exist. - -## Sentiment Analysis + +### Sentiment Analysis The SentimentAnalysis processor will perform a Vader Sentiment Analysis. This requires that you install nltk and VaderSentiment pip install nltk pip install VaderSentiment + +## Using NiFi Python Processors + +MiNiFi C++ supports the use of NiFi Python processors, that are inherited from the FlowFileTransform base class. To use these processors, copy the Python processor module to the nifi_python_processors subdirectory of the python directory. By default, the python directory is ${minifi_root}/minifi-python. To see how to write NiFi Python processors, please refer to the Python Developer Guide under the [Apache NiFi documentation](https://nifi.apache.org/documentation/v2/). + +In the flow configuration these Python processors can be referenced by their fully qualified class name, which looks like this: org.apache.nifi.minifi.processors.nifi_python_processors.<package_name>.<processor_name>. For example, the fully qualified class name of the PromptChatGPT processor implemented in the file nifi_python_processors/PromptChatGPT.py is org.apache.nifi.minifi.processors.nifi_python_processors.PromptChatGPT. If a processor is copied under a subdirectory, because it is [...] + +**NOTE:** The name of the NiFi Python processor file should match the class name in the file, otherwise the processor will not be found. + +Due to some differences between the NiFi and MiNiFi C++ processors and implementation, there are some limitations using the NiFi Python processors: +- Record based processors are not yet supported in MiNiFi C++, so the NiFi Python processors inherited from RecordTransform are not supported. +- Virtualenv support is not yet available in MiNiFi C++, so all required packages must be installed on the system. +- Controller properties are not supported at the moment. +- There are some validators in NiFi that are not present in MiNiFi C++, so some property validations will be missing using the NiFi Python processors. +- Allowable values specified in NiFi Python processors are ignored in MiNiFi C++ (due to MiNiFi C++ requiring them to be specified at compile time), so the property values are not pre-verified. +- MiNiFi C++ does not support custom relationship names in Python processors, the only available relationships are "success", "failure" and "original". +- MiNiFi C++ only supports expression language with flow file attributes, so only FLOWFILE_ATTRIBUTES expression language scope is supported, otherwise the expression language will not be evaluated. +- MiNiFi C++ does not support property dependencies, so the property dependencies will be ignored. If a property depends on another property, the property will not be required. +- MiNiFi C++ does not support the use of self.jvm member in Python processors that provides JVM bindings in NiFi, it is set to None in MiNiFi C++. diff --git a/extensions/python/PythonBindings.cpp b/extensions/python/PythonBindings.cpp index 7ac7e2767..b113ac864 100644 --- a/extensions/python/PythonBindings.cpp +++ b/extensions/python/PythonBindings.cpp @@ -26,16 +26,23 @@ #include "types/PyInputStream.h" #include "types/PyOutputStream.h" #include "types/PyStateManager.h" +#include "types/PyDataConverter.h" namespace org::apache::nifi::minifi::extensions::python { extern "C" { +static PyMethodDef minifi_native_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) + {"timePeriodStringToMilliseconds", (PyCFunction) timePeriodStringToMilliseconds, METH_VARARGS, nullptr}, + {"dataSizeStringToBytes", (PyCFunction) dataSizeStringToBytes, METH_VARARGS, nullptr}, + {} /* Sentinel */ +}; + struct PyModuleDef minifi_module = { .m_base = PyModuleDef_HEAD_INIT, .m_name = "minifi_native", // name of module .m_doc = nullptr, // module documentation, may be NULL .m_size = -1, // size of per-interpreter state of the module, or -1 if the module keeps state in global variables. - .m_methods = nullptr, + .m_methods = minifi_native_methods, .m_slots = nullptr, .m_traverse = nullptr, .m_clear = nullptr, diff --git a/extensions/python/PythonCreator.h b/extensions/python/PythonCreator.h index 587bbb777..198f85da9 100644 --- a/extensions/python/PythonCreator.h +++ b/extensions/python/PythonCreator.h @@ -23,6 +23,7 @@ #include <algorithm> #include <string> #include <memory> +#include <filesystem> #include "core/Core.h" #include "core/logging/LoggerConfiguration.h" #include "core/Resource.h" @@ -34,6 +35,8 @@ #include "utils/StringUtils.h" #include "range/v3/algorithm.hpp" #include "properties/Configuration.h" +#include "utils/file/FilePattern.h" +#include "range/v3/view/filter.hpp" namespace org::apache::nifi::minifi::extensions::python { @@ -62,6 +65,8 @@ class PythonCreator : public minifi::core::CoreComponent { } configure({pathListings.value()}); + auto python_lib_path = getPythonLibPath(configuration); + for (const auto &path : classpaths_) { const auto script_name = path.stem(); const auto package = getPackage(pathListings.value(), path.string()); @@ -71,7 +76,15 @@ class PythonCreator : public minifi::core::CoreComponent { full_name = utils::string::join_pack("org.apache.nifi.minifi.processors.", package, ".", script_name.string()); class_name = full_name; } - core::getClassLoader().registerClass(class_name, std::make_unique<PythonObjectFactory>(path.string(), class_name)); + if (path.string().find("nifi_python_processors") != std::string::npos) { + logger_->log_info("Registering NiFi python processor: {}", class_name); + core::getClassLoader().registerClass(class_name, std::make_unique<PythonObjectFactory>(path.string(), script_name.string(), + PythonProcessorType::NIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, std::filesystem::path{pathListings.value()}, path.parent_path()})); + } else { + logger_->log_info("Registering MiNiFi python processor: {}", class_name); + core::getClassLoader().registerClass(class_name, std::make_unique<PythonObjectFactory>(path.string(), script_name.string(), + PythonProcessorType::MINIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, std::filesystem::path{pathListings.value()}})); + } registered_classes_.push_back(class_name); try { registerScriptDescription(class_name, full_name, path, script_name.string()); @@ -118,6 +131,9 @@ class PythonCreator : public minifi::core::CoreComponent { for (const auto &path : pathOrFiles) { utils::file::addFilesMatchingExtension(logger_, path, ".py", classpaths_); } + classpaths_ = classpaths_ + | ranges::views::filter([] (auto& path) { return path.string().find("nifiapi") == std::string::npos && path.string().find("__init__") == std::string::npos; }) + | ranges::to<std::vector<std::filesystem::path>>(); } std::string getPackage(const std::string &basePath, const std::string &pythonscript) { @@ -136,6 +152,30 @@ class PythonCreator : public minifi::core::CoreComponent { return python_package; } + std::filesystem::path getPythonLibPath(const std::shared_ptr<Configure>& configuration) { + constexpr const char* DEFAULT_EXTENSION_PATH = "../extensions/*"; + std::string pattern = [&] { + auto opt_pattern = configuration->get(minifi::Configuration::nifi_extension_path); + if (!opt_pattern) { + logger_->log_warn("No extension path is provided, using default: '{}'", DEFAULT_EXTENSION_PATH); + } + return opt_pattern.value_or(DEFAULT_EXTENSION_PATH); + }(); + auto candidates = utils::file::match(utils::file::FilePattern(pattern, [&] (std::string_view subpattern, std::string_view error_msg) { + logger_->log_error("Error in subpattern '{}': {}", subpattern, error_msg); + })); + + std::filesystem::path python_lib_path; + for (const auto& candidate : candidates) { + if (candidate.string().find("python") != std::string::npos) { + python_lib_path = candidate.parent_path(); + break; + } + } + + return python_lib_path; + } + std::vector<std::string> registered_classes_; std::vector<std::filesystem::path> classpaths_; diff --git a/extensions/python/PythonObjectFactory.h b/extensions/python/PythonObjectFactory.h index d3c07964a..729f8ad8a 100644 --- a/extensions/python/PythonObjectFactory.h +++ b/extensions/python/PythonObjectFactory.h @@ -22,6 +22,7 @@ #include <string> #include <memory> #include <utility> +#include <filesystem> #include "core/ClassLoader.h" #include "ExecutePythonProcessor.h" @@ -31,11 +32,18 @@ #pragma GCC visibility push(hidden) #endif +enum class PythonProcessorType { + MINIFI_TYPE, + NIFI_TYPE +}; + class PythonObjectFactory : public org::apache::nifi::minifi::core::DefautObjectFactory<org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor> { public: - explicit PythonObjectFactory(std::string file, std::string name) + explicit PythonObjectFactory(std::string file, std::string class_name, PythonProcessorType python_processor_type, const std::vector<std::filesystem::path>& python_paths) : file_(std::move(file)), - name_(std::move(name)) { + class_name_(std::move(class_name)), + python_paths_(python_paths), + python_processor_type_(python_processor_type) { } std::unique_ptr<org::apache::nifi::minifi::core::CoreComponent> create(const std::string &name) override { @@ -44,6 +52,10 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefautObject if (ptr == nullptr) { return nullptr; } + if (python_processor_type_ == PythonProcessorType::NIFI_TYPE) { + ptr->setPythonClassName(class_name_); + ptr->setPythonPaths(python_paths_); + } ptr->initialize(); ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, file_); return ptr; @@ -55,6 +67,10 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefautObject if (ptr == nullptr) { return nullptr; } + if (python_processor_type_ == PythonProcessorType::NIFI_TYPE) { + ptr->setPythonClassName(class_name_); + ptr->setPythonPaths(python_paths_); + } ptr->initialize(); ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, file_); return ptr; @@ -69,6 +85,10 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefautObject org::apache::nifi::minifi::core::CoreComponent* createRaw(const std::string &name, const org::apache::nifi::minifi::utils::Identifier &uuid) override { auto ptr = dynamic_cast<org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor*>(DefautObjectFactory::createRaw(name, uuid)); + if (python_processor_type_ == PythonProcessorType::NIFI_TYPE) { + ptr->setPythonClassName(class_name_); + ptr->setPythonPaths(python_paths_); + } ptr->initialize(); ptr->setProperty(org::apache::nifi::minifi::extensions::python::processors::ExecutePythonProcessor::ScriptFile, file_); return dynamic_cast<org::apache::nifi::minifi::core::CoreComponent*>(ptr); @@ -76,7 +96,9 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefautObject private: std::string file_; - std::string name_; + std::string class_name_; + std::vector<std::filesystem::path> python_paths_; + PythonProcessorType python_processor_type_; }; #if defined(__GNUC__) || defined(__GNUG__) diff --git a/extensions/python/PythonProcessor.cpp b/extensions/python/PythonProcessor.cpp index 266f160d2..07fd23211 100644 --- a/extensions/python/PythonProcessor.cpp +++ b/extensions/python/PythonProcessor.cpp @@ -38,8 +38,9 @@ void PythonProcessor::setDescription(const std::string& desc) { processor_->setDescription(desc); } -void PythonProcessor::addProperty(const std::string& name, const std::string& description, const std::string& defaultvalue, bool required, bool el) { - processor_->addProperty(name, description, defaultvalue, required, el); +void PythonProcessor::addProperty(const std::string& name, const std::string& description, const std::optional<std::string>& defaultvalue, + bool required, bool el, bool sensitive, const std::optional<int64_t>& property_type_code) { + processor_->addProperty(name, description, defaultvalue, required, el, sensitive, property_type_code); } } // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/PythonProcessor.h b/extensions/python/PythonProcessor.h index 979355fb3..6ec20e2a6 100644 --- a/extensions/python/PythonProcessor.h +++ b/extensions/python/PythonProcessor.h @@ -20,6 +20,7 @@ #include <string> #include <memory> +#include <optional> #include "core/Processor.h" @@ -37,7 +38,8 @@ class PythonProcessor { void setDescription(const std::string& desc); - void addProperty(const std::string& name, const std::string& description, const std::string& defaultvalue, bool required, bool el); + void addProperty(const std::string& name, const std::string& description, const std::optional<std::string>& defaultvalue, + bool required, bool el, bool sensitive, const std::optional<int64_t>& property_type_code); private: python::processors::ExecutePythonProcessor* processor_; diff --git a/extensions/python/PythonScriptEngine.cpp b/extensions/python/PythonScriptEngine.cpp index 073d6156d..a777ea65e 100644 --- a/extensions/python/PythonScriptEngine.cpp +++ b/extensions/python/PythonScriptEngine.cpp @@ -92,6 +92,7 @@ PythonScriptEngine::PythonScriptEngine() { PythonScriptEngine::~PythonScriptEngine() { GlobalInterpreterLock lock; bindings_.resetReference(); + processor_instance_.resetReference(); } void PythonScriptEngine::eval(const std::string& script) { @@ -129,27 +130,44 @@ void PythonScriptEngine::evalFile(const std::filesystem::path& file_name) { void PythonScriptEngine::onInitialize(core::Processor* proc) { auto newproc = std::make_shared<python::PythonProcessor>(proc); - call("onInitialize", std::weak_ptr(newproc)); + if (processor_instance_.get() != nullptr) { + callProcessorObjectMethod("onInitialize", std::weak_ptr(newproc)); + } else { + call("onInitialize", std::weak_ptr(newproc)); + } } void PythonScriptEngine::describe(core::Processor* proc) { auto newproc = std::make_shared<python::PythonProcessor>(proc); - callRequiredFunction("describe", std::weak_ptr(newproc)); + if (processor_instance_.get() != nullptr) { + callRequiredProcessorObjectMethod("describe", std::weak_ptr(newproc)); + } else { + callRequiredFunction("describe", std::weak_ptr(newproc)); + } } void PythonScriptEngine::onSchedule(const std::shared_ptr<core::ProcessContext> &context) { - call("onSchedule", std::weak_ptr(context)); + if (processor_instance_.get() != nullptr) { + callProcessorObjectMethod("onSchedule", std::weak_ptr(context)); + } else { + call("onSchedule", std::weak_ptr(context)); + } } void PythonScriptEngine::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { auto py_session = std::make_shared<python::PyProcessSession>(session); - call("onTrigger", std::weak_ptr(context), std::weak_ptr(py_session)); + if (processor_instance_.get() != nullptr) { + callProcessorObjectMethod("onTrigger", std::weak_ptr(context), std::weak_ptr(py_session)); + } else { + call("onTrigger", std::weak_ptr(context), std::weak_ptr(py_session)); + } } -void PythonScriptEngine::initialize(const core::Relationship& success, const core::Relationship& failure, const std::shared_ptr<core::logging::Logger>& logger) { +void PythonScriptEngine::initialize(const core::Relationship& success, const core::Relationship& failure, const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger) { bind("log", std::weak_ptr<core::logging::Logger>(logger)); bind("REL_SUCCESS", success); bind("REL_FAILURE", failure); + bind("REL_ORIGINAL", original); } void PythonScriptEngine::evalInternal(std::string_view script) { @@ -166,11 +184,11 @@ void PythonScriptEngine::evalInternal(std::string_view script) { void PythonScriptEngine::evaluateModuleImports() { bindings_.put("__builtins__", OwnedObject(PyImport_ImportModule("builtins"))); + evalInternal("import sys"); if (module_paths_.empty()) { return; } - evalInternal("import sys"); for (const auto& module_path : module_paths_) { if (std::filesystem::is_regular_file(module_path)) { evalInternal("sys.path.append(r'" + module_path.parent_path().string() + "')"); @@ -180,4 +198,66 @@ void PythonScriptEngine::evaluateModuleImports() { } } +void PythonScriptEngine::initializeProcessorObject(const std::string& python_class_name) { + GlobalInterpreterLock gil; + if (auto python_class = bindings_[python_class_name]) { + auto num_args = [&]() -> size_t { + auto class_init = OwnedObject(PyObject_GetAttrString(python_class->get(), "__init__")); + if (!class_init.get()) { + return 0; + } + + auto inspect_module = OwnedObject(PyImport_ImportModule("inspect")); + if (!inspect_module.get()) { + return 0; + } + + auto inspect_args = OwnedObject(PyObject_CallMethod(inspect_module.get(), "getfullargspec", "O", class_init.get())); + if (!inspect_args.get()) { + return 0; + } + + auto arg_list = OwnedObject(PyObject_GetAttrString(inspect_args.get(), "args")); + if (!arg_list.get()) { + return 0; + } + + return PyList_Size(arg_list.get()); + }(); + + if (num_args > 1) { + auto kwargs = OwnedDict::create(); + auto value = OwnedObject(Py_None); + kwargs.put("jvm", value); + auto args = OwnedObject(PyTuple_New(0)); + processor_instance_ = OwnedObject(PyObject_Call(python_class->get(), args.get(), kwargs.get())); + } else { + processor_instance_ = OwnedObject(PyObject_CallObject(python_class->get(), nullptr)); + } + + if (processor_instance_.get() == nullptr) { + throw PythonScriptException(PyException().what()); + } + + auto result = PyObject_SetAttrString(processor_instance_.get(), "logger", bindings_[std::string_view("log")]->get()); + if (result < 0) { + throw PythonScriptException("Could not bind 'logger' object to '" + python_class_name + "' python processor object"); + } + result = PyObject_SetAttrString(processor_instance_.get(), "REL_SUCCESS", bindings_[std::string_view("REL_SUCCESS")]->get()); + if (result < 0) { + throw PythonScriptException("Could not bind 'REL_SUCCESS' object to '" + python_class_name + "' python processor object"); + } + result = PyObject_SetAttrString(processor_instance_.get(), "REL_FAILURE", bindings_[std::string_view("REL_FAILURE")]->get()); + if (result < 0) { + throw PythonScriptException("Could not bind 'REL_FAILURE' object to '" + python_class_name + "' python processor object"); + } + result = PyObject_SetAttrString(processor_instance_.get(), "REL_ORIGINAL", bindings_[std::string_view("REL_ORIGINAL")]->get()); + if (result < 0) { + throw PythonScriptException("Could not bind 'REL_ORIGINAL' object to '" + python_class_name + "' python processor object"); + } + } else { + throw PythonScriptException("No python class '" + python_class_name + "' was found!"); + } +} + } // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/PythonScriptEngine.h b/extensions/python/PythonScriptEngine.h index 58ce7de5d..b6b5dbb5b 100644 --- a/extensions/python/PythonScriptEngine.h +++ b/extensions/python/PythonScriptEngine.h @@ -89,8 +89,8 @@ class PythonScriptEngine { void eval(const std::string& script); void evalFile(const std::filesystem::path& file_name); - void setModulePaths(std::vector<std::filesystem::path> module_paths) { - module_paths_ = std::move(module_paths); + void appendModulePaths(const std::vector<std::filesystem::path>& module_paths) { + module_paths_.insert(module_paths_.end(), module_paths.begin(), module_paths.end()); } template<typename... Args> @@ -117,7 +117,47 @@ class PythonScriptEngine { throw PyException(); } } else { - throw std::runtime_error("Required Function" + fn_name + " is not found within Python bindings"); + throw std::runtime_error("Required Function '" + fn_name + "' is not found within Python bindings"); + } + } + + template<typename ... Args> + void callProcessorObjectMethod(const std::string& fn_name, Args&& ...args) { + GlobalInterpreterLock gil_lock; + if (processor_instance_.get() == nullptr) { + throw std::runtime_error("No python processor instance is set!"); + } + + try { + auto callable_method = OwnedCallable(PyObject_GetAttrString(processor_instance_.get(), fn_name.c_str())); + if (callable_method.get() == nullptr) { + return; + } + + auto result = callable_method(std::forward<Args>(args)...); + if (!result) { + throw PyException(); + } + } catch (const std::exception& e) { + throw PythonScriptException(e.what()); + } + } + + template<typename ... Args> + void callRequiredProcessorObjectMethod(const std::string& fn_name, Args&& ...args) { + GlobalInterpreterLock gil_lock; + if (processor_instance_.get() == nullptr) { + throw std::runtime_error("No python processor instance is set!"); + } + + auto callable_method = OwnedCallable(PyObject_GetAttrString(processor_instance_.get(), fn_name.c_str())); + if (callable_method.get() == nullptr) { + throw std::runtime_error("Required method '" + fn_name + "' is not found in python processor object!"); + } + + auto result = callable_method(std::forward<Args>(args)...); + if (!result) { + throw PyException(); } } @@ -131,12 +171,15 @@ class PythonScriptEngine { void describe(core::Processor* proc); void onSchedule(const std::shared_ptr<core::ProcessContext>& context); void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session); - void initialize(const core::Relationship& success, const core::Relationship& failure, const std::shared_ptr<core::logging::Logger>& logger); + void initialize(const core::Relationship& success, const core::Relationship& failure, const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger); + void initializeProcessorObject(const std::string& python_class_name); private: void evalInternal(std::string_view script); void evaluateModuleImports(); OwnedDict bindings_; + OwnedObject processor_instance_; + std::optional<std::string> processor_class_name_; std::vector<std::filesystem::path> module_paths_; }; diff --git a/extensions/python/PythonScriptExecutor.cpp b/extensions/python/PythonScriptExecutor.cpp index 59b829edf..7bf14bc5d 100644 --- a/extensions/python/PythonScriptExecutor.cpp +++ b/extensions/python/PythonScriptExecutor.cpp @@ -35,7 +35,7 @@ void PythonScriptExecutor::onTrigger(const std::shared_ptr<core::ProcessContext> gsl_Expects(std::holds_alternative<std::filesystem::path>(script_to_run_) || std::holds_alternative<std::string>(script_to_run_)); if (module_directory_) { - python_script_engine_->setModulePaths(utils::string::splitAndTrimRemovingEmpty(*module_directory_, ",") | ranges::to<std::vector<std::filesystem::path>>()); + python_script_engine_->appendModulePaths(utils::string::splitAndTrimRemovingEmpty(*module_directory_, ",") | ranges::to<std::vector<std::filesystem::path>>()); } if (std::holds_alternative<std::filesystem::path>(script_to_run_)) @@ -52,6 +52,7 @@ void PythonScriptExecutor::initialize(std::filesystem::path script_file, size_t /*max_concurrent_engines*/, const core::Relationship& success, const core::Relationship& failure, + const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger) { if (script_file.empty() == script_body.empty()) throw std::runtime_error("Exactly one of these must be non-empty: ScriptBody, ScriptFile"); @@ -65,7 +66,7 @@ void PythonScriptExecutor::initialize(std::filesystem::path script_file, module_directory_ = std::move(module_directory); python_script_engine_ = std::make_unique<python::PythonScriptEngine>(); - python_script_engine_->initialize(success, failure, logger); + python_script_engine_->initialize(success, failure, original, logger); } REGISTER_RESOURCE(PythonScriptExecutor, InternalResource); diff --git a/extensions/python/PythonScriptExecutor.h b/extensions/python/PythonScriptExecutor.h index e111464fc..69fc96d00 100644 --- a/extensions/python/PythonScriptExecutor.h +++ b/extensions/python/PythonScriptExecutor.h @@ -36,6 +36,7 @@ class PythonScriptExecutor : public script::ScriptExecutor { size_t max_concurrent_engines, const core::Relationship& success, const core::Relationship& failure, + const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger) override; diff --git a/extensions/python/pythonprocessors/nifi_python_processors/__init__.py b/extensions/python/pythonprocessors/nifi_python_processors/__init__.py new file mode 100644 index 000000000..ae1e83eeb --- /dev/null +++ b/extensions/python/pythonprocessors/nifi_python_processors/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/extensions/python/pythonprocessors/nifiapi/__init__.py b/extensions/python/pythonprocessors/nifiapi/__init__.py new file mode 100644 index 000000000..ae1e83eeb --- /dev/null +++ b/extensions/python/pythonprocessors/nifiapi/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/extensions/python/pythonprocessors/nifiapi/documentation.py b/extensions/python/pythonprocessors/nifiapi/documentation.py new file mode 100644 index 000000000..e09f0bb7e --- /dev/null +++ b/extensions/python/pythonprocessors/nifiapi/documentation.py @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class ProcessorConfiguration: + class Java: + implements = ['org.apache.nifi.python.processor.documentation.ProcessorConfigurationDetails'] + + def __init__(self, processor_type: str, configuration: str): + self.processor_type = processor_type + self.configuration = configuration + + def getProcessorType(self): + return self.processor_type + + def getConfiguration(self): + return self.configuration + + +def use_case(description: str, configuration: str = None, notes: str = None, keywords: list[str] = None): + """Decorator to explain how to perform a specific use case with a given processor""" + def decorator(func): + return func + return decorator + + +def multi_processor_use_case(description: str, configurations: list[ProcessorConfiguration], notes: str = None, keywords: list[str] = None): + """Decorator to explain how to perform a specific use case that involves the decorated Processor, in addition to additional Processors""" + def decorator(func): + return func + return decorator + + +class UseCaseDetails: + class Java: + implements = ['org.apache.nifi.python.processor.documentation.UseCaseDetails'] + + def __init__(self, description: str, notes: str, keywords: list[str], configuration: str): + self.description = description + self.notes = notes + self.keywords = keywords + self.configuration = configuration + + def getDescription(self): + return self.description + + def getNotes(self): + return self.notes + + def getKeywords(self): + return self.keywords + + def getConfiguration(self): + return self.configuration + + def __str__(self): + return f"UseCaseDetails[description={self.description}]" + + +class MultiProcessorUseCaseDetails: + class Java: + implements = ['org.apache.nifi.python.processor.documentation.MultiProcessorUseCaseDetails'] + + def __init__(self, description: str, notes: str, keywords: list[str], configurations: list[ProcessorConfiguration]): + self.description = description + self.notes = notes + self.keywords = keywords + self.configurations = configurations + + def getDescription(self): + return self.description + + def getNotes(self): + return self.notes + + def getKeywords(self): + return self.keywords + + def getConfigurations(self): + return self.configurations + + def __str__(self): + return f"MultiProcessorUseCaseDetails[description={self.description}]" + + +class PropertyDescription: + class Java: + implements = ['org.apache.nifi.python.processor.documentation.PropertyDescription'] + + def __init__(self, + name: str, + description: str, + display_name: str = None, + required: bool = False, + sensitive: bool = True, + default_value: str = None, + expression_language_scope: str = None, + controller_service_definition: str = None): + self.name = name + self.description = description + self.display_name = display_name + self.required = required + self.sensitive = sensitive + self.default_value = default_value + self.expression_language_scope = expression_language_scope + self.controller_service_definition = controller_service_definition + + def getName(self): + return self.name + + def getDescription(self): + return self.description + + def getDisplayName(self): + return self.display_name + + def isRequired(self): + return self.required + + def isSensitive(self): + return self.sensitive + + def getDefaultValue(self): + return self.default_value + + def getExpressionLanguageScope(self): + return self.expression_language_scope + + def getControllerServiceDefinition(self): + return self.controller_service_definition diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py new file mode 100644 index 000000000..125cf54ae --- /dev/null +++ b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import List +from .properties import ExpressionLanguageScope, FlowFileProxy, ProcessContextProxy, PropertyDescriptor, translateStandardValidatorToMiNiFiPropertype +from minifi_native import OutputStream, Processor, ProcessContext, ProcessSession + + +class WriteCallback: + def __init__(self, content): + self.content = content + + def process(self, output_stream: OutputStream): + output_stream.write(self.content) + return len(self.content) + + +class FlowFileTransformResult: + def __init__(self, relationship: str, attributes=None, contents=None): + self.relationship = relationship + self.attributes = attributes + if contents is not None and isinstance(contents, str): + self.contents = str.encode(contents) + else: + self.contents = contents + + def getRelationship(self): + return self.relationship + + def getContents(self): + return self.contents + + def getAttributes(self): + return self.attributes + + +class FlowFileTransform(ABC): + # These will be added through the python bindings using C API + logger = None + REL_SUCCESS = None + REL_FAILURE = None + REL_ORIGINAL = None + + def describe(self, processor: Processor): + if hasattr(self, 'ProcessorDetails') and hasattr(self.ProcessorDetails, 'description'): + processor.setDescription(self.ProcessorDetails.description) + else: + processor.setDescription(self.__class__.__name__) + + def onInitialize(self, processor: Processor): + processor.setSupportsDynamicProperties() + for property in self.getPropertyDescriptors(): + property_type_code = translateStandardValidatorToMiNiFiPropertype(property.validators) + expression_language_supported = True if property.expressionLanguageScope != ExpressionLanguageScope.NONE else False + + # MiNiFi C++ does not support dependant properties, so if a property depends on another property, it should not be required + is_required = True if property.required and not property.dependencies else False + processor.addProperty(property.name, property.description, property.defaultValue, is_required, expression_language_supported, property.sensitive, property_type_code) + + def onScheduled(self, context_proxy: ProcessContextProxy): + pass + + def onSchedule(self, context: ProcessContext): + context_proxy = ProcessContextProxy(context) + self.onScheduled(context_proxy) + + def onTrigger(self, context: ProcessContext, session: ProcessSession): + original_flow_file = session.get() + if not original_flow_file: + return + + flow_file = session.clone(original_flow_file) + + flow_file_proxy = FlowFileProxy(session, flow_file) + context_proxy = ProcessContextProxy(context) + try: + result = self.transform(context_proxy, flow_file_proxy) + except Exception as e: + self.logger.error("Failed to transform flow file due to error: {}".format(str(e))) + session.remove(flow_file) + session.transfer(original_flow_file, self.REL_FAILURE) + return + + if result.getRelationship() == "failure": + session.remove(flow_file) + session.transfer(original_flow_file, self.REL_FAILURE) + return + + result_attributes = result.getAttributes() + if result_attributes is not None: + for attribute in result_attributes: + flow_file.addAttribute(attribute, result_attributes[attribute]) + + result_content = result.getContents() + if result_content is not None: + session.write(flow_file, WriteCallback(result_content)) + + session.transfer(flow_file, self.REL_SUCCESS) + session.transfer(original_flow_file, self.REL_ORIGINAL) + + @abstractmethod + def transform(self, context: ProcessContextProxy, flowFile: FlowFileProxy) -> FlowFileTransformResult: + pass + + def getPropertyDescriptors(self) -> List[PropertyDescriptor]: + return [] diff --git a/extensions/python/pythonprocessors/nifiapi/properties.py b/extensions/python/pythonprocessors/nifiapi/properties.py new file mode 100644 index 000000000..a146539e5 --- /dev/null +++ b/extensions/python/pythonprocessors/nifiapi/properties.py @@ -0,0 +1,283 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import List +from minifi_native import ProcessSession, FlowFile, ProcessContext, timePeriodStringToMilliseconds, dataSizeStringToBytes + + +# This is a mock for NiFi's StandardValidators class methods, that return the property type equivalent in MiNiFi C++ if exists +class ValidatorGenerator: + def createNonNegativeFloatingPointValidator(self, *args) -> int: + return StandardValidators.ALWAYS_VALID + + def createDirectoryExistsValidator(self, *args) -> int: + return StandardValidators.ALWAYS_VALID + + def createURLValidator(self, *args) -> int: + return StandardValidators.ALWAYS_VALID + + def createListValidator(self, *args) -> int: + return StandardValidators.ALWAYS_VALID + + def createTimePeriodValidator(self, *args) -> int: + return StandardValidators.TIME_PERIOD_VALIDATOR + + def createAttributeExpressionLanguageValidator(self, *args) -> int: + return StandardValidators.ALWAYS_VALID + + def createDataSizeBoundsValidator(self, *args) -> int: + return StandardValidators.DATA_SIZE_VALIDATOR + + def createRegexMatchingValidator(self, *args) -> int: + return StandardValidators.ALWAYS_VALID + + def createRegexValidator(self, *args) -> int: + return StandardValidators.ALWAYS_VALID + + def createLongValidator(self, *args) -> int: + return StandardValidators.LONG_VALIDATOR + + +class StandardValidators: + _standard_validators = ValidatorGenerator() + + ALWAYS_VALID = 0 + NON_EMPTY_VALIDATOR = 1 + INTEGER_VALIDATOR = 2 + POSITIVE_INTEGER_VALIDATOR = 3 + POSITIVE_LONG_VALIDATOR = 4 + NON_NEGATIVE_INTEGER_VALIDATOR = 5 + NUMBER_VALIDATOR = 6 + LONG_VALIDATOR = 7 + PORT_VALIDATOR = 8 + NON_EMPTY_EL_VALIDATOR = 9 + HOSTNAME_PORT_LIST_VALIDATOR = 10 + BOOLEAN_VALIDATOR = 11 + URL_VALIDATOR = 12 + URI_VALIDATOR = 13 + REGULAR_EXPRESSION_VALIDATOR = 14 + REGULAR_EXPRESSION_WITH_EL_VALIDATOR = 15 + TIME_PERIOD_VALIDATOR = 16 + DATA_SIZE_VALIDATOR = 17 + FILE_EXISTS_VALIDATOR = 18 + NON_NEGATIVE_FLOATING_POINT_VALIDATOR = 19 + + +class MinifiPropertyTypes: + INTEGER_TYPE = 0 + LONG_TYPE = 1 + BOOLEAN_TYPE = 2 + DATA_SIZE_TYPE = 3 + TIME_PERIOD_TYPE = 4 + NON_BLANK_TYPE = 5 + PORT_TYPE = 6 + + +def translateStandardValidatorToMiNiFiPropertype(validators: List[int]) -> int: + if validators is None or len(validators) == 0 or len(validators) > 1: + return None + + validator = validators[0] + if validator == StandardValidators.INTEGER_VALIDATOR: + return MinifiPropertyTypes.INTEGER_TYPE + if validator == StandardValidators.LONG_VALIDATOR: + return MinifiPropertyTypes.LONG_TYPE + if validator == StandardValidators.BOOLEAN_VALIDATOR: + return MinifiPropertyTypes.BOOLEAN_TYPE + if validator == StandardValidators.DATA_SIZE_VALIDATOR: + return MinifiPropertyTypes.DATA_SIZE_TYPE + if validator == StandardValidators.TIME_PERIOD_VALIDATOR: + return MinifiPropertyTypes.TIME_PERIOD_TYPE + if validator == StandardValidators.NON_EMPTY_VALIDATOR: + return MinifiPropertyTypes.NON_BLANK_TYPE + if validator == StandardValidators.PORT_VALIDATOR: + return MinifiPropertyTypes.PORT_TYPE + return None + + +class PropertyDependency: + def __init__(self, property_descriptor, *dependent_values): + if dependent_values is None: + dependent_values = [] + + self.property_descriptor = property_descriptor + self.dependent_values = dependent_values + + +class ResourceDefinition: + def __init__(self, allow_multiple=False, allow_file=True, allow_url=False, allow_directory=False, allow_text=False): + self.allow_multiple = allow_multiple + self.allow_file = allow_file + self.allow_url = allow_url + self.allow_directory = allow_directory + self.allow_text = allow_text + + +class ExpressionLanguageScope(Enum): + NONE = 1 + ENVIRONMENT = 2 + FLOWFILE_ATTRIBUTES = 3 + + +class PropertyDescriptor: + def __init__(self, name: str, description: str, required: bool = False, sensitive: bool = False, + display_name: str = None, default_value: str = None, allowable_values: List[str] = None, + dependencies: List[PropertyDependency] = None, expression_language_scope: ExpressionLanguageScope = ExpressionLanguageScope.NONE, + dynamic: bool = False, validators: List[int] = None, resource_definition: ResourceDefinition = None, controller_service_definition: str = None): + if validators is None: + validators = [StandardValidators.ALWAYS_VALID] + + self.name = name + self.description = description + self.required = required + self.sensitive = sensitive + self.displayName = display_name + self.defaultValue = default_value + self.allowableValues = allowable_values + self.dependencies = dependencies + self.expressionLanguageScope = expression_language_scope + self.dynamic = dynamic + self.validators = validators + self.resourceDefinition = resource_definition + self.controllerServiceDefinition = controller_service_definition + + +class TimeUnit(Enum): + NANOSECONDS = "NANOSECONDS", + MICROSECONDS = "MICROSECONDS", + MILLISECONDS = "MILLISECONDS", + SECONDS = "SECONDS", + MINUTES = "MINUTES", + HOURS = "HOURS", + DAYS = "DAYS" + + +class DataUnit(Enum): + B = "B", + KB = "KB", + MB = "MB", + GB = "GB", + TB = "TB" + + +class FlowFileProxy: + def __init__(self, session: ProcessSession, flow_file: FlowFile): + self.session = session + self.flow_file = flow_file + + def getContentsAsBytes(self): + return self.session.getContentsAsBytes(self.flow_file) + + def getAttribute(self, name: str): + return self.flow_file.getAttribute(name) + + def getSize(self): + return self.flow_file.getSize() + + def getAttributes(self): + return self.flow_file.getAttributes() + + +class PythonPropertyValue: + def __init__(self, cpp_context: ProcessContext, name: str, string_value: str, el_supported: bool): + self.cpp_context = cpp_context + self.value = None + self.name = name + if string_value is not None: + self.value = string_value + self.el_supported = el_supported + + def getValue(self) -> str: + return self.value + + def isSet(self) -> bool: + return self.value is not None + + def asInteger(self) -> int: + if not self.value: + return None + return int(self.value) + + def asBoolean(self) -> bool: + if not self.value: + return None + return self.value.lower() == 'true' + + def asFloat(self) -> float: + if not self.value: + return None + return float(self.value) + + def asTimePeriod(self, time_unit: TimeUnit) -> int: + if not self.value: + return None + milliseconds = timePeriodStringToMilliseconds(self.value) + if time_unit == TimeUnit.NANOSECONDS: + return milliseconds * 1000000 + if time_unit == TimeUnit.MICROSECONDS: + return milliseconds * 1000 + if time_unit == TimeUnit.MILLISECONDS: + return milliseconds + if time_unit == TimeUnit.SECONDS: + return int(round(milliseconds / 1000)) + if time_unit == TimeUnit.MINUTES: + return int(round(milliseconds / 1000 / 60)) + if time_unit == TimeUnit.HOURS: + return int(round(milliseconds / 1000 / 60 / 60)) + if time_unit == TimeUnit.DAYS: + return int(round(milliseconds / 1000 / 60 / 60 / 24)) + return 0 + + def asDataSize(self, data_unit: DataUnit) -> int: + if not self.value: + return None + bytes = dataSizeStringToBytes(self.value) + if data_unit == DataUnit.B: + return bytes + if data_unit == DataUnit.KB: + return int(bytes / 1000) + if data_unit == DataUnit.MB: + return int(bytes / 1000 / 1000) + if data_unit == DataUnit.GB: + return int(bytes / 1000 / 1000 / 1000) + if data_unit == DataUnit.TB: + return int(bytes / 1000 / 1000 / 1000 / 1000) + return 0 + + def evaluateAttributeExpressions(self, flow_file_proxy: FlowFileProxy = None): + if flow_file_proxy is None or not self.el_supported: + return self + # If Expression Language is supported and present, evaluate it and return a new PropertyValue. + # Otherwise just return self, in order to avoid the cost of making the call to cpp for getProperty + new_string_value = self.cpp_context.getProperty(self.name, flow_file_proxy.flow_file) + return PythonPropertyValue(self.cpp_context, self.name, new_string_value, self.el_supported) + + +class ProcessContextProxy: + def __init__(self, cpp_context: ProcessContext): + self.cpp_context = cpp_context + + def getProperty(self, descriptor) -> PythonPropertyValue: + if descriptor is None: + return None + if isinstance(descriptor, str): + property_name = descriptor + expression_language_support = True + else: + property_name = descriptor.name + expression_language_support = descriptor.expressionLanguageScope != ExpressionLanguageScope.NONE + property_value = self.cpp_context.getProperty(property_name) + return PythonPropertyValue(self.cpp_context, property_name, property_value, expression_language_support) diff --git a/extensions/python/tests/PythonManifestTests.cpp b/extensions/python/tests/PythonManifestTests.cpp index 534cfdeeb..7d0792a97 100644 --- a/extensions/python/tests/PythonManifestTests.cpp +++ b/extensions/python/tests/PythonManifestTests.cpp @@ -101,7 +101,7 @@ TEST_CASE("Python processor's description is part of the manifest") { REQUIRE(getNode(MyPyProc->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc"); auto& rels = getNode(MyPyProc->children, "supportedRelationships").children; - REQUIRE(rels.size() == 2); + REQUIRE(rels.size() == 3); auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";}); REQUIRE(success); @@ -133,7 +133,7 @@ TEST_CASE("Python processor's description is part of the manifest") { REQUIRE(getNode(properties[0].children, "defaultValue").value == "banana"); auto& rels = getNode(MyPyProc2->children, "supportedRelationships").children; - REQUIRE(rels.size() == 2); + REQUIRE(rels.size() == 3); auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";}); REQUIRE(success); diff --git a/extensions/python/PythonProcessor.cpp b/extensions/python/types/PyDataConverter.cpp similarity index 55% copy from extensions/python/PythonProcessor.cpp copy to extensions/python/types/PyDataConverter.cpp index 266f160d2..e86fc4553 100644 --- a/extensions/python/PythonProcessor.cpp +++ b/extensions/python/types/PyDataConverter.cpp @@ -16,30 +16,33 @@ * limitations under the License. */ -#include <string> +#include "PyDataConverter.h" -#include "ExecutePythonProcessor.h" -#include "PythonProcessor.h" +#include "PyException.h" +#include "core/TypedValues.h" namespace org::apache::nifi::minifi::extensions::python { -namespace core = org::apache::nifi::minifi::core; +PyObject* timePeriodStringToMilliseconds(PyObject* /*self*/, PyObject* args) { + const char* time_period_str = nullptr; + if (!PyArg_ParseTuple(args, "s", &time_period_str)) { + throw PyException(); + } -PythonProcessor::PythonProcessor(core::Processor* proc) : - processor_(dynamic_cast<python::processors::ExecutePythonProcessor*>(proc)) { - gsl_Expects(processor_); -} + auto milliseconds = core::TimePeriodValue(std::string(time_period_str)).getMilliseconds().count(); -void PythonProcessor::setSupportsDynamicProperties() { - processor_->setSupportsDynamicProperties(); + return object::returnReference(milliseconds); } -void PythonProcessor::setDescription(const std::string& desc) { - processor_->setDescription(desc); -} +PyObject* dataSizeStringToBytes(PyObject* /*self*/, PyObject* args) { + const char* data_size_str = nullptr; + if (!PyArg_ParseTuple(args, "s", &data_size_str)) { + throw PyException(); + } + + uint64_t bytes = core::DataSizeValue(std::string(data_size_str)).getValue(); -void PythonProcessor::addProperty(const std::string& name, const std::string& description, const std::string& defaultvalue, bool required, bool el) { - processor_->addProperty(name, description, defaultvalue, required, el); + return object::returnReference(bytes); } } // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/PythonProcessor.h b/extensions/python/types/PyDataConverter.h similarity index 65% copy from extensions/python/PythonProcessor.h copy to extensions/python/types/PyDataConverter.h index 979355fb3..dc4d4b5ed 100644 --- a/extensions/python/PythonProcessor.h +++ b/extensions/python/types/PyDataConverter.h @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,32 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once -#include <string> -#include <memory> - -#include "core/Processor.h" +#include "../PythonBindings.h" +extern "C" { namespace org::apache::nifi::minifi::extensions::python { -namespace processors { -class ExecutePythonProcessor; -} - -class PythonProcessor { - public: - explicit PythonProcessor(core::Processor* proc); - - void setSupportsDynamicProperties(); - - void setDescription(const std::string& desc); - - void addProperty(const std::string& name, const std::string& description, const std::string& defaultvalue, bool required, bool el); - - private: - python::processors::ExecutePythonProcessor* processor_; -}; +PyObject* timePeriodStringToMilliseconds(PyObject* self, PyObject* args); +PyObject* dataSizeStringToBytes(PyObject* self, PyObject* args); } // namespace org::apache::nifi::minifi::extensions::python +} // extern "C" diff --git a/extensions/python/types/PyProcessContext.cpp b/extensions/python/types/PyProcessContext.cpp index 899b54544..d428d0e08 100644 --- a/extensions/python/types/PyProcessContext.cpp +++ b/extensions/python/types/PyProcessContext.cpp @@ -17,19 +17,20 @@ a * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #include "PyProcessContext.h" #include "PyStateManager.h" +#include "PyScriptFlowFile.h" #include <string> #include "PyException.h" extern "C" { namespace org::apache::nifi::minifi::extensions::python { -static PyMethodDef PyProcessContext_methods[] = { +static PyMethodDef PyProcessContext_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {"getProperty", (PyCFunction) PyProcessContext::getProperty, METH_VARARGS, nullptr}, {"getStateManager", (PyCFunction) PyProcessContext::getStateManager, METH_VARARGS, nullptr}, {} /* Sentinel */ }; -static PyType_Slot PyProcessContextTypeSpecSlots[] = { +static PyType_Slot PyProcessContextTypeSpecSlots[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {Py_tp_dealloc, reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyProcessContext>)}, {Py_tp_init, reinterpret_cast<void*>(PyProcessContext::init)}, {Py_tp_methods, reinterpret_cast<void*>(PyProcessContext_methods)}, @@ -65,12 +66,31 @@ PyObject* PyProcessContext::getProperty(PyProcessContext* self, PyObject* args) return nullptr; } - const char* property; - if (!PyArg_ParseTuple(args, "s", &property)) { + const char* property_name = nullptr; + PyObject* script_flow_file = nullptr; + if (!PyArg_ParseTuple(args, "s|O", &property_name, &script_flow_file)) { throw PyException(); } + std::string value; - context->getProperty(property, value); + if (!script_flow_file) { + if (!context->getProperty(property_name, value)) { + Py_RETURN_NONE; + } + } else { + auto py_flow = reinterpret_cast<PyScriptFlowFile*>(script_flow_file); + const auto flow_file = py_flow->script_flow_file_.lock(); + if (!flow_file) { + PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'"); + return nullptr; + } + core::Property property{property_name, ""}; + property.setSupportsExpressionLanguage(true); + if (!context->getProperty(property, value, flow_file.get())) { + Py_RETURN_NONE; + } + } + return object::returnReference(value); } diff --git a/extensions/python/types/PyProcessSession.cpp b/extensions/python/types/PyProcessSession.cpp index 5aaea7ad8..02540c70b 100644 --- a/extensions/python/types/PyProcessSession.cpp +++ b/extensions/python/types/PyProcessSession.cpp @@ -24,6 +24,7 @@ #include "types/PyOutputStream.h" #include "types/PyInputStream.h" #include "range/v3/algorithm/remove_if.hpp" +#include "utils/gsl.h" namespace org::apache::nifi::minifi::extensions::python { @@ -101,6 +102,21 @@ std::shared_ptr<core::FlowFile> PyProcessSession::create(const std::shared_ptr<c return result; } +std::shared_ptr<core::FlowFile> PyProcessSession::clone(const std::shared_ptr<core::FlowFile>& flow_file) { + if (!session_) { + throw std::runtime_error("Access of ProcessSession after it has been released"); + } + + if (!flow_file) { + throw std::runtime_error("Flow file to clone is nullptr"); + } + + auto result = session_->clone(*flow_file); + + flow_files_.push_back(result); + return result; +} + void PyProcessSession::remove(const std::shared_ptr<core::FlowFile>& flow_file) { if (!session_) { throw std::runtime_error("Access of ProcessSession after it has been released"); @@ -111,19 +127,38 @@ void PyProcessSession::remove(const std::shared_ptr<core::FlowFile>& flow_file) flow_files_.erase(ranges::remove_if(flow_files_, [&flow_file](const auto& ff)-> bool { return ff == flow_file; }), flow_files_.end()); } +std::string PyProcessSession::getContentsAsString(const std::shared_ptr<core::FlowFile>& flow_file) { + if (!session_) { + throw std::runtime_error("Access of ProcessSession after it has been released"); + } + + if (!flow_file) { + throw std::runtime_error("Access of FlowFile after it has been released"); + } + + std::string content; + session_->read(flow_file, [&content](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t { + content.resize(input_stream->size()); + return gsl::narrow<int64_t>(input_stream->read(as_writable_bytes(std::span(content)))); + }); + return content; +} + extern "C" { -static PyMethodDef PyProcessSessionObject_methods[] = { +static PyMethodDef PyProcessSessionObject_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {"get", (PyCFunction) PyProcessSessionObject::get, METH_NOARGS, nullptr}, {"create", (PyCFunction) PyProcessSessionObject::create, METH_VARARGS, nullptr}, + {"clone", (PyCFunction) PyProcessSessionObject::clone, METH_VARARGS, nullptr}, {"read", (PyCFunction) PyProcessSessionObject::read, METH_VARARGS, nullptr}, {"write", (PyCFunction) PyProcessSessionObject::write, METH_VARARGS, nullptr}, {"transfer", (PyCFunction) PyProcessSessionObject::transfer, METH_VARARGS, nullptr}, {"remove", (PyCFunction) PyProcessSessionObject::remove, METH_VARARGS, nullptr}, + {"getContentsAsBytes", (PyCFunction) PyProcessSessionObject::getContentsAsBytes, METH_VARARGS, nullptr}, {} /* Sentinel */ }; -static PyType_Slot PyProcessTypeSpecSlots[] = { +static PyType_Slot PyProcessTypeSpecSlots[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {Py_tp_dealloc, reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyProcessSessionObject>)}, {Py_tp_init, reinterpret_cast<void*>(PyProcessSessionObject::init)}, {Py_tp_methods, reinterpret_cast<void*>(PyProcessSessionObject_methods)}, @@ -175,13 +210,30 @@ PyObject* PyProcessSessionObject::create(PyProcessSessionObject* self, PyObject* return object::returnReference(nullptr); } +PyObject* PyProcessSessionObject::clone(PyProcessSessionObject* self, PyObject* args) { + auto session = self->process_session_.lock(); + if (!session) { + PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'"); + return nullptr; + } + PyObject* script_flow_file = nullptr; + if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) { + throw PyException(); + } + const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock(); + + if (auto cloned_flow_file = session->clone(flow_file)) + return object::returnReference(std::weak_ptr(cloned_flow_file)); + return object::returnReference(nullptr); +} + PyObject* PyProcessSessionObject::remove(PyProcessSessionObject* self, PyObject* args) { auto session = self->process_session_.lock(); if (!session) { PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'"); return nullptr; } - PyObject* script_flow_file; + PyObject* script_flow_file = nullptr; if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) { throw PyException(); } @@ -197,8 +249,8 @@ PyObject* PyProcessSessionObject::read(PyProcessSessionObject* self, PyObject* a Py_RETURN_NONE; } - PyObject* script_flow_file; - PyObject* callback; + PyObject* script_flow_file = nullptr; + PyObject* callback = nullptr; if (!PyArg_ParseTuple(args, "O!O", PyScriptFlowFile::typeObject(), &script_flow_file, &callback)) { throw PyException(); } @@ -219,8 +271,8 @@ PyObject* PyProcessSessionObject::write(PyProcessSessionObject* self, PyObject* Py_RETURN_NONE; } - PyObject* script_flow_file; - PyObject* callback; + PyObject* script_flow_file = nullptr; + PyObject* callback = nullptr; if (!PyArg_ParseTuple(args, "O!O", PyScriptFlowFile::typeObject(), &script_flow_file, &callback)) { throw PyException(); } @@ -241,8 +293,8 @@ PyObject* PyProcessSessionObject::transfer(PyProcessSessionObject* self, PyObjec Py_RETURN_NONE; } - PyObject* script_flow_file; - PyObject* relationship; + PyObject* script_flow_file = nullptr; + PyObject* relationship = nullptr; if (!PyArg_ParseTuple(args, "O!O!", PyScriptFlowFile::typeObject(), &script_flow_file, PyRelationship::typeObject(), &relationship)) { throw PyException(); } @@ -256,6 +308,22 @@ PyObject* PyProcessSessionObject::transfer(PyProcessSessionObject* self, PyObjec Py_RETURN_NONE; } +PyObject* PyProcessSessionObject::getContentsAsBytes(PyProcessSessionObject* self, PyObject* args) { + auto session = self->process_session_.lock(); + if (!session) { + PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'"); + return nullptr; + } + PyObject* script_flow_file = nullptr; + if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) { + throw PyException(); + } + const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock(); + auto content = session->getContentsAsString(flow_file); + + return PyBytes_FromStringAndSize(content.c_str(), gsl::narrow<Py_ssize_t>(content.size())); +} + PyTypeObject* PyProcessSessionObject::typeObject() { static OwnedObject PyProcessSessionObjectType{PyType_FromSpec(&PyProcessSessionObjectTypeSpec)}; return reinterpret_cast<PyTypeObject*>(PyProcessSessionObjectType.get()); diff --git a/extensions/python/types/PyProcessSession.h b/extensions/python/types/PyProcessSession.h index 756d8b283..60a788d5b 100644 --- a/extensions/python/types/PyProcessSession.h +++ b/extensions/python/types/PyProcessSession.h @@ -31,10 +31,12 @@ class PyProcessSession { std::shared_ptr<core::FlowFile> get(); std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile>& flow_file = nullptr); + std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile>& flow_file); void transfer(const std::shared_ptr<core::FlowFile>& flow_file, const core::Relationship& relationship); void read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject input_stream_callback); void write(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject output_stream_callback); void remove(const std::shared_ptr<core::FlowFile>& flow_file); + std::string getContentsAsString(const std::shared_ptr<core::FlowFile>& flow_file); private: std::vector<std::shared_ptr<core::FlowFile>> flow_files_; @@ -53,10 +55,12 @@ struct PyProcessSessionObject { static PyObject* get(PyProcessSessionObject* self, PyObject* args); static PyObject* create(PyProcessSessionObject* self, PyObject* args); + static PyObject* clone(PyProcessSessionObject* self, PyObject* args); static PyObject* read(PyProcessSessionObject* self, PyObject* args); static PyObject* write(PyProcessSessionObject* self, PyObject* args); static PyObject* transfer(PyProcessSessionObject* self, PyObject* args); static PyObject* remove(PyProcessSessionObject* self, PyObject* args); + static PyObject* getContentsAsBytes(PyProcessSessionObject* self, PyObject* args); static PyTypeObject* typeObject(); }; diff --git a/extensions/python/types/PyProcessor.cpp b/extensions/python/types/PyProcessor.cpp index d07fa1465..ce5a5ccb5 100644 --- a/extensions/python/types/PyProcessor.cpp +++ b/extensions/python/types/PyProcessor.cpp @@ -17,20 +17,21 @@ #include "PyProcessor.h" #include <string> +#include <optional> #include "PyException.h" #include "Types.h" extern "C" { namespace org::apache::nifi::minifi::extensions::python { -static PyMethodDef PyProcessor_methods[] = { +static PyMethodDef PyProcessor_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {"setSupportsDynamicProperties", (PyCFunction) PyProcessor::setSupportsDynamicProperties, METH_VARARGS, nullptr}, {"setDescription", (PyCFunction) PyProcessor::setDescription, METH_VARARGS, nullptr}, {"addProperty", (PyCFunction) PyProcessor::addProperty, METH_VARARGS, nullptr}, {} /* Sentinel */ }; -static PyType_Slot PyProcessorTypeSpecSlots[] = { +static PyType_Slot PyProcessorTypeSpecSlots[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {Py_tp_dealloc, reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyProcessor>)}, {Py_tp_init, reinterpret_cast<void*>(PyProcessor::init)}, {Py_tp_methods, reinterpret_cast<void*>(PyProcessor_methods)}, @@ -77,7 +78,7 @@ PyObject* PyProcessor::setDescription(PyProcessor* self, PyObject* args) { Py_RETURN_NONE; } - const char* description; + const char* description = nullptr; if (!PyArg_ParseTuple(args, "s", &description)) { throw PyException(); } @@ -110,11 +111,29 @@ PyObject* PyProcessor::addProperty(PyProcessor* self, PyObject* args) { BorrowedStr name = BorrowedStr::fromTuple(args, 0); BorrowedStr description = BorrowedStr::fromTuple(args, 1); - BorrowedStr default_value = BorrowedStr::fromTuple(args, 2); + std::optional<std::string> default_value; + auto default_value_pystr = BorrowedStr::fromTuple(args, 2); + if (default_value_pystr.get() && default_value_pystr.get() != Py_None) { + default_value = default_value_pystr.toUtf8String(); + } bool is_required = getBoolFromTuple(args, 3); bool supports_expression_language = getBoolFromTuple(args, 4); - processor->addProperty(name.toUtf8String(), description.toUtf8String(), default_value.toUtf8String(), is_required, supports_expression_language); + bool sensitive = false; + auto arg_size = PyTuple_Size(args); + if (arg_size > 5) { + sensitive = getBoolFromTuple(args, 5); + } + + std::optional<int64_t> validator_value; + if (arg_size > 6) { + auto validator_value_pyint = BorrowedLong::fromTuple(args, 6); + if (validator_value_pyint.get() && validator_value_pyint.get() != Py_None) { + validator_value = validator_value_pyint.asInt64(); + } + } + + processor->addProperty(name.toUtf8String(), description.toUtf8String(), default_value, is_required, supports_expression_language, sensitive, validator_value); Py_RETURN_NONE; } diff --git a/extensions/python/types/PyScriptFlowFile.cpp b/extensions/python/types/PyScriptFlowFile.cpp index 742c747a9..da6180fa8 100644 --- a/extensions/python/types/PyScriptFlowFile.cpp +++ b/extensions/python/types/PyScriptFlowFile.cpp @@ -22,16 +22,18 @@ extern "C" { namespace org::apache::nifi::minifi::extensions::python { -static PyMethodDef PyScriptFlowFile_methods[] = { +static PyMethodDef PyScriptFlowFile_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {"getAttribute", (PyCFunction) PyScriptFlowFile::getAttribute, METH_VARARGS, nullptr}, {"addAttribute", (PyCFunction) PyScriptFlowFile::addAttribute, METH_VARARGS, nullptr}, {"updateAttribute", (PyCFunction) PyScriptFlowFile::updateAttribute, METH_VARARGS, nullptr}, {"removeAttribute", (PyCFunction) PyScriptFlowFile::removeAttribute, METH_VARARGS, nullptr}, {"setAttribute", (PyCFunction) PyScriptFlowFile::setAttribute, METH_VARARGS, nullptr}, + {"getSize", (PyCFunction) PyScriptFlowFile::getSize, METH_VARARGS, nullptr}, + {"getAttributes", (PyCFunction) PyScriptFlowFile::getAttributes, METH_VARARGS, nullptr}, {} /* Sentinel */ }; -static PyType_Slot PyScriptFlowFileTypeSpecSlots[] = { +static PyType_Slot PyScriptFlowFileTypeSpecSlots[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays) {Py_tp_dealloc, reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyScriptFlowFile>)}, {Py_tp_init, reinterpret_cast<void*>(PyScriptFlowFile::init)}, {Py_tp_methods, reinterpret_cast<void*>(PyScriptFlowFile_methods)}, @@ -68,7 +70,7 @@ PyObject* PyScriptFlowFile::getAttribute(PyScriptFlowFile* self, PyObject* args) return nullptr; } - const char* attribute; + const char* attribute = nullptr; if (!PyArg_ParseTuple(args, "s", &attribute)) { throw PyException(); } @@ -82,8 +84,8 @@ PyObject* PyScriptFlowFile::addAttribute(PyScriptFlowFile* self, PyObject* args) return nullptr; } - const char* key; - const char* value; + const char* key = nullptr; + const char* value = nullptr; if (!PyArg_ParseTuple(args, "ss", &key, &value)) { throw PyException(); } @@ -98,8 +100,8 @@ PyObject* PyScriptFlowFile::updateAttribute(PyScriptFlowFile* self, PyObject* ar return nullptr; } - const char* key; - const char* value; + const char* key = nullptr; + const char* value = nullptr; if (!PyArg_ParseTuple(args, "ss", &key, &value)) { throw PyException(); } @@ -114,7 +116,7 @@ PyObject* PyScriptFlowFile::removeAttribute(PyScriptFlowFile* self, PyObject* ar return nullptr; } - const char* attribute; + const char* attribute = nullptr; if (!PyArg_ParseTuple(args, "s", &attribute)) { throw PyException(); } @@ -128,8 +130,8 @@ PyObject* PyScriptFlowFile::setAttribute(PyScriptFlowFile* self, PyObject* args) return nullptr; } - const char* key; - const char* value; + const char* key = nullptr; + const char* value = nullptr; if (!PyArg_ParseTuple(args, "ss", &key, &value)) { throw PyException(); } @@ -137,6 +139,31 @@ PyObject* PyScriptFlowFile::setAttribute(PyScriptFlowFile* self, PyObject* args) return object::returnReference(flow_file->setAttribute(key, value)); } +PyObject* PyScriptFlowFile::getSize(PyScriptFlowFile* self, PyObject* /*args*/) { + auto flow_file = self->script_flow_file_.lock(); + if (!flow_file) { + PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'"); + return nullptr; + } + + return object::returnReference(flow_file->getSize()); +} + +PyObject* PyScriptFlowFile::getAttributes(PyScriptFlowFile* self, PyObject* /*args*/) { + auto flow_file = self->script_flow_file_.lock(); + if (!flow_file) { + PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'"); + return nullptr; + } + + auto attributes = OwnedDict::create(); + for (const auto& [key, value] : flow_file->getAttributes()) { + attributes.put(key, value); + } + + return object::returnReference(attributes); +} + PyTypeObject* PyScriptFlowFile::typeObject() { static OwnedObject PyScriptFlowFileType{PyType_FromSpec(&PyScriptFlowFileTypeSpec)}; return reinterpret_cast<PyTypeObject*>(PyScriptFlowFileType.get()); diff --git a/extensions/python/types/PyScriptFlowFile.h b/extensions/python/types/PyScriptFlowFile.h index bd5fc174d..932444d8f 100644 --- a/extensions/python/types/PyScriptFlowFile.h +++ b/extensions/python/types/PyScriptFlowFile.h @@ -37,6 +37,8 @@ struct PyScriptFlowFile { static PyObject* updateAttribute(PyScriptFlowFile* self, PyObject* args); static PyObject* removeAttribute(PyScriptFlowFile* self, PyObject* args); static PyObject* setAttribute(PyScriptFlowFile* self, PyObject* args); + static PyObject* getSize(PyScriptFlowFile* self, PyObject* args); + static PyObject* getAttributes(PyScriptFlowFile* self, PyObject* args); static PyTypeObject* typeObject(); }; diff --git a/extensions/python/types/Types.h b/extensions/python/types/Types.h index 97203a6bb..fa9055d1f 100644 --- a/extensions/python/types/Types.h +++ b/extensions/python/types/Types.h @@ -22,6 +22,7 @@ #include <utility> #include "../PyException.h" +#include "utils/gsl.h" namespace org::apache::nifi::minifi::extensions::python { @@ -160,7 +161,18 @@ class Long : public ReferenceHolder<reference_type> { } int64_t asInt64() { - return static_cast<int64_t>(PyLong_AsLongLong(this->ref_.get())); + auto long_value = PyLong_AsLongLong(this->ref_.get()); + if (long_value == -1 && PyErr_Occurred()) { + throw PyException(); + } + return gsl::narrow<int64_t>(long_value); + } + + static BorrowedLong fromTuple(PyObject* tuple, Py_ssize_t location) requires(reference_type == ReferenceType::BORROWED) { + BorrowedLong long_from_tuple{PyTuple_GetItem(tuple, location)}; + if (long_from_tuple.get() == nullptr) + throw PyException(); + return long_from_tuple; } }; diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp index 23df7a617..8e1080027 100644 --- a/extensions/script/ExecuteScript.cpp +++ b/extensions/script/ExecuteScript.cpp @@ -71,7 +71,7 @@ void ExecuteScript::onSchedule(core::ProcessContext& context, core::ProcessSessi throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Script File set is not a regular file or does not exist: " + script_file); } - script_executor_->initialize(std::move(script_file), std::move(script_body), std::move(module_directory), getMaxConcurrentTasks(), Success, Failure, logger_); + script_executor_->initialize(std::move(script_file), std::move(script_body), std::move(module_directory), getMaxConcurrentTasks(), Success, Failure, Original, logger_); } void ExecuteScript::onTriggerSharedPtr(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { diff --git a/extensions/script/ExecuteScript.h b/extensions/script/ExecuteScript.h index 5b1b1afdc..e5ef77739 100644 --- a/extensions/script/ExecuteScript.h +++ b/extensions/script/ExecuteScript.h @@ -82,7 +82,8 @@ class ExecuteScript : public core::Processor { EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Script successes"}; EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Script failures"}; - EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; + EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "Original flow file"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure, Original}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; diff --git a/extensions/script/ScriptExecutor.h b/extensions/script/ScriptExecutor.h index c33be136b..49f82b4bf 100644 --- a/extensions/script/ScriptExecutor.h +++ b/extensions/script/ScriptExecutor.h @@ -39,6 +39,7 @@ class ScriptExecutor : public minifi::core::CoreComponent { size_t max_concurrent_engines, const core::Relationship& success, const core::Relationship& failure, + const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger) = 0; protected: diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index 83ec09b82..41fa10b04 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -178,7 +178,7 @@ class ConfigurableComponent { * * @return map of property keys to Property instances. */ - std::map<std::string, Property> getProperties() const; + virtual std::map<std::string, Property> getProperties() const; /** * @return if property exists and is explicitly set, not just falling back to default value @@ -212,7 +212,10 @@ class ConfigurableComponent { // Dynamic properties std::map<std::string, Property> dynamic_properties_; + virtual const Property* findProperty(const std::string& name) const; + private: + Property* findProperty(const std::string& name); std::shared_ptr<logging::Logger> logger_; bool createDynamicProperty(const std::string &name, const std::string &value); @@ -222,9 +225,9 @@ template<typename T> bool ConfigurableComponent::getProperty(const std::string& name, T &value) const { std::lock_guard<std::mutex> lock(configuration_mutex_); - const auto property_name_and_object = properties_.find(name); - if (property_name_and_object != properties_.end()) { - const Property& property = property_name_and_object->second; + const auto prop_ptr = findProperty(name); + if (prop_ptr != nullptr) { + const Property& property = *prop_ptr; if (property.getValue().getValue() == nullptr) { // empty value if (property.getRequired()) { diff --git a/libminifi/include/core/PropertyType.h b/libminifi/include/core/PropertyType.h index 1f4aa2046..64cd1df24 100644 --- a/libminifi/include/core/PropertyType.h +++ b/libminifi/include/core/PropertyType.h @@ -338,6 +338,7 @@ class DataTransferSpeedPropertyType : public PropertyType { }; namespace StandardPropertyTypes { + inline constexpr auto INVALID_TYPE = NeverValidPropertyType{}; inline constexpr auto INTEGER_TYPE = IntegerPropertyType{}; inline constexpr auto UNSIGNED_INT_TYPE = UnsignedIntPropertyType{}; @@ -373,6 +374,19 @@ inline gsl::not_null<const PropertyValidator*> getValidator(const std::shared_pt return gsl::make_not_null<const PropertyValidator*>(&VALID_TYPE); } } + +enum class PropertyTypeCode : int64_t { + INTEGER = 0, + LONG = 1, + BOOLEAN = 2, + DATA_SIZE = 3, + TIME_PERIOD = 4, + NON_BLANK = 5, + PORT = 6 +}; + +const core::PropertyType& translateCodeToPropertyType(const PropertyTypeCode& code); + } // namespace StandardPropertyTypes } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/TypedValues.h b/libminifi/include/core/TypedValues.h index a108744a4..615a4dd0a 100644 --- a/libminifi/include/core/TypedValues.h +++ b/libminifi/include/core/TypedValues.h @@ -31,6 +31,7 @@ #include "utils/Literals.h" #include "utils/Export.h" #include "utils/TimeUtil.h" +#include "core/logging/Logger.h" namespace org::apache::nifi::minifi::core { diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index 3ed2c0533..00fdbb241 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -36,19 +36,33 @@ ConfigurableComponent::ConfigurableComponent() ConfigurableComponent::~ConfigurableComponent() = default; +const Property* ConfigurableComponent::findProperty(const std::string& name) const { + const auto& it = properties_.find(name); + if (it != properties_.end()) { + return &it->second; + } + return nullptr; +} + +Property* ConfigurableComponent::findProperty(const std::string& name) { + const auto& const_self = *this; + return const_cast<Property*>(const_self.findProperty(name)); +} + bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) const { std::lock_guard<std::mutex> lock(configuration_mutex_); - auto &&it = properties_.find(name); + auto prop_ptr = findProperty(name); - if (it != properties_.end()) { - prop = it->second; + if (prop_ptr != nullptr) { + prop = *prop_ptr; return true; } else { return false; } } + /** * Sets the property using the provided name * @param property name @@ -57,11 +71,11 @@ bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) */ bool ConfigurableComponent::setProperty(const std::string& name, const std::string& value) { std::lock_guard<std::mutex> lock(configuration_mutex_); - auto it = properties_.find(name); + auto prop_ptr = findProperty(name); - if (it != properties_.end()) { - Property orig_property = it->second; - Property& new_property = it->second; + if (prop_ptr != nullptr) { + Property orig_property = *prop_ptr; + Property& new_property = *prop_ptr; auto onExit = gsl::finally([&]{ onPropertyModified(orig_property, new_property); logger_->log_debug("Component {} property name {} value {}", name, new_property.getName(), value); @@ -91,11 +105,11 @@ bool ConfigurableComponent::setProperty(const std::string& name, const std::stri */ bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) { std::lock_guard<std::mutex> lock(configuration_mutex_); - auto &&it = properties_.find(name); + auto prop_ptr = findProperty(name); - if (it != properties_.end()) { - Property orig_property = it->second; - Property& new_property = it->second; + if (prop_ptr != nullptr) { + Property orig_property = *prop_ptr; + Property& new_property = *prop_ptr; auto onExit = gsl::finally([&] { onPropertyModified(orig_property, new_property); logger_->log_debug("Component {} property name {} value {}", name, new_property.getName(), value); @@ -119,11 +133,11 @@ bool ConfigurableComponent::updateProperty(const PropertyReference& property, st */ bool ConfigurableComponent::setProperty(const Property& prop, const std::string& value) { std::lock_guard<std::mutex> lock(configuration_mutex_); - auto it = properties_.find(prop.getName()); + auto prop_ptr = findProperty(prop.getName()); - if (it != properties_.end()) { - Property orig_property = it->second; - Property& new_property = it->second; + if (prop_ptr != nullptr) { + Property orig_property = *prop_ptr; + Property& new_property = *prop_ptr; auto onExit = gsl::finally([&] { onPropertyModified(orig_property, new_property); if (prop.isSensitive()) { @@ -160,11 +174,11 @@ bool ConfigurableComponent::setProperty(const PropertyReference& property, std:: bool ConfigurableComponent::setProperty(const Property& prop, PropertyValue &value) { std::lock_guard<std::mutex> lock(configuration_mutex_); - auto it = properties_.find(prop.getName()); + auto prop_ptr = findProperty(prop.getName()); - if (it != properties_.end()) { - Property orig_property = it->second; - Property& new_property = it->second; + if (prop_ptr != nullptr) { + Property orig_property = *prop_ptr; + Property& new_property = *prop_ptr; auto onExit = gsl::finally([&] { onPropertyModified(orig_property, new_property); if (prop.isSensitive()) { diff --git a/libminifi/src/core/PropertyType.cpp b/libminifi/src/core/PropertyType.cpp index 1a9515897..d1e4e2ad5 100644 --- a/libminifi/src/core/PropertyType.cpp +++ b/libminifi/src/core/PropertyType.cpp @@ -56,4 +56,29 @@ PropertyValue DataTransferSpeedPropertyType::parse(std::string_view input) const return PropertyValue::parse<core::DataTransferSpeedValue>(input, *this); } +namespace StandardPropertyTypes { + +const core::PropertyType& translateCodeToPropertyType(const PropertyTypeCode& code) { + switch (code) { + case PropertyTypeCode::INTEGER: + return core::StandardPropertyTypes::INTEGER_TYPE; + case PropertyTypeCode::LONG: + return core::StandardPropertyTypes::LONG_TYPE; + case PropertyTypeCode::BOOLEAN: + return core::StandardPropertyTypes::BOOLEAN_TYPE; + case PropertyTypeCode::DATA_SIZE: + return core::StandardPropertyTypes::DATA_SIZE_TYPE; + case PropertyTypeCode::TIME_PERIOD: + return core::StandardPropertyTypes::TIME_PERIOD_TYPE; + case PropertyTypeCode::NON_BLANK: + return core::StandardPropertyTypes::NON_BLANK_TYPE; + case PropertyTypeCode::PORT: + return core::StandardPropertyTypes::PORT_TYPE; + default: + throw std::invalid_argument("Unknown PropertyTypeCode"); + } +} + +} // namespace StandardPropertyTypes + } // namespace org::apache::nifi::minifi::core diff --git a/msi/WixWin.wsi b/msi/WixWin.wsi index 6757dd56b..4e82c5797 100644 --- a/msi/WixWin.wsi +++ b/msi/WixWin.wsi @@ -62,6 +62,15 @@ Licensed to the Apache Software Foundation (ASF) under one or more <ComponentRef Id="UpdateConfigNotExist"/> </Feature> + <Feature Id="InstallMiNiFiPython" Title="Apache NiFi MiNiFi C++ Python processors" AllowAdvertise="yes" Level="1"> + <ComponentRef Id="PythonProcessorExampleFiles"/> + <ComponentRef Id="NifiPythonProcessorFiles"/> + <ComponentRef Id="PythonProcessorMojoFiles"/> + <ComponentRef Id="PythonProcessorH2oFiles"/> + <ComponentRef Id="PythonProcessorNifiApiFiles"/> + <ComponentRef Id="PythonProcessorGoogleFiles"/> + </Feature> + <FeatureRef Id="ProductFeature"/> <?ifdef CPACK_WIX_UI_DIALOG?> @@ -233,13 +242,13 @@ Licensed to the Apache Software Foundation (ASF) under one or more <Condition Action="disable"><![CDATA[ENABLEC2<>1]]></Condition> <Condition Action="enable">ENABLEC2="1"</Condition> </Control> - <!-- + <!-- comment when available <Control Id="AgentProtocolLabel" Type="Text" X="20" Y="240" Width="60" Height="20" NoPrefix="yes" Text="Agent Protocol" /> <Control Id="AgentProtocolComboBreaker" Type="ComboBox" X="20" Y="260" Width="300" Height="17" Property="AGENT_PROTOCOL" > - + <ComboBox Property="AGENT_PROTOCOL"> <ListItem Value="RESTSender" /> <ListItem Value="COAPProtocol" /> @@ -266,10 +275,21 @@ Licensed to the Apache Software Foundation (ASF) under one or more <Property Id="SERVICEACCOUNTPASSWORD" /> <SetProperty Id="AGENT_IDENTIFIER" After="AppSearch" Value="[ComputerName]" Sequence="first" /> - + <SetProperty Id="ENABLEC2" After="AppSearch" Value="0" Sequence="first" > <![CDATA[ENABLEC2 = 0]]> </SetProperty> + + <CustomAction Id="MakeSymbolicLink" Directory="RootInstallDir" ExeCommand="cmd /c mklink extensions\minifi_native.pyd minifi-python-script-extension.dll" Execute="deferred" Impersonate="no" /> + <CustomAction Id="RemoveSymbolicLink" Directory="RootInstallDir" ExeCommand="cmd /c DEL extensions\minifi_native.pyd" Execute="deferred" Impersonate="no"/> + + <InstallExecuteSequence> + <Custom Action="MakeSymbolicLink" After="InstallFiles">NOT Installed</Custom> + </InstallExecuteSequence> + + <InstallExecuteSequence> + <Custom Action="RemoveSymbolicLink" After="InstallInitialize">Installed AND NOT REINSTALL</Custom> + </InstallExecuteSequence> </Product> <Fragment> @@ -392,6 +412,49 @@ Licensed to the Apache Software Foundation (ASF) under one or more </Component> </Directory> + + <Directory Id="INSTALLPYTHONDIR" Name="minifi-python"> + <Directory Id="INSTALLEXAMPLESDIR" Name="examples"> + <Component Id="PythonProcessorExampleFiles" Guid="2508ace9-6f5d-4cb1-af66-58d5f44ae2b1"> + <File Id="Examples_AddPythonAttribute" Name="AddPythonAttribute.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\AddPythonAttribute.py"/> + <File Id="Examples_CountingProcessor" Name="CountingProcessor.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\CountingProcessor.py"/> + <File Id="Examples_GaussianDistributionWithNumpy" Name="GaussianDistributionWithNumpy.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\GaussianDistributionWithNumpy.py"/> + <File Id="Examples_MoveContentToJson" Name="MoveContentToJson.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\MoveContentToJson.py"/> + <File Id="Examples_RemoveFlowFile" Name="RemoveFlowFile.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\RemoveFlowFile.py"/> + <File Id="Examples_SentimentAnalysis" Name="SentimentAnalysis.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\SentimentAnalysis.py"/> + </Component> + </Directory> + <Directory Id="INSTALLGOOGLEDIR" Name="google"> + <Component Id="PythonProcessorGoogleFiles" Guid="90b898a9-76fb-4888-a4aa-adcd22097863"> + <File Id="Google_SentimentAnalyzer" Name="SentimentAnalyzer.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\google\SentimentAnalyzer.py"/> + </Component> + </Directory> + <Directory Id="INSTALLH2ODIR" Name="h2o"> + <Component Id="PythonProcessorH2oFiles" Guid="b290aeb3-c67a-4cd1-a0a4-1ba049a2e44e"> + <File Id="H2o_ConvertDsToCsv" Name="ConvertDsToCsv.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\h2o\ConvertDsToCsv.py"/> + </Component> + <Directory Id="INSTALLH2O3DIR" Name="h2o3"> + <Directory Id="INSTALLMOJODIR" Name="mojo"> + <Component Id="PythonProcessorMojoFiles" Guid="d2b803da-1cd1-4925-802b-19f3e09fc0f7"> + <File Id="H2o_ExecuteH2oMojoScoring" Name="ExecuteH2oMojoScoring.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\h2o\h2o3\mojo\ExecuteH2oMojoScoring.py"/> + </Component> + </Directory> + </Directory> + </Directory> + <Directory Id="INSTALLNIFIAPIDIR" Name="nifiapi"> + <Component Id="PythonProcessorNifiApiFiles" Guid="a9cb7b7b-e66d-4e32-9115-eab4aa980124"> + <File Id="NifiApi_flowfiletransform" Name="flowfiletransform.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\flowfiletransform.py"/> + <File Id="NifiApi_properties" Name="properties.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\properties.py"/> + <File Id="NifiApi_documentation" Name="documentation.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\documentation.py"/> + <File Id="NifiApi_init" Name="__init__.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\__init__.py"/> + </Component> + </Directory> + <Directory Id="INSTALLNIFIPYTHONPROCESSORSDIR" Name="nifi_python_processors"> + <Component Id="NifiPythonProcessorFiles" Guid="66ce16d4-aea4-4fcb-bbbb-8a28813e2138"> + <File Id="NifiPythonProcessors_init" Name="__init__.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\nifi_python_processors\__init__.py"/> + </Component> + </Directory> + </Directory> </Directory> </DirectoryRef> </Fragment> diff --git a/msi/WixWinMergeModules.wsi b/msi/WixWinMergeModules.wsi index f4124f3ee..ba6b720f7 100644 --- a/msi/WixWinMergeModules.wsi +++ b/msi/WixWinMergeModules.wsi @@ -72,8 +72,16 @@ Licensed to the Apache Software Foundation (ASF) under one or more <ComponentRef Id="UpdateConfigNotExist"/> </Feature> - <FeatureRef Id="ProductFeature"/> + <Feature Id="InstallMiNiFiPython" Title="Apache NiFi MiNiFi C++ Python processors" AllowAdvertise="yes" Level="1"> + <ComponentRef Id="PythonProcessorExampleFiles"/> + <ComponentRef Id="NifiPythonProcessorFiles"/> + <ComponentRef Id="PythonProcessorMojoFiles"/> + <ComponentRef Id="PythonProcessorH2oFiles"/> + <ComponentRef Id="PythonProcessorNifiApiFiles"/> + <ComponentRef Id="PythonProcessorGoogleFiles"/> + </Feature> + <FeatureRef Id="ProductFeature"/> <?ifdef CPACK_WIX_UI_DIALOG?> <WixVariable Id="WixUIDialogBmp" Value="$(var.CPACK_WIX_UI_DIALOG)"/> @@ -138,8 +146,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more <UI> <!-- Define the installer UI --> - - <Dialog Id="ApacheLicenseDlg" Width="370" Height="270" Title="Please review our license"> <Control Id="LicenseAcceptedCheckBox" Type="CheckBox" X="20" Y="207" Width="330" Height="18" CheckBoxValue="1" Property="LicenseAccepted" Text="Click here to accept this license" /> <Control Id="Back" Type="PushButton" X="180" Y="243" Width="56" Height="17" Text="Back" /> @@ -246,13 +252,13 @@ Licensed to the Apache Software Foundation (ASF) under one or more <Condition Action="disable"><![CDATA[ENABLEC2<>1]]></Condition> <Condition Action="enable">ENABLEC2="1"</Condition> </Control> - <!-- + <!-- comment when available <Control Id="AgentProtocolLabel" Type="Text" X="20" Y="240" Width="60" Height="20" NoPrefix="yes" Text="Agent Protocol" /> <Control Id="AgentProtocolComboBreaker" Type="ComboBox" X="20" Y="260" Width="300" Height="17" Property="AGENT_PROTOCOL" > - + <ComboBox Property="AGENT_PROTOCOL"> <ListItem Value="RESTSender" /> <ListItem Value="COAPProtocol" /> @@ -263,11 +269,8 @@ Licensed to the Apache Software Foundation (ASF) under one or more --> </Dialog> - <UIRef Id="WixUI_HK" /> - - </UI> <Property Id="AGENT_CLASS" Value="Your Agent Class" /> @@ -282,10 +285,21 @@ Licensed to the Apache Software Foundation (ASF) under one or more <Property Id="SERVICEACCOUNTPASSWORD" /> <SetProperty Id="AGENT_IDENTIFIER" After="AppSearch" Value="[ComputerName]" Sequence="first" /> - + <SetProperty Id="ENABLEC2" After="AppSearch" Value="0" Sequence="first" > <![CDATA[ENABLEC2 = 0]]> </SetProperty> + + <CustomAction Id="MakeSymbolicLink" Directory="RootInstallDir" ExeCommand="cmd /c mklink extensions\minifi_native.pyd minifi-python-script-extension.dll" Execute="deferred" Impersonate="no" /> + <CustomAction Id="RemoveSymbolicLink" Directory="RootInstallDir" ExeCommand="cmd /c DEL extensions\minifi_native.pyd" Execute="deferred" Impersonate="no"/> + + <InstallExecuteSequence> + <Custom Action="MakeSymbolicLink" After="InstallFiles">NOT Installed</Custom> + </InstallExecuteSequence> + + <InstallExecuteSequence> + <Custom Action="RemoveSymbolicLink" After="InstallInitialize">Installed AND NOT REINSTALL</Custom> + </InstallExecuteSequence> </Product> <Fragment> @@ -326,10 +340,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more <Directory Id="INSTALLBINDIR" Name="bin"> <Component Id="minifiService" Guid="87658309-0339-425c-8633-f54ffaaa4921"> - <File Id="MiNiFiExe" - Name="minifi.exe" - KeyPath="yes" - Source="minifi_main\minifi.exe"/> + <File Id="MiNiFiExe" Name="minifi.exe" KeyPath="yes" Source="minifi_main\minifi.exe"/> <!-- It is not possible to set 'ServiceConfig' for 2 'Component' (error LGHT0130 : The primary key 'Apache NiFi MiNiFi' is duplicated in table 'ServiceConfig'). Problem is described: http://windows-installer-xml-wix-toolset.687559.n2.nabble.com/Installing-a-Service-with-Varying-Dependencies-td693097.html @@ -367,10 +378,7 @@ Licensed to the Apache Software Foundation (ASF) under one or more <Condition><![CDATA[SERVICEACCOUNT="LocalSystem"]]></Condition> </Component> <Component Id="minifiServiceNotLocal" Guid="87658309-0339-425c-8633-f54ffaaa4922"> - <File Id="MiNiFiExeWithPassword" - Name="minifi.exe" - KeyPath="yes" - Source="minifi_main\minifi.exe"/> + <File Id="MiNiFiExeWithPassword" Name="minifi.exe" KeyPath="yes" Source="minifi_main\minifi.exe"/> <ServiceInstall Id="MiNiFiExeServiceWithPassword" Type="ownProcess" Vital="yes" @@ -399,6 +407,49 @@ Licensed to the Apache Software Foundation (ASF) under one or more <Condition><![CDATA[SERVICEACCOUNT<>"LocalSystem"]]></Condition> </Component> </Directory> + + <Directory Id="INSTALLPYTHONDIR" Name="minifi-python"> + <Directory Id="INSTALLEXAMPLESDIR" Name="examples"> + <Component Id="PythonProcessorExampleFiles" Guid="2508ace9-6f5d-4cb1-af66-58d5f44ae2b1"> + <File Id="Examples_AddPythonAttribute" Name="AddPythonAttribute.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\AddPythonAttribute.py"/> + <File Id="Examples_CountingProcessor" Name="CountingProcessor.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\CountingProcessor.py"/> + <File Id="Examples_GaussianDistributionWithNumpy" Name="GaussianDistributionWithNumpy.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\GaussianDistributionWithNumpy.py"/> + <File Id="Examples_MoveContentToJson" Name="MoveContentToJson.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\MoveContentToJson.py"/> + <File Id="Examples_RemoveFlowFile" Name="RemoveFlowFile.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\RemoveFlowFile.py"/> + <File Id="Examples_SentimentAnalysis" Name="SentimentAnalysis.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\examples\SentimentAnalysis.py"/> + </Component> + </Directory> + <Directory Id="INSTALLGOOGLEDIR" Name="google"> + <Component Id="PythonProcessorGoogleFiles" Guid="90b898a9-76fb-4888-a4aa-adcd22097863"> + <File Id="Google_SentimentAnalyzer" Name="SentimentAnalyzer.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\google\SentimentAnalyzer.py"/> + </Component> + </Directory> + <Directory Id="INSTALLH2ODIR" Name="h2o"> + <Component Id="PythonProcessorH2oFiles" Guid="b290aeb3-c67a-4cd1-a0a4-1ba049a2e44e"> + <File Id="H2o_ConvertDsToCsv" Name="ConvertDsToCsv.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\h2o\ConvertDsToCsv.py"/> + </Component> + <Directory Id="INSTALLH2O3DIR" Name="h2o3"> + <Directory Id="INSTALLMOJODIR" Name="mojo"> + <Component Id="PythonProcessorMojoFiles" Guid="d2b803da-1cd1-4925-802b-19f3e09fc0f7"> + <File Id="H2o_ExecuteH2oMojoScoring" Name="ExecuteH2oMojoScoring.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\h2o\h2o3\mojo\ExecuteH2oMojoScoring.py"/> + </Component> + </Directory> + </Directory> + </Directory> + <Directory Id="INSTALLNIFIAPIDIR" Name="nifiapi"> + <Component Id="PythonProcessorNifiApiFiles" Guid="a9cb7b7b-e66d-4e32-9115-eab4aa980124"> + <File Id="NifiApi_flowfiletransform" Name="flowfiletransform.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\flowfiletransform.py"/> + <File Id="NifiApi_properties" Name="properties.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\properties.py"/> + <File Id="NifiApi_documentation" Name="documentation.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\documentation.py"/> + <File Id="NifiApi_init" Name="__init__.py" KeyPath="no" Source="..\extensions\python\pythonprocessors\nifiapi\__init__.py"/> + </Component> + </Directory> + <Directory Id="INSTALLNIFIPYTHONPROCESSORSDIR" Name="nifi_python_processors"> + <Component Id="NifiPythonProcessorFiles" Guid="66ce16d4-aea4-4fcb-bbbb-8a28813e2138"> + <File Id="NifiPythonProcessors_init" Name="__init__.py" KeyPath="yes" Source="..\extensions\python\pythonprocessors\nifi_python_processors\__init__.py"/> + </Component> + </Directory> + </Directory> </Directory> </DirectoryRef> </Fragment> diff --git a/run_flake8.sh b/run_flake8.sh index e1a409f09..1e884aa57 100755 --- a/run_flake8.sh +++ b/run_flake8.sh @@ -19,4 +19,4 @@ set -euo pipefail directory=${1:-.} -flake8 --exclude venv,thirdparty,build,cmake-build-*,github_env --builtins log,REL_SUCCESS,REL_FAILURE,raw_input --ignore E501,W503 --per-file-ignores="steps.py:F811" "${directory}" +flake8 --exclude venv,thirdparty,build,cmake-build-*,github_env --builtins log,REL_SUCCESS,REL_FAILURE,REL_ORIGINAL,raw_input --ignore E501,W503 --per-file-ignores="steps.py:F811" "${directory}"