[ https://issues.apache.org/jira/browse/NIFI-12739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark Payne updated NIFI-12739:
------------------------------
    Fix Version/s: 2.0.0
       Resolution: Fixed
           Status: Resolved  (was: Patch Available)

> Python custom processor cannot import ProcessPoolExecutor
> ---------------------------------------------------------
>
>                 Key: NIFI-12739
>                 URL: https://issues.apache.org/jira/browse/NIFI-12739
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.0.0-M2
>            Reporter: Alex Ethier
>            Assignee: Alex Ethier
>            Priority: Major
>             Fix For: 2.0.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> A runtime exception is thrown when trying to import ProcessPoolExecutor in a
> Python custom processor. This affects other libraries such as llama-index
> when it tries to import ProcessPoolExecutor.
> My system's full stack trace (see below for a simpler stack trace):
> {code:java}
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/./python/framework/Controller.py", line 75, in createProcessor
>     processorClass = self.extensionManager.getProcessorClass(processorType, version, work_dir)
>                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 104, in getProcessorClass
>     processor_class = self.__load_extension_module(module_file, details.local_dependencies)
>                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 360, in __load_extension_module
>     module_spec.loader.exec_module(module)
>   File "<frozen importlib._bootstrap_external>", line 940, in exec_module
>   File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
>   File "/Users/aethier/playground/the_source/datavolo/datavolo-resources/demo/advanced_rag_small_to_big/processors/RedisVectorStoreProcessor.py", line 4, in <module>
>     from llama_index import GPTVectorStoreIndex, StorageContext, ServiceContext, Document
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/__init__.py", line 24, in <module>
>     from llama_index.indices import (
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/__init__.py", line 4, in <module>
>     from llama_index.indices.composability.graph import ComposableGraph
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/__init__.py", line 4, in <module>
>     from llama_index.indices.composability.graph import ComposableGraph
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/graph.py", line 7, in <module>
>     from llama_index.indices.base import BaseIndex
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/base.py", line 10, in <module>
>     from llama_index.ingestion import run_transformations
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/__init__.py", line 2, in <module>
>     from llama_index.ingestion.pipeline import (
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/pipeline.py", line 5, in <module>
>     from concurrent.futures import ProcessPoolExecutor
>   File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/__init__.py", line 44, in __getattr__
>     from .process import ProcessPoolExecutor as pe
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 106, in <module>
>     threading._register_atexit(_python_exit)
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1527, in _register_atexit
>     raise RuntimeError("can't register atexit after shutdown")
> RuntimeError: can't register atexit after shutdown
>     at py4j.Protocol.getReturnValue(Protocol.java:476)
>     at org.apache.nifi.py4j.client.PythonProxyInvocationHandler.invoke(PythonProxyInvocationHandler.java:64)
>     at jdk.proxy8/jdk.proxy8.$Proxy95.createProcessor(Unknown Source)
>     at org.apache.nifi.py4j.StandardPythonBridge$1.createProcessor(StandardPythonBridge.java:116)
>     at org.apache.nifi.py4j.StandardPythonProcessorBridge.initializePythonSide(StandardPythonProcessorBridge.java:106)
>     at org.apache.nifi.py4j.StandardPythonProcessorBridge.lambda$initialize$0(StandardPythonProcessorBridge.java:67)
>     at java.base/java.lang.VirtualThread.run(VirtualThread.java:309){code}
> Note the problem exists for both python 3.9 and python 3.11 and on both NiFi
> 2.0.0 release and on the main branch.
>
>
> The following is a stack trace snippet:
>
> {code:java}
> Traceback (most recent call last):
>   File "/configuration_resources/python_extensions/ImportTestProcessor.py", line 26, in transform
>     from concurrent.futures import ProcessPoolExecutor
>   File "<frozen importlib._bootstrap>", line 1055, in _handle_fromlist
>   File "/usr/lib/python3.9/concurrent/futures/__init__.py", line 44, in __getattr__
>     from .process import ProcessPoolExecutor as pe
>   File "/usr/lib/python3.9/concurrent/futures/process.py", line 101, in <module>
>     threading._register_atexit(_python_exit)
>   File "/usr/lib/python3.9/threading.py", line 1374, in _register_atexit
>     raise RuntimeError("can't register atexit after shutdown")
> RuntimeError: can't register atexit after shutdown{code}
> When the import fails, it can sometimes be hard to find the Python stack trace
> in the logs, and sometimes the processor will not repeat the initialization
> (so the stack trace is reported only once).
>
> The following custom Python processor can be used to generate the stack trace
> snippet in an easily repeatable way:
> {code:java}
> from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
> from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency, ExpressionLanguageScope
>
> ###
> # Test python imports
> ##
> class ImportTestProcessor(FlowFileTransform):
>     class Java:
>         implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
>
>     class ProcessorDetails:
>         version = "2.0.0-M1"
>         description = """Test Imports"""
>         tags = ["test"]
>
>     def __init__(self, **kwargs):
>         pass
>
>     def transform(self, context, flowfile):
>         import traceback
>
>         stack_trace_str = ""
>         try:
>             from concurrent.futures import ProcessPoolExecutor
>         except Exception as e:
>             stack_trace_str = f"Exception:\n{traceback.format_exc()}"
>             return FlowFileTransformResult(
>                 relationship="success", contents=stack_trace_str
>             )
>         return FlowFileTransformResult(
>             relationship="success"
>         )
> {code}
> When running this processor, the flowfile output will show the stack trace in
> the flow file's content.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
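
Both traces above end in `threading._register_atexit`, which raises once the interpreter's thread shutdown has begun. The following is a minimal sketch of one way to trigger the same `RuntimeError` in plain CPython (3.9 or later) without NiFi: the main thread returns while a non-daemon worker thread is still running, and the worker then imports `ProcessPoolExecutor` for the first time. Whether NiFi's Python framework hits the error for this same reason is an assumption, not something the report states. The `CHILD` script, the `run_child` helper and the `--preimport` flag are hypothetical names used only for this illustration.

```python
import subprocess
import sys
import textwrap

# Child script (hypothetical): the main thread returns right away while a
# non-daemon worker thread keeps running and only then performs the import.
CHILD = textwrap.dedent("""
    import sys
    import threading
    import time

    if "--preimport" in sys.argv:
        # Load the module while the main thread is still alive, so that
        # concurrent.futures.process can register its exit hook in time.
        import concurrent.futures.process

    def late_import():
        time.sleep(0.5)  # crude wait for the main thread to finish
        try:
            from concurrent.futures import ProcessPoolExecutor
            print("import ok")
        except RuntimeError as exc:
            print(f"RuntimeError: {exc}")

    threading.Thread(target=late_import).start()
""")


def run_child(*args):
    """Run CHILD in a fresh interpreter and return its stripped stdout."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD, *args],
        capture_output=True,
        text=True,
        timeout=60,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print(run_child())               # late first import, after main thread exit
    print(run_child("--preimport"))  # module already loaded before the exit
```

In this sketch the first run is expected to report the same `can't register atexit after shutdown` message as the traces, while the `--preimport` run finds the module already loaded and succeeds. Importing the module early is shown only as a possible workaround under the stated assumption; it is not presented as the change that resolved this issue.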