This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 2ad9db18db NIFI-12959: Support loading Python processors from NARs 2ad9db18db is described below commit 2ad9db18db3d0ef0a1c0e3a9241e418299de78c3 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Tue Mar 26 10:55:02 2024 -0400 NIFI-12959: Support loading Python processors from NARs This closes #8573 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../java/org/apache/nifi/util/NiFiProperties.java | 7 +- .../src/main/asciidoc/python-developer-guide.adoc | 88 ++++++++++------- .../components/ClassLoaderAwarePythonBridge.java | 4 +- .../org/apache/nifi/controller/FlowController.java | 15 ++- .../nifi/nar/ExtensionDiscoveringManager.java | 9 +- .../nar/StandardExtensionDiscoveringManager.java | 22 +++-- .../nifi/py4j/ProcessorCreationWorkflow.java | 13 +++ .../java/org/apache/nifi/py4j/PythonProcess.java | 53 ++++++++-- .../org/apache/nifi/py4j/StandardPythonBridge.java | 51 ++++++++-- .../nifi/py4j/StandardPythonProcessorBridge.java | 7 ++ .../org/apache/nifi/py4j/PythonProcessTest.java | 27 +++-- .../PythonControllerInteractionIT.java | 8 +- .../apache/nifi/python/DisabledPythonBridge.java | 2 +- .../java/org/apache/nifi/python/PythonBridge.java | 4 +- .../apache/nifi/python/PythonProcessConfig.java | 16 ++- .../apache/nifi/python/PythonProcessorDetails.java | 12 +++ .../src/main/python/framework/Controller.py | 2 - .../src/main/python/framework/ExtensionDetails.py | 10 ++ .../src/main/python/framework/ExtensionManager.py | 32 +++--- .../main/python/framework/ProcessorInspection.py | 15 ++- .../python/framework/TestPythonProcessorAdapter.py | 5 +- .../nifi-python-test-extensions-nar/pom.xml | 84 ++++++++++++++++ .../src/main/resources/WriteBech32Charset.py | 32 ++++++ nifi-system-tests/nifi-system-test-suite/pom.xml | 7 ++ .../src/test/assembly/dependencies.xml | 12 +++ .../nifi/tests/system/python/PythonNarIT.java | 110 +++++++++++++++++++++ nifi-system-tests/pom.xml | 1 + 27 
files changed, 541 insertions(+), 107 deletions(-) diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 2e3829fe36..8932760b14 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.util; +import org.apache.nifi.properties.ApplicationProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; @@ -36,9 +40,6 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.nifi.properties.ApplicationProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The NiFiProperties class holds all properties which are needed for various diff --git a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc index 8e52375e99..b798fb6d0f 100644 --- a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc @@ -516,37 +516,6 @@ environment for each Processor implementation (not for each instance of a Proces dependencies in that environment. -[[deploying]] -== Deploying a Developed Processor - -Once a Processor has been developed, it can be made available in NiFi by copying the source of the Python extension to the `$NIFI_HOME/python/extensions` directory by default. -The actual directory to look for extensions can be configured in `nifi.properties` via properties that have the prefix `nifi.python.extensions.source.directory.`. -For example, by default, `nifi.python.extensions.source.directory.default` is set to `./python/extensions`. 
However, additional paths may be added by replacing `default` -in the property name with some other value. - -Any `.py` file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor. -In order to be found, the Processor must have a valid parent (`FlowFileTransform` or `RecordTransform`) and must have an inner class named `Java` -with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`. -This will allow NiFi to automatically discover the Processor. - -Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order -to package a Processor along with its modules, the Processor and any related module must be added to a directory that is directly below the Extensions directory. -For example, if the `WriteNumber.py` file contains a NiFi Processor and also depends on the `ProcessorUtil.py` module, the directory structure would look like this: ----- -NIFI_HOME/ - - python/ - - extensions/ - ProcessorA.py - ProcessorB.py - write-number/ - __init__.py - ProcessorUtils.py - WriteNumber.py ----- -By packaging them together in a subdirectory, NiFi knows to expose the modules to one another. However, the ProcessorA module will have no access -to the `ProcessorUtils` module. Only `WriteNumber` will have access to it. - - [[reloading]] == Processor Reloading @@ -623,15 +592,66 @@ Here, we accept any version of `pandas` (though the latest is preferred), and we [[dependency-isolation]] === Dependency Isolation -On startup, NiFi will create a separate Python env (venv) for each Processor implementation and will use `pip` to install -the specified dependencies from PyPI only into the appropriate Python environment for that Processor. Therefore, dependencies of one -Processor are not made available to another Processor. 
+The first time that a user creates a NiFi Processor of a given type, NiFi will create a separate Python env (venv) for the Processor. +It will use `pip` to install the specified dependencies from PyPI only into the appropriate Python environment for that Processor. +Therefore, dependencies of one Processor are not made available to another Processor. Beyond that, dependencies of one version of a Processor are not made available to other versions of the Processor. So, for example, if we have two different versions of the same Processor made available, version `0.0.1` and version `0.0.2`, the dependencies that are necessary for version `0.0.1` will not be made available to version `0.0.2` unless version `0.0.2` of the Processor also declares those dependencies. +Some environments, however, cannot make use of `pip` for package management. In an air-gapped environment, for example, or in +environments with strict security policies in place, `pip` may not be available. In such a case, Python processors can be packaged +using the NiFi ARchive (NAR) format. This is a .zip file with the following specific layout, and uses a filename extension of `.nar`: + +``` +my-nar-bundle.nar ++-- META-INF/ + +-- MANIFEST.MF ++-- NAR-INF/ + +-- bundled-dependencies/ + +-- dependency1 + +-- dependency2 + +-- ... + +-- dependencyN +MyProcessor.py +``` + + +[[deploying]] +== Deploying a Developed Processor + +Once a Processor has been developed, it can be made available in NiFi using one of two methods. +For Processors that have been packaged as a NAR file, the NAR file should be copied to NiFi's `lib/` directory or configured extensions directory. +For Processors that are not pre-packaged as a NAR, the Processor is deployed by copying the source of the Python extension to the `$NIFI_HOME/python/extensions` directory by default. 
+The actual directory to look for extensions can be configured in `nifi.properties` via properties that have the prefix `nifi.python.extensions.source.directory.`. +For example, by default, `nifi.python.extensions.source.directory.default` is set to `./python/extensions`. However, additional paths may be added by replacing `default` +in the property name with some other value. + +Any `.py` file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor. +In order to be found, the Processor must have a valid parent (`FlowFileTransform` or `RecordTransform`) and must have an inner class named `Java` +with an `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordTransform']`. +This will allow NiFi to automatically discover the Processor. + +Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order +to package a Processor along with its modules, the Processor and any related module must be added to a directory that is directly below the Extensions directory. +For example, if the `WriteNumber.py` file contains a NiFi Processor and also depends on the `ProcessorUtils.py` module, the directory structure would look like this: +---- +NIFI_HOME/ + - python/ + - extensions/ + ProcessorA.py + ProcessorB.py + write-number/ + __init__.py + ProcessorUtils.py + WriteNumber.py +---- +By packaging them together in a subdirectory, NiFi knows to expose the modules to one another. However, the ProcessorA module will have no access +to the `ProcessorUtils` module. Only `WriteNumber` will have access to it.
+ + [[troubleshooting]] == Troubleshooting diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java index 5679a8f45b..3546ca7ece 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java @@ -89,9 +89,9 @@ public class ClassLoaderAwarePythonBridge implements PythonBridge { } @Override - public void discoverExtensions() { + public void discoverExtensions(final boolean includeNarDirectories) { try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) { - delegate.discoverExtensions(); + delegate.discoverExtensions(includeNarDirectories); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index d3eb8644cc..8c28496a4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -102,8 +102,8 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import 
org.apache.nifi.controller.repository.io.LimitedInputStream; -import org.apache.nifi.controller.scheduling.LifecycleStateManager; import org.apache.nifi.controller.scheduling.CronSchedulingAgent; +import org.apache.nifi.controller.scheduling.LifecycleStateManager; import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; @@ -216,8 +216,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.NotificationEmitter; -import javax.net.ssl.SSLContext; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -250,6 +248,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import java.util.stream.Collectors; +import javax.management.NotificationEmitter; +import javax.net.ssl.SSLContext; import static java.util.Objects.requireNonNull; @@ -900,10 +900,19 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr maxProcessesPerType = maxProcesses; } + final List<File> narDirectories = new ArrayList<>(); + for (final org.apache.nifi.bundle.Bundle bundle : extensionManager.getAllBundles()) { + final File workingDir = bundle.getBundleDetails().getWorkingDirectory(); + if (workingDir.exists()) { + narDirectories.add(workingDir); + } + } + final PythonProcessConfig pythonProcessConfig = new PythonProcessConfig.Builder() .pythonCommand(pythonCommand) .pythonFrameworkDirectory(pythonFrameworkSourceDirectory) .pythonExtensionsDirectories(pythonExtensionsDirectories) + .narDirectories(narDirectories) .pythonWorkingDirectory(pythonWorkingDirectory) .commsTimeout(commsTimeout == null ? 
null : Duration.ofMillis(FormatUtils.getTimeDuration(commsTimeout, TimeUnit.MILLISECONDS))) .maxPythonProcesses(maxProcesses) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java index 1de8da3a00..0c67e92e9f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java @@ -64,10 +64,15 @@ public interface ExtensionDiscoveringManager extends ExtensionManager { void setPythonBridge(PythonBridge pythonBridge); /** - * Discovers any Python based extensions using the given Python Bridge - * @param pythonBundle the system bundle + * Discovers any Python based extensions that exist in either the Python extensions directories or NAR bundles that have been expanded. + * @param pythonBundle the python bundle */ void discoverPythonExtensions(Bundle pythonBundle); + /** + * Discovers any new Python based extensions that have been added. This method will scan only the Python extension directories + * that have been configured and will not include scanning NAR bundles. 
+ * @param pythonBundle the python bundle + */ void discoverNewPythonExtensions(Bundle pythonBundle); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java index 3931f46898..fe9b2cc002 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java @@ -75,7 +75,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; /** * Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs). 
@@ -136,9 +135,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering @Override public Set<Bundle> getAllBundles() { - return classNameBundleLookup.values().stream() - .flatMap(List::stream) - .collect(Collectors.toSet()); + return new HashSet<>(bundleCoordinateBundleLookup.values()); } @Override @@ -186,10 +183,20 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering @Override public void discoverPythonExtensions(final Bundle pythonBundle) { + discoverPythonExtensions(pythonBundle, true); + } + + @Override + public void discoverNewPythonExtensions(final Bundle pythonBundle) { + logger.debug("Scanning to discover new Python extensions..."); + discoverPythonExtensions(pythonBundle, false); + } + + private void discoverPythonExtensions(final Bundle pythonBundle, final boolean includeNarBundles) { logger.debug("Scanning to discover which Python extensions are available and importing any necessary dependencies. If new components are discovered, this may take a few minutes. " + "See python logs for more details."); final long start = System.currentTimeMillis(); - pythonBridge.discoverExtensions(); + pythonBridge.discoverExtensions(includeNarBundles); bundleCoordinateBundleLookup.putIfAbsent(pythonBundle.getBundleDetails().getCoordinate(), pythonBundle); @@ -269,11 +276,6 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering .build(); } - @Override - public void discoverNewPythonExtensions(final Bundle pythonBundle) { - logger.debug("Scanning to discover new Python extensions..."); - discoverPythonExtensions(pythonBundle); - } /** * Loads extensions from the specified bundle. 
diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java index 7b99f146f5..cf3580ca74 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java @@ -21,8 +21,21 @@ import org.apache.nifi.python.processor.PythonProcessorAdapter; public interface ProcessorCreationWorkflow { + /** + * @return <code>true</code> if the Processor has been packaged along with its dependencies, <code>false</code> otherwise + */ + boolean isPackagedWithDependencies(); + + /** + * Downloads any dependencies required by the Processor using <code>pip</code>. + * If the Processor is already packaged with its dependencies, this method does nothing. + */ void downloadDependencies(); + /** + * Creates the Processor on the Python side and returns an adapter for interacting with the Processor from the Java side. 
+ * @return an adapter for interacting with the Python Processor + */ PythonProcessorAdapter createProcessor(); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java index 2e4779bfab..0dcb82e1bc 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java @@ -17,13 +17,13 @@ package org.apache.nifi.py4j; -import org.apache.nifi.py4j.logging.LogLevelChangeListener; -import org.apache.nifi.py4j.logging.PythonLogLevel; -import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.py4j.client.JavaObjectBindings; import org.apache.nifi.py4j.client.NiFiPythonGateway; import org.apache.nifi.py4j.client.StandardPythonClient; +import org.apache.nifi.py4j.logging.LogLevelChangeListener; +import org.apache.nifi.py4j.logging.PythonLogLevel; +import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler; import org.apache.nifi.py4j.server.NiFiGatewayServer; import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonController; @@ -36,8 +36,6 @@ import org.slf4j.LoggerFactory; import py4j.CallbackClient; import py4j.GatewayServer; -import javax.net.ServerSocketFactory; -import javax.net.SocketFactory; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -52,6 +50,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; public class PythonProcess { private static final Logger logger = 
LoggerFactory.getLogger(PythonProcess.class); @@ -62,6 +62,7 @@ public class PythonProcess { private final PythonProcessConfig processConfig; private final ControllerServiceTypeLookup controllerServiceTypeLookup; private final File virtualEnvHome; + private final boolean packagedWithDependencies; private final String componentType; private final String componentId; private GatewayServer server; @@ -78,10 +79,11 @@ public class PythonProcess { public PythonProcess(final PythonProcessConfig processConfig, final ControllerServiceTypeLookup controllerServiceTypeLookup, final File virtualEnvHome, - final String componentType, final String componentId) { + final boolean packagedWithDependencies, final String componentType, final String componentId) { this.processConfig = processConfig; this.controllerServiceTypeLookup = controllerServiceTypeLookup; this.virtualEnvHome = virtualEnvHome; + this.packagedWithDependencies = packagedWithDependencies; this.componentType = componentType; this.componentId = componentId; } @@ -224,6 +226,10 @@ public class PythonProcess { return Base64.getEncoder().encodeToString(bytes); } + private boolean isPackagedWithDependencies() { + return packagedWithDependencies; + } + private Process launchPythonProcess(final int listeningPort, final String authToken) throws IOException { final File pythonFrameworkDirectory = processConfig.getPythonFrameworkDirectory(); final File pythonApiDirectory = new File(pythonFrameworkDirectory.getParentFile(), "api"); @@ -234,8 +240,21 @@ public class PythonProcess { final List<String> commands = new ArrayList<>(); commands.add(pythonCommand); + if (isPackagedWithDependencies()) { + // If not using venv, we will not launch a separate virtual environment, so we need to use the -S + // flag in order to prevent the Python process from using the installation's site-packages. This provides + // proper dependency isolation to the Python process. 
+ commands.add("-S"); + } String pythonPath = pythonApiDirectory.getAbsolutePath(); + final String absolutePath = virtualEnvHome.getAbsolutePath(); + pythonPath = pythonPath + File.pathSeparator + absolutePath; + + if (isPackagedWithDependencies()) { + final File dependenciesDir = new File(new File(absolutePath), "NAR-INF/bundled-dependencies"); + pythonPath = pythonPath + File.pathSeparator + dependenciesDir.getAbsolutePath(); + } if (processConfig.isDebugController() && "Controller".equals(componentId)) { commands.add("-m"); @@ -264,7 +283,13 @@ public class PythonProcess { return processBuilder.start(); } + // Visible for testing String resolvePythonCommand() throws IOException { + // If pip is disabled, we will not create separate virtual environments for each Processor and thus we will use the configured Python command + if (isPackagedWithDependencies()) { + return processConfig.getPythonCommand(); + } + final File pythonCmdFile = new File(processConfig.getPythonCommand()); final String pythonCmd = pythonCmdFile.getName(); @@ -287,6 +312,13 @@ public class PythonProcess { private void setupEnvironment() throws IOException { + // Environment creation is only necessary if using PIP. Otherwise, the Process requires no outside dependencies, other than those + // provided in the package and thus we can simply include those packages in the PYTHON_PATH. 
+ if (isPackagedWithDependencies()) { + logger.debug("Will not create Python Virtual Environment because Python Processor packaged with dependencies"); + return; + } + final File environmentCreationCompleteFile = new File(virtualEnvHome, "env-creation-complete.txt"); if (environmentCreationCompleteFile.exists()) { logger.debug("Environment has already been created for {}; will not recreate", virtualEnvHome); @@ -408,8 +440,17 @@ public class PythonProcess { public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final String workDirPath) { final ProcessorCreationWorkflow creationWorkflow = new ProcessorCreationWorkflow() { + @Override + public boolean isPackagedWithDependencies() { + return packagedWithDependencies; + } + @Override public void downloadDependencies() { + if (packagedWithDependencies) { + return; + } + controller.downloadDependencies(type, version, workDirPath); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java index cc2a5d8489..dc9c0451b7 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java @@ -78,7 +78,7 @@ public class StandardPythonBridge implements PythonBridge { LevelChangeListener.registerLogbackListener(logLevelChangeHandler); final File envHome = new File(processConfig.getPythonWorkingDirectory(), "controller"); - controllerProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, "Controller", "Controller"); + controllerProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, true, "Controller", "Controller"); controllerProcess.start(); running = true; } catch (final Exception e) { @@ 
-88,11 +88,18 @@ public class StandardPythonBridge implements PythonBridge { } @Override - public void discoverExtensions() { + public void discoverExtensions(final boolean includeNarDirectories) { ensureStarted(); final List<String> extensionsDirs = processConfig.getPythonExtensionsDirectories().stream() .map(File::getAbsolutePath) - .collect(Collectors.toList()); + .collect(Collectors.toCollection(ArrayList::new)); + + if (includeNarDirectories) { + processConfig.getNarDirectories().stream() + .map(File::getAbsolutePath) + .forEach(extensionsDirs::add); + } + final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath(); controllerProcess.discoverExtensions(extensionsDirs, workDirPath); } @@ -104,7 +111,16 @@ public class StandardPythonBridge implements PythonBridge { final ExtensionId extensionId = extensionIdFound.orElseThrow(() -> new IllegalArgumentException("Processor Type [%s] Version [%s] not found".formatted(type, version))); logger.debug("Creating Python Processor Type [{}] Version [{}]", extensionId.type(), extensionId.version()); - final PythonProcess pythonProcess = getProcessForNextComponent(extensionId, identifier, preferIsolatedProcess); + final PythonProcessorDetails processorDetails = getProcessorTypes().stream() + .filter(details -> details.getProcessorType().equals(type)) + .filter(details -> details.getProcessorVersion().equals(version)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Could not find Processor Details for Python Processor type [%s] or version [%s]".formatted(type, version))); + + final String processorHome = processorDetails.getExtensionHome(); + final boolean bundledWithDependencies = processorDetails.isBundledWithDependencies(); + + final PythonProcess pythonProcess = getProcessForNextComponent(extensionId, identifier, processorHome, preferIsolatedProcess, bundledWithDependencies); final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath(); final 
PythonProcessorBridge processorBridge = pythonProcess.createProcessor(identifier, type, version, workDirPath); @@ -181,7 +197,9 @@ public class StandardPythonBridge implements PythonBridge { return count; } - private synchronized PythonProcess getProcessForNextComponent(final ExtensionId extensionId, final String componentId, final boolean preferIsolatedProcess) { + private synchronized PythonProcess getProcessForNextComponent(final ExtensionId extensionId, final String componentId, final String processorHome, final boolean preferIsolatedProcess, + final boolean packagedWithDependencies) { + final int processorsOfThisType = processorCountByType.getOrDefault(extensionId, 0); final int processIndex = processorsOfThisType % processConfig.getMaxPythonProcessesPerType(); @@ -210,15 +228,28 @@ public class StandardPythonBridge implements PythonBridge { logger.info("In order to create Python Processor of type {}, launching a new Python Process because there are currently {} Python Processors of this type and {} Python Processes", extensionId.type(), processorsOfThisType, processesByProcessorType.size()); - final File extensionsWorkDir = new File(processConfig.getPythonWorkingDirectory(), "extensions"); - final File componentTypeHome = new File(extensionsWorkDir, extensionId.type()); - final File envHome = new File(componentTypeHome, extensionId.version()); - final PythonProcess pythonProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, extensionId.type(), componentId); + // If the processor is packaged with its dependencies as a NAR, we can use the Processor Home as the Environment Home. + // Otherwise, we need to create a Virtual Environment for the Processor. 
+ final File envHome; + if (packagedWithDependencies) { + envHome = new File(processorHome); + } else { + final File extensionsWorkDir = new File(processConfig.getPythonWorkingDirectory(), "extensions"); + final File componentTypeHome = new File(extensionsWorkDir, extensionId.type()); + envHome = new File(componentTypeHome, extensionId.version()); + } + + final PythonProcess pythonProcess = new PythonProcess(processConfig, serviceTypeLookup, envHome, packagedWithDependencies, extensionId.type(), componentId); pythonProcess.start(); + // Create list of extensions directories, including NAR directories final List<String> extensionsDirs = processConfig.getPythonExtensionsDirectories().stream() .map(File::getAbsolutePath) - .collect(Collectors.toList()); + .collect(Collectors.toCollection(ArrayList::new)); + processConfig.getNarDirectories().stream() + .map(File::getAbsolutePath) + .forEach(extensionsDirs::add); + final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath(); pythonProcess.discoverExtensions(extensionsDirs, workDirPath); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java index 22aa56585e..713d12addb 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java @@ -106,11 +106,18 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { long sleepMillis = 1_000L; while (!future.isCancelled()) { + final boolean packagedWithDependencies = creationWorkflow.isPackagedWithDependencies(); + if (packagedWithDependencies) { + loadState = LoadState.LOADING_PROCESSOR_CODE; + break; + } + loadState = 
LoadState.DOWNLOADING_DEPENDENCIES; try { creationWorkflow.downloadDependencies(); logger.info("Successfully downloaded dependencies for Python Processor {} ({})", identifier, getProcessorType()); + break; } catch (final Exception e) { loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED; diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java index 4a1ad32638..8797d66eba 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.py4j; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.io.IOException; - import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonProcessConfig; import org.junit.jupiter.api.BeforeEach; @@ -34,6 +26,14 @@ import org.junit.jupiter.api.io.TempDir; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.io.File; +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) class PythonProcessTest { @@ -56,7 +56,14 @@ class PythonProcessTest { @BeforeEach public void setUp() { - this.pythonProcess = new PythonProcess(this.pythonProcessConfig, this.controllerServiceTypeLookup, virtualEnvHome, "Controller", "Controller"); + this.pythonProcess = new 
PythonProcess(this.pythonProcessConfig, this.controllerServiceTypeLookup, virtualEnvHome, false, "Controller", "Controller"); + } + + @Test + void testUsesConfiguredValueWhenPackagedWithDependencies() throws IOException { + when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD); + final PythonProcess process = new PythonProcess(this.pythonProcessConfig, this.controllerServiceTypeLookup, virtualEnvHome, true, "Controller", "Controller"); + assertEquals(PYTHON_CMD, process.resolvePythonCommand()); } @Test @@ -100,7 +107,7 @@ class PythonProcessTest { @Test void testResolvePythonCommandNone() { when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD); - assertThrows(IOException.class, ()-> this.pythonProcess.resolvePythonCommand()); + assertThrows(IOException.class, () -> this.pythonProcess.resolvePythonCommand()); } private String getExpectedBinaryPath(String binarySubDirectoryName) { diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java index 7fb0399166..8c184f22a5 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java @@ -142,7 +142,7 @@ public class PythonControllerInteractionIT { public void testGetProcessorDetails() { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.py4j", "DEBUG"); - bridge.discoverExtensions(); + bridge.discoverExtensions(true); final List<PythonProcessorDetails> extensionDetails = bridge.getProcessorTypes(); final List<String> types = extensionDetails.stream() @@ -286,7 +286,7 @@ public class PythonControllerInteractionIT { @Test public void 
testImportRequirements() { // Discover extensions so that they can be created - bridge.discoverExtensions(); + bridge.discoverExtensions(true); final PythonProcessorDetails writeNumpyVersionDetails = bridge.getProcessorTypes().stream() .filter(details -> details.getProcessorType().equals("WriteNumpyVersion")) @@ -390,7 +390,7 @@ public class PythonControllerInteractionIT { replaceFileText(sourceFile, "Hola, Mundo", "Hello, World"); // Discover extensions so that they can be created - bridge.discoverExtensions(); + bridge.discoverExtensions(true); // Ensure that we find 2 different versions of the WriteMessage Processor. final List<PythonProcessorDetails> processorTypes = bridge.getProcessorTypes(); @@ -598,7 +598,7 @@ public class PythonControllerInteractionIT { } private TestRunner createProcessor(final String type, final String version) { - bridge.discoverExtensions(); + bridge.discoverExtensions(true); final AsyncLoadedProcessor processor = bridge.createProcessor(createId(), type, version, true, true); final TestRunner runner = TestRunners.newTestRunner(processor); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java index 2071a9c916..9380143d5c 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java @@ -60,7 +60,7 @@ public class DisabledPythonBridge implements PythonBridge { } @Override - public void discoverExtensions() { + public void discoverExtensions(final boolean includeNarDirectories) { } @Override diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java index a75ebca5dd..fd54691a34 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java @@ -84,8 +84,10 @@ public interface PythonBridge { /** * Triggers the Python Bridge to scan in order to determine which extensions are available. The results may then be obtained by calling * {@link #getProcessorTypes()}. + * + * @param includeNarDirectories whether or not to include NAR directories in the search for extensions */ - void discoverExtensions(); + void discoverExtensions(boolean includeNarDirectories); /** * Creates a Processor with the given identifier, type, and version. diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java index 4e32b5e89d..dfdba4c317 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; public class PythonProcessConfig { @@ -29,6 +30,7 @@ public class PythonProcessConfig { private final String pythonCommand; private final File pythonFrameworkDirectory; private final List<File> pythonExtensionsDirectories; + private final List<File> narDirectories; private final File pythonWorkingDirectory; private final Duration commsTimeout; private final int 
maxPythonProcesses; @@ -41,6 +43,7 @@ public class PythonProcessConfig { this.pythonCommand = builder.pythonCommand; this.pythonFrameworkDirectory = builder.pythonFrameworkDirectory; this.pythonExtensionsDirectories = builder.pythonExtensionsDirectories; + this.narDirectories = builder.narDirectories; this.pythonWorkingDirectory = builder.pythonWorkingDirectory; this.commsTimeout = builder.commsTimeout; this.maxPythonProcesses = builder.maxProcesses; @@ -62,6 +65,10 @@ public class PythonProcessConfig { return pythonExtensionsDirectories; } + public List<File> getNarDirectories() { + return narDirectories; + } + public File getPythonWorkingDirectory() { return pythonWorkingDirectory; } @@ -93,7 +100,8 @@ public class PythonProcessConfig { public static class Builder { private String pythonCommand = "python3"; private File pythonFrameworkDirectory = new File("python/framework"); - private List<File> pythonExtensionsDirectories = Collections.singletonList(new File("python/extensions")); + private List<File> pythonExtensionsDirectories = List.of(new File("python/extensions")); + private List<File> narDirectories = Collections.emptyList(); private File pythonWorkingDirectory = new File("python"); private Duration commsTimeout = Duration.ofSeconds(0); private int maxProcesses; @@ -102,6 +110,7 @@ public class PythonProcessConfig { private String debugHost = "localhost"; private int debugPort = 5678; + public Builder pythonCommand(final String command) { this.pythonCommand = command; return this; @@ -117,6 +126,11 @@ public class PythonProcessConfig { return this; } + public Builder narDirectories(final Collection<File> narDirectories) { + this.narDirectories = new ArrayList<>(new HashSet<>(narDirectories)); + return this; + } + public Builder pythonWorkingDirectory(final File pythonWorkingDirectory) { this.pythonWorkingDirectory = pythonWorkingDirectory; return this; diff --git 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java index 9831f9ddd3..04200c7b61 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java @@ -39,6 +39,18 @@ public interface PythonProcessorDetails extends PythonObjectProxy { */ String getSourceLocation(); + /** + * @return the directory where the Processor's extension is installed. If the extension is a module, this will be the directory + * containing the module. If the extension is a single file outside of a module, this will be the directory containing + * that file. + */ + String getExtensionHome(); + + /** + * @return <code>true</code> if the Processor is bundled with its dependencies; <code>false</code> otherwise + */ + boolean isBundledWithDependencies(); + /** * @return the Processor's capability description */ diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py index 05e0d4f6b0..b40a444d28 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py @@ -17,7 +17,6 @@ import logging import os import sys from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor - from py4j.java_gateway import JavaGateway, CallbackServerParameters, GatewayParameters import ExtensionManager @@ -40,7 +39,6 @@ logger = logging.getLogger("org.apache.nifi.py4j.Controller") class Controller: - def ping(self): 
return "pong" diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py index fbab6dca5d..650612dbc6 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py @@ -27,6 +27,8 @@ class ExtensionDetails: tags=None, use_cases=None, multi_processor_use_cases=None, + extension_home=None, + dependencies_bundled=False, property_descriptions=None): self.type = type @@ -35,10 +37,12 @@ class ExtensionDetails: self.tags = tags if tags else [] self.version = version self.source_location = source_location + self.extension_home = extension_home self.description = description self.use_cases = use_cases if use_cases else {} self.multi_processor_use_cases = multi_processor_use_cases if multi_processor_use_cases else {} self.property_descriptions = property_descriptions if property_descriptions else {} + self.dependencies_bundled = dependencies_bundled def getProcessorType(self): return self.type @@ -49,6 +53,9 @@ class ExtensionDetails: def getSourceLocation(self): return self.source_location + def getExtensionHome(self): + return self.extension_home + def getDependencies(self): return ArrayList(self.dependencies) @@ -67,6 +74,9 @@ class ExtensionDetails: def getPropertyDescriptions(self): return ArrayList(self.property_descriptions) + def isBundledWithDependencies(self): + return self.dependencies_bundled + def getInterface(self): if len(self.interfaces) == 0: return None diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py index 2e9063dda3..3bc6d25cd9 100644 --- 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py @@ -25,7 +25,6 @@ from pathlib import Path import ProcessorInspection - logger = logging.getLogger("python.ExtensionManager") @@ -129,7 +128,9 @@ class ExtensionManager: os.remove(completion_marker_file) # Call load_extension to ensure that we load all necessary dependencies, in case they have changed - self.__gather_extension_details(module_file, work_dir) + dependencies_bundled = details.isBundledWithDependencies() + extension_home = details.getExtensionHome() + self.__gather_extension_details(module_file, extension_home, dependencies_bundled, work_dir) # Reload the processor class itself processor_class = self.__load_extension_module(module_file, details.local_dependencies) @@ -179,20 +180,29 @@ class ExtensionManager: paths = [] for path in paths: + # If the path has a child directory named NAR-INF, we note that it has dependencies bundled with it + nar_inf_dir = os.path.join(path, 'NAR-INF') + dependencies_bundled = os.path.exists(nar_inf_dir) + for finder, name, ispkg in pkgutil.iter_modules([path]): if not require_nifi_prefix or name.startswith('nifi_'): module_file = '<Unknown Module File>' try: module = finder.find_module(name) module_file = module.path - logger.info('Discovered extension %s' % module_file) - self.__gather_extension_details(module_file, work_dir) + # Ignore any packaged dependencies + if 'NAR-INF/bundled-dependencies' in module_file: + continue + + logger.debug('Discovered extension %s' % module_file) + + self.__gather_extension_details(module_file, path, dependencies_bundled, work_dir) except Exception: logger.error("Failed to load Python extensions from module file {0}. 
This module will be ignored.".format(module_file), exc_info=True) - def __gather_extension_details(self, module_file, work_dir, local_dependencies=None): + def __gather_extension_details(self, module_file, extension_home, dependencies_bundled, work_dir, local_dependencies=None): path = Path(module_file) basename = os.path.basename(module_file) @@ -222,12 +232,12 @@ class ExtensionManager: continue child_module_file = os.path.join(dir, filename) - self.__gather_extension_details(child_module_file, work_dir, local_dependencies=local_dependencies) + self.__gather_extension_details(child_module_file, extension_home, dependencies_bundled, work_dir, local_dependencies=local_dependencies) - classes_and_details = self.__get_processor_classes_and_details(module_file) + classes_and_details = self.__get_processor_classes_and_details(module_file, extension_home, dependencies_bundled) for classname, details in classes_and_details.items(): id = ExtensionId(classname, details.version) - logger.info(f"For {classname} found local dependencies {local_dependencies}") + logger.debug(f"For {classname} found local dependencies {local_dependencies}") details.local_dependencies = local_dependencies @@ -247,13 +257,13 @@ class ExtensionManager: id = ExtensionId(extension_type, version) return self.processor_details[id].dependencies - def __get_processor_classes_and_details(self, module_file): + def __get_processor_classes_and_details(self, module_file, extension_home, dependencies_bundled): class_nodes = ProcessorInspection.get_processor_class_nodes(module_file) details_by_class = {} for class_node in class_nodes: - logger.info(f"Discovered Processor class {class_node.name} in module {module_file}") - details = ProcessorInspection.get_processor_details(class_node, module_file) + logger.debug(f"Discovered Processor class {class_node.name} in module {module_file} with home {extension_home}") + details = ProcessorInspection.get_processor_details(class_node, module_file, extension_home, 
dependencies_bundled=dependencies_bundled) details_by_class[class_node.name] = details return details_by_class diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py index c01b5a3a52..cd5fedb24a 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py @@ -14,11 +14,12 @@ # limitations under the License. import ast -import ExtensionDetails import logging import textwrap from nifiapi.documentation import UseCaseDetails, MultiProcessorUseCaseDetails, ProcessorConfiguration, PropertyDescription +import ExtensionDetails + PROCESSOR_INTERFACES = ['org.apache.nifi.python.processor.FlowFileTransform', 'org.apache.nifi.python.processor.RecordTransform'] logger = logging.getLogger("python.ProcessorInspection") @@ -37,7 +38,7 @@ def get_processor_class_nodes(module_file: str) -> list: return processor_class_nodes -def get_processor_details(class_node, module_file): +def get_processor_details(class_node, module_file, extension_home, dependencies_bundled): # Look for a 'ProcessorDetails' class child_class_nodes = get_class_nodes(class_node) @@ -60,6 +61,8 @@ def get_processor_details(class_node, module_file): version=version, dependencies=dependencies, source_location=module_file, + extension_home=extension_home, + dependencies_bundled=dependencies_bundled, description=description, tags=tags, use_cases=use_cases, @@ -70,7 +73,9 @@ def get_processor_details(class_node, module_file): type=class_node.name, version='Unknown', dependencies=[], - source_location=module_file) + source_location=module_file, + extension_home=extension_home, + dependencies_bundled=dependencies_bundled) def __get_processor_version(details_node): @@ -80,9 +85,9 @@ def 
__get_processor_version(details_node): def __get_processor_dependencies(details_node, class_name): deps = get_assigned_value(details_node, 'dependencies', []) if len(deps) == 0: - logger.info("Found no external dependencies that are required for class %s" % class_name) + logger.debug("Found no external dependencies that are required for class %s" % class_name) else: - logger.info("Found the following external dependencies that are required for class {0}: {1}".format(class_name, deps)) + logger.debug("Found the following external dependencies that are required for class {0}: {1}".format(class_name, deps)) return deps diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py index c1075d6078..b8b9b9038b 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import unittest import ProcessorInspection +import unittest DUMMY_PROCESSOR_FILE = 'src/test/python/framework/DummyProcessor.py' @@ -26,10 +26,11 @@ class DetectProcessorUseCase(unittest.TestCase): class_node = class_nodes[0] self.assertEqual(class_node.name, 'DummyProcessor') - details = ProcessorInspection.get_processor_details(class_node, DUMMY_PROCESSOR_FILE) + details = ProcessorInspection.get_processor_details(class_node, DUMMY_PROCESSOR_FILE, '/extensions/dummy_processor', False) self.assertIsNotNone(details) self.assertEqual(details.description, 'Fake Processor') self.assertEqual(details.tags, ['tag1', 'tag2']) + self.assertEqual(details.extension_home, '/extensions/dummy_processor') self.assertEqual(len(details.use_cases), 2) self.assertEqual(details.use_cases[0].description, 'First Use Case') self.assertEqual(details.use_cases[1].description, 'Second Use Case') diff --git a/nifi-system-tests/nifi-python-test-extensions-nar/pom.xml b/nifi-system-tests/nifi-python-test-extensions-nar/pom.xml new file mode 100644 index 0000000000..8736a90dcc --- /dev/null +++ b/nifi-system-tests/nifi-python-test-extensions-nar/pom.xml @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-system-tests</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-python-test-extensions-nar</artifactId> + <packaging>nar</packaging> + + <properties> + <bech32.version>1.2.0</bech32.version> + <bech32.url>https://files.pythonhosted.org/packages/b6/41/7022a226e5a6ac7091a95ba36bad057012ab7330b9894ad4e14e31d0b858/bech32-1.2.0-py3-none-any.whl</bech32.url> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>com.googlecode.maven-download-plugin</groupId> + <artifactId>download-maven-plugin</artifactId> + <version>1.7.1</version> + <executions> + <execution> + <id>download-bech32</id> + <goals> + <goal>wget</goal> + </goals> + <phase>generate-resources</phase> + <configuration> + <url>${bech32.url}</url> + <outputFileName>bech32-${bech32.version}.zip</outputFileName> + <unpack>true</unpack> + <outputDirectory>${project.build.directory}/classes/NAR-INF/bundled-dependencies</outputDirectory> + <sha256>990dc8e5a5e4feabbdf55207b5315fdd9b73db40be294a19b3752cde9e79d981</sha256> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <configuration> + <includeEmptyDirs>false</includeEmptyDirs> + <outputDirectory>${project.build.outputDirectory}</outputDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + <includes> + <include>*.py</include> + </includes> + </resource> + </resources> + </configuration> + </plugin> + </plugins> 
+ </build> +</project> \ No newline at end of file diff --git a/nifi-system-tests/nifi-python-test-extensions-nar/src/main/resources/WriteBech32Charset.py b/nifi-system-tests/nifi-python-test-extensions-nar/src/main/resources/WriteBech32Charset.py new file mode 100644 index 0000000000..363ecb6a6b --- /dev/null +++ b/nifi-system-tests/nifi-python-test-extensions-nar/src/main/resources/WriteBech32Charset.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import bech32 +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult + + +# A simple processor that demonstrates the ability to import and use a third-party library that is not defined as a dependency +# but that is bundled with the nar file. This processor uses the bech32 library to return the bech32.CHARSET value. 
+class WriteBech32Charset(FlowFileTransform): + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileTransform'] + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + + def __init__(self, **kwargs): + pass + + def transform(self, context, flowFile): + return FlowFileTransformResult(relationship = "success", contents = bech32.CHARSET) diff --git a/nifi-system-tests/nifi-system-test-suite/pom.xml b/nifi-system-tests/nifi-system-test-suite/pom.xml index 61a69646c1..a4a650b6b8 100644 --- a/nifi-system-tests/nifi-system-test-suite/pom.xml +++ b/nifi-system-tests/nifi-system-test-suite/pom.xml @@ -40,6 +40,7 @@ <configuration> <excludes> <exclude>PythonProcessorIT.java</exclude> + <exclude>PythonNarIT.java</exclude> </excludes> </configuration> </plugin> @@ -323,6 +324,12 @@ <artifactId>nifi-python-test-extensions</artifactId> <version>2.0.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-python-test-extensions-nar</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml b/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml index 26558507c2..9662e6400f 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml +++ b/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml @@ -118,6 +118,18 @@ </unpackOptions> </dependencySet> + <dependencySet> + <scope>runtime</scope> + <useProjectArtifact>false</useProjectArtifact> + <outputDirectory>lib/python-nars</outputDirectory> + <directoryMode>0770</directoryMode> + <fileMode>0664</fileMode> + <useTransitiveFiltering>true</useTransitiveFiltering> + <unpack>false</unpack> + <includes> + <include>*:nifi-python-test-extensions-nar</include> + </includes> + </dependencySet> </dependencySets> </assembly> diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarIT.java new file mode 100644 index 0000000000..6090bd0d07 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarIT.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.tests.system.python; + +import org.apache.nifi.tests.system.NiFiInstanceFactory; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PythonNarIT extends NiFiSystemIT { + + @Override + public NiFiInstanceFactory getInstanceFactory() { + return createPythonicInstanceFactory(); + } + + @Override + protected boolean isAllowFactoryReuse() { + return false; + } + + @Test + public void testRunProcessorWithBundledDependencies() throws IOException, NiFiClientException, InterruptedException { + final File nifiHome = getNiFiInstance().getInstanceDirectory(); + final File lib = new File(nifiHome, "lib"); + final File pythonNars = new File(lib, "python-nars"); + final File[] narFiles = pythonNars.listFiles((dir, name) -> name.endsWith(".nar")); + assertNotNull(narFiles); + + // Copy the python nar files to the lib directory so that they will be loaded on restart. + for (final File narFile : narFiles) { + Files.copy(narFile.toPath(), lib.toPath().resolve(narFile.getName()), StandardCopyOption.REPLACE_EXISTING); + } + + // Delete all python extensions from the python/extensions directory to ensure they are not loaded. 
+ final File pythonExtensions = new File(nifiHome, "python/extensions"); + final File[] extensionFiles = pythonExtensions.listFiles(); + assertNotNull(extensionFiles); + for (final File extensionFile : extensionFiles) { + deleteRecursively(extensionFile); + } + + // Restart NiFi + getNiFiInstance().stop(); + getNiFiInstance().start(true); + + // Create instance of the WriteBech32Charset processor, and connect a GenerateFlowFile to it + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + ProcessorEntity writeVersion = getClientUtil().createPythonProcessor("WriteBech32Charset"); + writeVersion = getClientUtil().setAutoTerminatedRelationships(writeVersion, "failure"); + final ConnectionEntity generateToWriteVersion = getClientUtil().createConnection(generate, writeVersion, "success"); + + // Create a TerminateFlowFile processor and connect the WriteBech32Charset processor to it + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + final ConnectionEntity writeVersionToTerminate = getClientUtil().createConnection(writeVersion, terminate, "success"); + + // Wait for processor validation to complete + getClientUtil().waitForValidProcessor(generate.getId()); + getClientUtil().waitForValidProcessor(writeVersion.getId()); + + // Run the flow + getClientUtil().startProcessor(generate); + waitForQueueCount(generateToWriteVersion.getId(), 1); + getClientUtil().startProcessor(writeVersion); + waitForQueueCount(writeVersionToTerminate.getId(), 1); + + // Verify the output + final String contents = getClientUtil().getFlowFileContentAsUtf8(writeVersionToTerminate.getId(), 0); + // Ensure that the contents written to the FlowFile are the 32 characters used by the bech32 encoding + assertEquals("qpzry9x8gf2tvdw0s3jn54khce6mua7l", contents); + } + + private void deleteRecursively(final File file) { + if (file.isDirectory()) { + final File[] children = file.listFiles(); + 
assertNotNull(children); + + for (final File child : children) { + deleteRecursively(child); + } + } + assertTrue(file.delete()); + } +} diff --git a/nifi-system-tests/pom.xml b/nifi-system-tests/pom.xml index 8446edce34..c8253c2e46 100644 --- a/nifi-system-tests/pom.xml +++ b/nifi-system-tests/pom.xml @@ -29,6 +29,7 @@ <module>nifi-system-test-extensions2-bundle</module> <module>nifi-alternate-config-extensions-bundle</module> <module>nifi-system-test-nar-provider-bundles</module> + <module>nifi-python-test-extensions-nar</module> <module>nifi-system-test-suite</module> <module>nifi-stateless-system-test-suite</module> </modules>