This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 754baf0a37 NIFI-12308: Create Python Environment in background thread instead of during Processor creation 754baf0a37 is described below commit 754baf0a37cb156cbab08ca7884a7ccfe12b7df1 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Wed Nov 1 14:02:12 2023 -0400 NIFI-12308: Create Python Environment in background thread instead of during Processor creation This closes #7971 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../nifi/components/AsyncLoadedProcessor.java | 6 +- .../nifi/controller/AbstractComponentNode.java | 1 + .../components/ClassLoaderAwarePythonBridge.java | 3 +- .../apache/nifi/controller/ExtensionBuilder.java | 45 ++-- .../nar/StandardExtensionDiscoveringManager.java | 69 ++--- .../org/apache/nifi/py4j/StandardPythonBridge.java | 28 +- .../nifi/py4j/StandardPythonProcessorBridge.java | 77 ++---- .../python/processor/FlowFileTransformProxy.java | 9 +- .../python/processor/PythonProcessorProxy.java | 103 +++++++- .../python/processor/RecordTransformProxy.java | 9 +- .../nifi-py4j-integration-tests/pom.xml | 12 +- .../PythonControllerInteractionIT.java | 294 +++++++-------------- .../apache/nifi/python/DisabledPythonBridge.java | 4 +- .../java/org/apache/nifi/python/PythonBridge.java | 7 +- .../python/processor/PythonProcessorBridge.java | 10 +- 15 files changed, 309 insertions(+), 368 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java index 14f7e31cb8..68ce2953bf 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java @@ -17,7 +17,9 @@ package org.apache.nifi.components; -public interface AsyncLoadedProcessor { +import org.apache.nifi.processor.Processor; + +public interface 
AsyncLoadedProcessor extends Processor { default boolean isLoaded() { return getState() == LoadState.FINISHED_LOADING; } @@ -25,6 +27,8 @@ public interface AsyncLoadedProcessor { LoadState getState(); enum LoadState { + INITIALIZING_ENVIRONMENT, + DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index c9fc336381..0b6d3bbd72 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -878,6 +878,7 @@ public abstract class AbstractComponentNode implements ComponentNode { if (component instanceof final AsyncLoadedProcessor asyncLoadedProcessor) { if (!asyncLoadedProcessor.isLoaded()) { final String explanation = switch (asyncLoadedProcessor.getState()) { + case INITIALIZING_ENVIRONMENT -> "Initializing runtime environment for the Processor."; case DEPENDENCY_DOWNLOAD_FAILED -> "Failed to download one or more Processor dependencies. 
See logs for additional details."; case DOWNLOADING_DEPENDENCIES -> "In the process of downloading third-party dependencies required by the Processor."; case LOADING_PROCESSOR_CODE -> "In the process of loading Processor code"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java index 06bcd4dc70..8a49feaf92 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java @@ -22,7 +22,6 @@ import org.apache.nifi.python.BoundObjectCounts; import org.apache.nifi.python.PythonBridge; import org.apache.nifi.python.PythonBridgeInitializationContext; import org.apache.nifi.python.PythonProcessorDetails; -import org.apache.nifi.python.processor.PythonProcessorBridge; import java.io.IOException; import java.util.List; @@ -97,7 +96,7 @@ public class ClassLoaderAwarePythonBridge implements PythonBridge { } @Override - public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) { + public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) { try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) { return delegate.createProcessor(identifier, type, version, preferIsolatedProcess); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java index 43ee011ea2..13c7609896 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.controller; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Proxy; -import java.net.URL; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import javax.net.ssl.SSLContext; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; @@ -77,8 +67,6 @@ import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.python.PythonBridge; -import org.apache.nifi.python.processor.PythonProcessorBridge; -import org.apache.nifi.python.processor.PythonProcessorInitializationContext; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext; import org.apache.nifi.registry.flow.FlowRegistryClientNode; @@ -94,6 +82,17 @@ import org.apache.nifi.validation.RuleViolationsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Proxy; +import java.net.URL; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import 
java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + public class ExtensionBuilder { private static final Logger logger = LoggerFactory.getLogger(ExtensionBuilder.class); @@ -750,9 +749,9 @@ public class ExtensionBuilder { final Processor processor = processorComponent.getComponent(); - final ProcessorInitializationContext initiContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(), + final ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(), serviceProvider, nodeTypeProvider, kerberosConfig); - processor.initialize(initiContext); + processor.initialize(initContext); final Bundle bundle = extensionManager.getBundle(bundleCoordinate); verifyControllerServiceReferences(processor, bundle.getClassLoader()); @@ -867,24 +866,14 @@ public class ExtensionBuilder { // TODO: This is a hack because there's a bug in the UI that causes it to not load extensions that don't have a `.` in the type. final String processorType = type.startsWith("python.") ? 
type.substring("python.".length()) : type; - final PythonProcessorBridge processorBridge = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true); - final Processor processor = processorBridge.getProcessorProxy(); + final Processor processor = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true); final ComponentLog componentLog = new SimpleProcessLogger(identifier, processor, new StandardLoggingContext(null)); final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog); - final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() { - @Override - public String getIdentifier() { - return identifier; - } - - @Override - public ComponentLog getLogger() { - return terminationAwareLogger; - } - }; - processorBridge.initialize(initContext); + final ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(identifier, terminationAwareLogger, + serviceProvider, nodeTypeProvider, kerberosConfig); + processor.initialize(initContext); return new LoggableComponent<>(processor, bundleCoordinate, terminationAwareLogger); } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java index 24058fd045..bed296e3c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java @@ -16,29 +16,6 @@ */ package org.apache.nifi.nar; -import java.io.BufferedReader; -import 
java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.authentication.LoginIdentityProvider; import org.apache.nifi.authorization.AccessPolicyProvider; @@ -63,16 +40,12 @@ import org.apache.nifi.flowanalysis.FlowAnalysisRule; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.init.ConfigurableComponentInitializer; import org.apache.nifi.init.ConfigurableComponentInitializerFactory; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.mock.MockComponentLogger; import org.apache.nifi.nar.ExtensionDefinition.ExtensionRuntime; import org.apache.nifi.parameter.ParameterProvider; import org.apache.nifi.processor.Processor; import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.python.PythonBridge; import org.apache.nifi.python.PythonProcessorDetails; -import org.apache.nifi.python.processor.PythonProcessorBridge; -import org.apache.nifi.python.processor.PythonProcessorInitializationContext; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingTask; @@ -80,6 +53,30 @@ import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.File; +import 
java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + /** * Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs). * @@ -711,23 +708,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering final String type = classType.startsWith(PYTHON_TYPE_PREFIX) ? classType.substring(PYTHON_TYPE_PREFIX.length()) : classType; final String procId = "temp-component-" + type; - final PythonProcessorBridge processorBridge = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false); - tempComponent = processorBridge.getProcessorProxy(); - - final ComponentLog componentLog = new MockComponentLogger(); - final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() { - @Override - public String getIdentifier() { - return procId; - } - - @Override - public ComponentLog getLogger() { - return componentLog; - } - }; - - processorBridge.initialize(initContext); + tempComponent = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false); } else { final Class<?> componentClass = Class.forName(classType, true, bundleClassLoader); tempComponent = (ConfigurableComponent) componentClass.getDeclaredConstructor().newInstance(); diff --git 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java index faa1afdd2d..64392717c8 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java @@ -17,6 +17,7 @@ package org.apache.nifi.py4j; +import org.apache.nifi.components.AsyncLoadedProcessor; import org.apache.nifi.python.BoundObjectCounts; import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonBridge; @@ -24,8 +25,12 @@ import org.apache.nifi.python.PythonBridgeInitializationContext; import org.apache.nifi.python.PythonController; import org.apache.nifi.python.PythonProcessConfig; import org.apache.nifi.python.PythonProcessorDetails; +import org.apache.nifi.python.processor.FlowFileTransform; +import org.apache.nifi.python.processor.FlowFileTransformProxy; import org.apache.nifi.python.processor.PythonProcessorAdapter; import org.apache.nifi.python.processor.PythonProcessorBridge; +import org.apache.nifi.python.processor.RecordTransform; +import org.apache.nifi.python.processor.RecordTransformProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import java.util.stream.Collectors; public class StandardPythonBridge implements PythonBridge { @@ -89,8 +95,7 @@ public class StandardPythonBridge implements PythonBridge { controllerProcess.getController().discoverExtensions(extensionsDirs, workDirPath); } - @Override - public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean 
preferIsolatedProcess) { + private PythonProcessorBridge createProcessorBridge(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) { ensureStarted(); logger.debug("Creating Python Processor of type {}", type); @@ -127,6 +132,25 @@ public class StandardPythonBridge implements PythonBridge { return processorBridge; } + @Override + public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) { + final PythonProcessorDetails processorDetails = getProcessorTypes().stream() + .filter(details -> details.getProcessorType().equals(type)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Unknown Python Processor type: " + type)); + + final String implementedInterface = processorDetails.getInterface(); + final Supplier<PythonProcessorBridge> processorBridgeFactory = () -> createProcessorBridge(identifier, type, version, preferIsolatedProcess); + + if (FlowFileTransform.class.getName().equals(implementedInterface)) { + return new FlowFileTransformProxy(type, processorBridgeFactory); + } + if (RecordTransform.class.getName().equals(implementedInterface)) { + return new RecordTransformProxy(type, processorBridgeFactory); + } + return null; + } + @Override public synchronized void onProcessorRemoved(final String identifier, final String type, final String version) { final ExtensionId extensionId = new ExtensionId(type, version); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java index 7ae9cef081..ba9d95fe01 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java +++ 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java @@ -17,24 +17,16 @@ package org.apache.nifi.py4j; -import org.apache.nifi.processor.Processor; import org.apache.nifi.python.PythonController; import org.apache.nifi.python.PythonProcessorDetails; -import org.apache.nifi.python.processor.FlowFileTransform; -import org.apache.nifi.python.processor.FlowFileTransformProxy; import org.apache.nifi.python.processor.PythonProcessorAdapter; import org.apache.nifi.python.processor.PythonProcessorBridge; import org.apache.nifi.python.processor.PythonProcessorInitializationContext; -import org.apache.nifi.python.processor.PythonProcessorProxy; -import org.apache.nifi.python.processor.RecordTransform; -import org.apache.nifi.python.processor.RecordTransformProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState; @@ -45,7 +37,6 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { private final ProcessorCreationWorkflow creationWorkflow; private final PythonProcessorDetails processorDetails; private volatile PythonProcessorAdapter adapter; - private final PythonProcessorProxy proxy; private final File workingDir; private final File moduleFile; private volatile long lastModified; @@ -60,8 +51,6 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { this.workingDir = builder.workDir; this.moduleFile = builder.moduleFile; this.lastModified = this.moduleFile.lastModified(); - - this.proxy = createProxy(); } @Override @@ -70,46 +59,35 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { } @Override - public Future<Void> initialize(final PythonProcessorInitializationContext context) { + public void initialize(final 
PythonProcessorInitializationContext context) { this.initializationContext = context; final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType()); - final CompletableFuture<Void> future = new CompletableFuture<>(); - - Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(future)); - return future; + Thread.ofVirtual().name(threadName).start(this::initializePythonSide); } public LoadState getLoadState() { return loadState; } - private void initializePythonSide(final CompletableFuture<Void> future) { + private void initializePythonSide() { try { - try { - creationWorkflow.downloadDependencies(); - loadState = LoadState.LOADING_PROCESSOR_CODE; - } catch (final Exception e) { - loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED; - throw e; - } - - final PythonProcessorAdapter pythonProcessorAdapter; - try { - pythonProcessorAdapter = creationWorkflow.createProcessor(); - pythonProcessorAdapter.initialize(initializationContext); - this.adapter = pythonProcessorAdapter; - this.proxy.onPythonSideInitialized(pythonProcessorAdapter); - - loadState = LoadState.FINISHED_LOADING; - } catch (final Exception e) { - loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED; - throw e; - } + creationWorkflow.downloadDependencies(); + loadState = LoadState.LOADING_PROCESSOR_CODE; + } catch (final Exception e) { + loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED; + throw e; + } - future.complete(null); - } catch (final Throwable t) { - future.completeExceptionally(t); + final PythonProcessorAdapter pythonProcessorAdapter; + try { + pythonProcessorAdapter = creationWorkflow.createProcessor(); + pythonProcessorAdapter.initialize(initializationContext); + this.adapter = pythonProcessorAdapter; + loadState = LoadState.FINISHED_LOADING; + } catch (final Exception e) { + loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED; + throw e; } } @@ -118,11 +96,6 @@ public class StandardPythonProcessorBridge 
implements PythonProcessorBridge { return processorDetails.getProcessorType(); } - @Override - public Processor getProcessorProxy() { - return proxy; - } - @Override public boolean reload() { if (moduleFile.lastModified() <= lastModified) { @@ -131,24 +104,12 @@ public class StandardPythonProcessorBridge implements PythonProcessorBridge { } controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath()); - initializePythonSide(new CompletableFuture<>()); + initializePythonSide(); lastModified = moduleFile.lastModified(); return true; } - private PythonProcessorProxy createProxy() { - final String implementedInterface = processorDetails.getInterface(); - if (FlowFileTransform.class.getName().equals(implementedInterface)) { - return new FlowFileTransformProxy(this); - } - if (RecordTransform.class.getName().equals(implementedInterface)) { - return new RecordTransformProxy(this); - } - - throw new IllegalArgumentException("Python Processor does not implement any of the valid interfaces. 
Interface implemented: " + implementedInterface); - } - public static class Builder { private PythonController controller; diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java index d7f46cda67..0af61d738a 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java @@ -29,21 +29,20 @@ import py4j.Py4JNetworkException; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; @InputRequirement(Requirement.INPUT_REQUIRED) public class FlowFileTransformProxy extends PythonProcessorProxy { - private final PythonProcessorBridge bridge; private volatile FlowFileTransform transform; - public FlowFileTransformProxy(final PythonProcessorBridge bridge) { - super(bridge); - this.bridge = bridge; + public FlowFileTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) { + super(processorType, bridgeFactory); } - @OnScheduled public void setContext(final ProcessContext context) { + final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing")); final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter(); if (optionalAdapter.isEmpty()) { throw new IllegalStateException(this + " is not finished initializing"); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java index dfcf5fd8c5..1b9383819e 
100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java @@ -26,8 +26,10 @@ import org.apache.nifi.components.AsyncLoadedProcessor; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import java.util.Collection; @@ -38,15 +40,18 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; @SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS) @SupportsSensitiveDynamicProperties public abstract class PythonProcessorProxy extends AbstractProcessor implements AsyncLoadedProcessor { - private final PythonProcessorBridge bridge; + private final String processorType; + private volatile PythonProcessorInitializationContext initContext; + private volatile PythonProcessorBridge bridge; private volatile Set<Relationship> cachedRelationships = null; private volatile List<PropertyDescriptor> cachedPropertyDescriptors = null; private volatile Map<String, PropertyDescriptor> cachedDynamicDescriptors = null; - private volatile boolean supportsDynamicProperties; + private volatile Boolean supportsDynamicProperties; protected static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") @@ -61,16 +66,55 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements REL_ORIGINAL, REL_FAILURE); - public PythonProcessorProxy(final PythonProcessorBridge bridge) { - this.bridge = 
bridge; + public PythonProcessorProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) { + this.processorType = processorType; + + Thread.ofVirtual().name("Initialize " + processorType).start(() -> { + this.bridge = bridgeFactory.get(); + + // If initialization context has already been set, initialize bridge. + final PythonProcessorInitializationContext pythonInitContext = initContext; + if (pythonInitContext != null) { + this.bridge.initialize(pythonInitContext); + } + }); } - public void onPythonSideInitialized(final PythonProcessorAdapter adapter) { - supportsDynamicProperties = adapter.isDynamicPropertySupported(); + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + + final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() { + @Override + public String getIdentifier() { + return context.getIdentifier(); + } + + @Override + public ComponentLog getLogger() { + return context.getLogger(); + } + }; + + this.initContext = initContext; + + // If Bridge has already been set, initialize it. + final PythonProcessorBridge bridge = this.bridge; + if (bridge != null) { + bridge.initialize(initContext); + } + } + + protected Optional<PythonProcessorBridge> getBridge() { + return Optional.ofNullable(this.bridge); } @Override public LoadState getState() { + if (bridge == null) { + return LoadState.INITIALIZING_ENVIRONMENT; + } + return bridge.getLoadState(); } @@ -80,6 +124,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements return this.cachedPropertyDescriptors; } + if (bridge == null) { + return Collections.emptyList(); + } + final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter(); if (optionalAdapter.isEmpty()) { // If we don't have the adapter yet, use whatever is cached, even if it's old, or an empty List if we have nothing cached. 
@@ -99,6 +147,14 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements @Override protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + if (bridge == null) { + return List.of(new ValidationResult.Builder() + .subject("Processor") + .explanation("Python environment is not yet initialized") + .valid(false) + .build()); + } + final LoadState loadState = bridge.getLoadState(); if (loadState == LoadState.LOADING_PROCESSOR_CODE || loadState == LoadState.DOWNLOADING_DEPENDENCIES) { return List.of(new ValidationResult.Builder() @@ -144,6 +200,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements return cachedDynamicDescriptors.get(propertyDescriptorName); } + if (bridge == null) { + return null; + } + try { final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter(); return optionalAdapter.map(adapter -> adapter.getSupportedDynamicPropertyDescriptor(propertyDescriptorName)) @@ -155,7 +215,18 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements } protected boolean isSupportsDynamicPropertyDescriptor() { - return supportsDynamicProperties; + if (this.supportsDynamicProperties != null) { + return supportsDynamicProperties; + } + + if (bridge == null) { + return false; + } + + final Optional<PythonProcessorAdapter> adapter = bridge.getProcessorAdapter(); + final boolean supported = adapter.map(PythonProcessorAdapter::isDynamicPropertySupported).orElse(false); + supportsDynamicProperties = supported; + return supported; } @OnScheduled @@ -193,6 +264,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements } private Set<Relationship> fetchRelationshipsFromPythonProcessor() { + if (bridge == null) { + return Collections.emptySet(); + } + Set<Relationship> processorRelationships; try { processorRelationships = bridge.getProcessorAdapter() @@ -210,6 +285,10 @@ public abstract class 
PythonProcessorProxy extends AbstractProcessor implements @OnScheduled public void onScheduled(final ProcessContext context) { + if (bridge == null) { + throw new IllegalStateException("Processor is not yet initialized"); + } + reload(); bridge.getProcessorAdapter() .orElseThrow(() -> new IllegalStateException("Processor has not finished initializing")) @@ -218,6 +297,10 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements @OnStopped public void onStopped(final ProcessContext context) { + if (bridge == null) { + throw new IllegalStateException("Processor is not yet initialized"); + } + bridge.getProcessorAdapter() .orElseThrow(() -> new IllegalStateException("Processor has not finished initializing")) .onStopped(context); @@ -225,10 +308,14 @@ public abstract class PythonProcessorProxy extends AbstractProcessor implements @Override public String toString() { - return "PythonProcessor[type=" + bridge.getProcessorType() + ", id=" + getIdentifier() + "]"; + return "PythonProcessor[type=" + processorType + ", id=" + getIdentifier() + "]"; } private void reload() { + if (bridge == null) { + return; + } + final boolean reloaded = bridge.reload(); if (reloaded) { getLogger().info("Successfully reloaded Processor"); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index 93f140b052..ecae852962 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -56,10 +56,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.function.Supplier; 
@InputRequirement(Requirement.INPUT_REQUIRED) public class RecordTransformProxy extends PythonProcessorProxy { - private final PythonProcessorBridge bridge; private volatile RecordTransform transform; static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() @@ -78,9 +78,8 @@ public class RecordTransformProxy extends PythonProcessorProxy { .build(); - public RecordTransformProxy(final PythonProcessorBridge bridge) { - super(bridge); - this.bridge = bridge; + public RecordTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory) { + super(processorType, bridgeFactory); } @Override @@ -94,6 +93,8 @@ public class RecordTransformProxy extends PythonProcessorProxy { @OnScheduled public void setProcessContext(final ProcessContext context) { + final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new IllegalStateException(this + " is not finished initializing")); + final Optional<PythonProcessorAdapter> adapterOptional = bridge.getProcessorAdapter(); if (adapterOptional.isEmpty()) { throw new IllegalStateException(this + " is not finished initializing"); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml index 188ad20cfc..2e272e6451 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml @@ -82,7 +82,17 @@ <artifactId>nifi-record-serialization-service-api</artifactId> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-services</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java index 5660fb277d..30b7e9e1b4 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java @@ -17,34 +17,28 @@ package org.apache.nifi.py4j; +import org.apache.nifi.components.AsyncLoadedProcessor; +import org.apache.nifi.components.AsyncLoadedProcessor.LoadState; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.mock.MockComponentLogger; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.mock.MockProcessorInitializationContext; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonBridge; import org.apache.nifi.python.PythonBridgeInitializationContext; import org.apache.nifi.python.PythonProcessConfig; import org.apache.nifi.python.PythonProcessorDetails; -import org.apache.nifi.python.processor.EmptyAttributeMap; import org.apache.nifi.python.processor.FlowFileTransformProxy; -import org.apache.nifi.python.processor.PythonProcessorBridge; -import org.apache.nifi.python.processor.PythonProcessorInitializationContext; -import 
org.apache.nifi.python.processor.RecordTransform; -import org.apache.nifi.python.processor.RecordTransformResult; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.TestRunner; @@ -54,7 +48,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -74,13 +67,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.stream.Collectors; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; @@ -166,7 +156,7 @@ public class PythonControllerInteractionIT { final List<PythonProcessorDetails> extensionDetails = bridge.getProcessorTypes(); final List<String> types = extensionDetails.stream() .map(PythonProcessorDetails::getProcessorType) - .collect(Collectors.toList()); + .toList(); 
assertTrue(types.contains(PRETTY_PRINT_JSON)); assertTrue(types.contains("ConvertCsvToExcel")); @@ -186,10 +176,7 @@ public class PythonControllerInteractionIT { // Create a PrettyPrintJson Processor final byte[] jsonContent = Files.readAllBytes(Paths.get("src/test/resources/json/input/simple-person.json")); for (int i=0; i < 3; i++) { - final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON); - assertNotNull(prettyPrintJson); - - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson); + final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); final TestRunner runner = TestRunners.newTestRunner(wrapper); runner.enqueue(jsonContent); @@ -203,11 +190,7 @@ public class PythonControllerInteractionIT { @Disabled("Just for manual testing...") public void runPrettyPrintJsonManyThreads() throws IOException { // Create a PrettyPrintJson Processor - final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON); - assertNotNull(prettyPrintJson); - - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson); + final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); final TestRunner runner = TestRunners.newTestRunner(wrapper); final int flowFileCount = 100_000; @@ -226,12 +209,8 @@ public class PythonControllerInteractionIT { @Test public void testSimplePrettyPrint() throws IOException { - // Discover extensions so that they can be created - final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON); - assertNotNull(prettyPrintJson); - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson); + final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); final TestRunner runner = TestRunners.newTestRunner(wrapper); runner.enqueue(Paths.get("src/test/resources/json/input/simple-person.json")); runner.setProperty("Indentation", "2"); @@ 
-250,8 +229,7 @@ public class PythonControllerInteractionIT { @Test public void testValidator() { - final PythonProcessorBridge prettyPrintJson = createProcessor(PRETTY_PRINT_JSON); - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(prettyPrintJson); + final FlowFileTransformProxy wrapper = createFlowFileTransform(PRETTY_PRINT_JSON); final TestRunner runner = TestRunners.newTestRunner(wrapper); runner.setProperty("Indentation", "-1"); @@ -273,11 +251,7 @@ public class PythonControllerInteractionIT { @Test public void testCsvToExcel() { // Create a PrettyPrintJson Processor - final PythonProcessorBridge csvToExcel = createProcessor("ConvertCsvToExcel"); - assertNotNull(csvToExcel); - - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(csvToExcel); + final FlowFileTransformProxy wrapper = createFlowFileTransform("ConvertCsvToExcel"); final TestRunner runner = TestRunners.newTestRunner(wrapper); runner.enqueue("name, number\nJohn Doe, 500"); @@ -289,11 +263,8 @@ public class PythonControllerInteractionIT { @Test public void testExpressionLanguageWithAttributes() { - final PythonProcessorBridge writeProperty = createProcessor("WritePropertyToFlowFile"); - assertNotNull(writeProperty); - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(writeProperty); + final FlowFileTransformProxy wrapper = createFlowFileTransform("WritePropertyToFlowFile"); final TestRunner runner = TestRunners.newTestRunner(wrapper); runner.setProperty("Message", "Hola Mundo"); runner.enqueue("Hello World"); @@ -308,11 +279,7 @@ public class PythonControllerInteractionIT { @Test public void testPythonPackage() { // Create a WriteNumber Processor - final PythonProcessorBridge procBridge = createProcessor("WriteNumber"); - assertNotNull(procBridge); - - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(procBridge); + final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumber"); final 
TestRunner runner = TestRunners.newTestRunner(wrapper); runner.enqueue(""); @@ -326,6 +293,14 @@ public class PythonControllerInteractionIT { assertTrue(resultNum <= 1000); } + private FlowFileTransformProxy createFlowFileTransform(final String type) { + final Processor processor = createProcessor(type); + assertNotNull(processor); + + processor.initialize(new MockProcessorInitializationContext()); + return (FlowFileTransformProxy) processor; + } + @Test public void testImportRequirements() { // Discover extensions so that they can be created @@ -340,12 +315,8 @@ public class PythonControllerInteractionIT { assertEquals(1, dependencies.size()); assertEquals("numpy==1.25.0", dependencies.get(0)); - // Create a PrettyPrintJson Processor - final PythonProcessorBridge writeNumPyVersion = createProcessor("WriteNumpyVersion"); - assertNotNull(writeNumPyVersion); - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(writeNumPyVersion); + final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteNumpyVersion"); final TestRunner runner = TestRunners.newTestRunner(wrapper); runner.enqueue("Hello World"); @@ -359,13 +330,9 @@ public class PythonControllerInteractionIT { @Test public void testControllerService() throws InitializationException { - final PythonProcessorBridge processor = createProcessor("LookupAddress"); - assertNotNull(processor); - - controllerServiceMap.put("StringLookupService", TestLookupService.class); - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(processor); + controllerServiceMap.put("StringLookupService", TestLookupService.class); + final FlowFileTransformProxy wrapper = createFlowFileTransform("LookupAddress"); final TestRunner runner = TestRunners.newTestRunner(wrapper); final StringLookupService lookupService = new TestLookupService((Collections.singletonMap("John Doe", "123 My Street"))); runner.addControllerService("lookup", lookupService); @@ -392,13 +359,8 @@ public class 
PythonControllerInteractionIT { // Ensure that we started with "Hello, World" because if the test is run multiple times, we may already be starting with the modified version replaceFileText(sourceFile, replacement, originalMessage); - // Create a PrettyPrintJson Processor - final PythonProcessorBridge processor = createProcessor("WriteMessage"); - processor.initialize(createInitContext()); - assertNotNull(processor); - // Setup - final FlowFileTransformProxy wrapper = new FlowFileTransformProxy(processor); + final FlowFileTransformProxy wrapper = createFlowFileTransform("WriteMessage"); final TestRunner runner = TestRunners.newTestRunner(wrapper); runner.enqueue(""); @@ -465,8 +427,7 @@ public class PythonControllerInteractionIT { assertEquals(1, v2Count); // Create a WriteMessage Processor, version 0.0.1-SNAPSHOT - final PythonProcessorBridge procV1 = createProcessor("WriteMessage"); - final FlowFileTransformProxy wrapperV1 = new FlowFileTransformProxy(procV1); + final FlowFileTransformProxy wrapperV1 = createFlowFileTransform("WriteMessage"); final TestRunner runnerV1 = TestRunners.newTestRunner(wrapperV1); runnerV1.enqueue(""); @@ -477,9 +438,8 @@ public class PythonControllerInteractionIT { runnerV1.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello, World"); // Create an instance of WriteMessage V2 - final PythonProcessorBridge procV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT"); - final FlowFileTransformProxy wrapperV2 = new FlowFileTransformProxy(procV2); - final TestRunner runnerV2 = TestRunners.newTestRunner(wrapperV2); + final Processor procV2 = createProcessor("WriteMessage", "0.0.2-SNAPSHOT"); + final TestRunner runnerV2 = TestRunners.newTestRunner(procV2); runnerV2.enqueue(""); // Trigger the processor @@ -490,40 +450,23 @@ public class PythonControllerInteractionIT { } @Test - public void testRecordTransformWithDynamicProperties() { - // Create a PrettyPrintJson Processor - final PythonProcessorBridge processor = 
createProcessor("SetRecordField"); - assertNotNull(processor); - - // Mock out ProcessContext to reflect that the processor should set the 'name' field to 'Jane Doe' - final PropertyDescriptor nameDescriptor = new PropertyDescriptor.Builder() - .name("name") - .dynamic(true) - .addValidator(Validator.VALID) - .build(); - final PropertyDescriptor numberDescriptor = new PropertyDescriptor.Builder() - .name("number") - .dynamic(true) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .build(); - - final Map<PropertyDescriptor, String> propertyMap = new HashMap<>(); - propertyMap.put(nameDescriptor, "Jane Doe"); - propertyMap.put(numberDescriptor, "8"); - - final ProcessContext context = createContext(propertyMap); + public void testRecordTransformWithDynamicProperties() throws InitializationException { + // Create a SetRecordField Processor + final TestRunner runner = createRecordTransformRunner("SetRecordField"); + runner.setProperty("name", "Jane Doe"); + runner.setProperty("number", "8"); // Create a Record to transform and transform it final String json = "[{ \"name\": \"John Doe\" }]"; - final RecordSchema schema = createSimpleRecordSchema("name"); - final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().get().getProcessor(); - recordTransform.setContext(context); - final RecordTransformResult result = recordTransform.transformRecord(json, schema, new EmptyAttributeMap()).get(0); + runner.enqueue(json); + runner.run(); + runner.assertTransferCount("original", 1); + runner.assertTransferCount("success", 1); // Verify the results - assertEquals("success", result.getRelationship()); - assertNull(result.getSchema()); - assertEquals("{\"name\": \"Jane Doe\", \"number\": \"8\"}", result.getRecordJson()); + final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0); + out.assertContentEquals(""" + [{"name":"Jane Doe","number":"8"}]"""); } private ProcessContext createContext(final Map<PropertyDescriptor, String> 
propertyValues) { @@ -543,72 +486,44 @@ public class PythonControllerInteractionIT { return context; } - @Test - public void testRecordTransformWithInnerRecord() { - // Create a PrettyPrintJson Processor - final PythonProcessorBridge processor = createProcessor("SetRecordField"); + private TestRunner createRecordTransformRunner(final String type) throws InitializationException { + final Processor processor = createProcessor("SetRecordField"); assertNotNull(processor); - // Mock out ProcessContext to reflect that the processor should set the 'name' field to 'Jane Doe' - final PropertyDescriptor nameDescriptor = new PropertyDescriptor.Builder() - .name("name") - .dynamic(true) - .addValidator(Validator.VALID) - .build(); - - final Map<PropertyDescriptor, String> propertyMap = new HashMap<>(); - propertyMap.put(nameDescriptor, "Jane Doe"); - final ProcessContext context = createContext(propertyMap); - - // Create a Record to transform and transform it - final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\": \"John Doe\" }}]"; - final RecordSchema recordSchema = createTwoLevelRecord().getSchema(); - final RecordTransform recordTransform = (RecordTransform) processor.getProcessorAdapter().get().getProcessor(); - recordTransform.setContext(context); - final RecordTransformResult result = recordTransform.transformRecord(json, recordSchema, new EmptyAttributeMap()).get(0); + final JsonTreeReader reader = new JsonTreeReader(); + final JsonRecordSetWriter writer = new JsonRecordSetWriter(); - // Verify the results - assertEquals("success", result.getRelationship()); + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.addControllerService("reader", reader); + runner.addControllerService("writer", writer); + runner.enableControllerService(reader); + runner.enableControllerService(writer); + runner.setProperty("Record Reader", "reader"); + runner.setProperty("Record Writer", "writer"); - assertEquals("{\"name\": \"Jane Doe\", \"father\": 
{\"name\": \"John Doe\"}}", result.getRecordJson()); + return runner; } - @Test - public void testLogger() throws ExecutionException, InterruptedException { - bridge.discoverExtensions(); - - final String procId = createId(); - // Create the Processor, but we do not use this.createProcessor() because we want to explicitly have access to the - // initialization context to inject in the logger that we want. - final PythonProcessorBridge logContentsBridge = bridge.createProcessor(procId, "LogContents", VERSION, true); - - final ComponentLog logger = Mockito.mock(ComponentLog.class); - final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() { - @Override - public String getIdentifier() { - return procId; - } - - @Override - public ComponentLog getLogger() { - return logger; - } - }; + public void testRecordTransformWithInnerRecord() throws InitializationException { + // Create a SetRecordField Processor + final TestRunner runner = createRecordTransformRunner("SetRecordField"); + runner.setProperty("name", "Jane Doe"); - logContentsBridge.initialize(initContext).get(); - - final TestRunner runner = TestRunners.newTestRunner(logContentsBridge.getProcessorProxy()); - runner.enqueue("Hello World"); + // Create a Record to transform and transform it + final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\": \"John Doe\" }}]"; + runner.enqueue(json); runner.run(); - runner.assertTransferCount("original", 1); + // Verify the results runner.assertTransferCount("success", 1); - final ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class); - Mockito.verify(logger).info(argumentCaptor.capture()); - assertEquals("Hello World", argumentCaptor.getValue()); + runner.assertTransferCount("original", 1); + final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0); + out.assertContentEquals(""" + [{"name":"Jane Doe","father":{"name":"John Doe"}}]"""); } + private RecordSchema 
createSimpleRecordSchema(final String... fieldNames) { return createSimpleRecordSchema(Arrays.asList(fieldNames)); } @@ -623,30 +538,6 @@ public class PythonControllerInteractionIT { return schema; } - private Record createSimpleRecord(final Map<String, Object> values) { - final List<RecordField> recordFields = new ArrayList<>(); - for (final Map.Entry<String, Object> entry : values.entrySet()) { - final DataType dataType = DataTypeUtils.inferDataType(entry.getValue(), RecordFieldType.STRING.getDataType()); - recordFields.add(new RecordField(entry.getKey(), dataType, true)); - } - - final RecordSchema schema = new SimpleRecordSchema(recordFields); - return new MapRecord(schema, values); - } - - private Record createTwoLevelRecord() { - final Map<String, Object> innerPersonValues = new HashMap<>(); - innerPersonValues.put("name", "Jake Doe"); - final Record innerPersonRecord = createSimpleRecord(innerPersonValues); - - final Map<String, Object> outerPersonValues = new HashMap<>(); - outerPersonValues.put("name", "John Doe"); - outerPersonValues.put("father", innerPersonRecord); - final Record outerPersonRecord = createSimpleRecord(outerPersonValues); - - return outerPersonRecord; - } - public interface StringLookupService extends ControllerService { Optional<String> lookup(Map<String, String> coordinates); @@ -670,37 +561,40 @@ public class PythonControllerInteractionIT { return UUID.randomUUID().toString(); } - private PythonProcessorBridge createProcessor(final String type) { + private Processor createProcessor(final String type) { return createProcessor(type, VERSION); } - private PythonProcessorBridge createProcessor(final String type, final String version) { + private Processor createProcessor(final String type, final String version) { bridge.discoverExtensions(); - final PythonProcessorBridge processor = bridge.createProcessor(createId(), type, version, true); - final Future<Void> future = processor.initialize(createInitContext()); - - try { - future.get(); - 
} catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); - } + final AsyncLoadedProcessor processor = bridge.createProcessor(createId(), type, version, true); - return processor; - } + final ProcessorInitializationContext initContext = new MockProcessorInitializationContext(); + processor.initialize(initContext); - private PythonProcessorInitializationContext createInitContext() { - return new PythonProcessorInitializationContext() { - @Override - public String getIdentifier() { - return "unit-test-id"; + final long maxInitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L); + while (true) { + final LoadState state = processor.getState(); + if (state == LoadState.FINISHED_LOADING) { + break; + } + if (state == LoadState.DEPENDENCY_DOWNLOAD_FAILED || state == LoadState.LOADING_PROCESSOR_CODE_FAILED) { + throw new RuntimeException("Failed to initialize processor of type %s version %s".formatted(type, version)); } - @Override - public ComponentLog getLogger() { - return new MockComponentLogger(); + if (System.currentTimeMillis() > maxInitTime) { + throw new RuntimeException("Timed out waiting for processor of type %s version %s to initialize".formatted(type, version)); } - }; + + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while initializing processor of type %s version %s".formatted(type, version)); + } + } + processor.initialize(new MockProcessorInitializationContext()); + return processor; } + } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java index f33b62bc81..2911a84839 100644 --- 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java @@ -17,7 +17,7 @@ package org.apache.nifi.python; -import org.apache.nifi.python.processor.PythonProcessorBridge; +import org.apache.nifi.components.AsyncLoadedProcessor; import java.io.IOException; import java.util.Collections; @@ -64,7 +64,7 @@ public class DisabledPythonBridge implements PythonBridge { } @Override - public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) { + public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) { throw new UnsupportedOperationException("Cannot create Processor of type " + type + " because Python extensions are disabled"); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java index dd15d75740..20d2c1f180 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java @@ -17,7 +17,7 @@ package org.apache.nifi.python; -import org.apache.nifi.python.processor.PythonProcessorBridge; +import org.apache.nifi.components.AsyncLoadedProcessor; import java.io.IOException; import java.util.List; @@ -88,8 +88,7 @@ public interface PythonBridge { void discoverExtensions(); /** - * Creates a Processor with the given identifier, type, and version. 
Then returns a PythonProcessorBridge that provides access to all - * necessary information and objects for interacting with this Processor from the Java side. + * Creates a Processor with the given identifier, type, and version. * * @param identifier the Processor's identifier * @param type the Processor's type @@ -97,7 +96,7 @@ public interface PythonBridge { * @param preferIsolatedProcess whether or not to prefer launching a Python Process that is isolated for just this one instance of the Processor * @return a PythonProcessorBridge that can be used for interacting with the Processor */ - PythonProcessorBridge createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess); + AsyncLoadedProcessor createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess); /** * A notification that the Processor with the given identifier, type, and version was removed from the flow. This triggers the bridge diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java index 7d08bdfae3..0a5b1af2c9 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java @@ -18,10 +18,8 @@ package org.apache.nifi.python.processor; import org.apache.nifi.components.AsyncLoadedProcessor.LoadState; -import org.apache.nifi.processor.Processor; import java.util.Optional; -import java.util.concurrent.Future; /** * A model object that is used to bridge the gap between what is necessary for the Framework to interact @@ -34,12 +32,6 @@ public interface PythonProcessorBridge { */ 
Optional<PythonProcessorAdapter> getProcessorAdapter(); - /** - * @return a proxy for the actual Processor implementation that will trigger the appropriate method on the Python side, or an empty Optional - * if the processor/adapter have not yet been initialized - */ - Processor getProcessorProxy(); - /** * @return the name of the Processor implementation. This will not contain a 'python.' prefix. */ @@ -57,7 +49,7 @@ public interface PythonProcessorBridge { * Initializes the Processor * @param context the initialization context */ - Future<Void> initialize(PythonProcessorInitializationContext context); + void initialize(PythonProcessorInitializationContext context); /** * @return the current state of the Processor loading