This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 754baf0a37 NIFI-12308: Create Python Environment in background thread instead of during Processor creation
754baf0a37 is described below

commit 754baf0a37cb156cbab08ca7884a7ccfe12b7df1
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Nov 1 14:02:12 2023 -0400

    NIFI-12308: Create Python Environment in background thread instead of during Processor creation
    
    This closes #7971
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../nifi/components/AsyncLoadedProcessor.java      |   6 +-
 .../nifi/controller/AbstractComponentNode.java     |   1 +
 .../components/ClassLoaderAwarePythonBridge.java   |   3 +-
 .../apache/nifi/controller/ExtensionBuilder.java   |  45 ++--
 .../nar/StandardExtensionDiscoveringManager.java   |  69 ++---
 .../org/apache/nifi/py4j/StandardPythonBridge.java |  28 +-
 .../nifi/py4j/StandardPythonProcessorBridge.java   |  77 ++----
 .../python/processor/FlowFileTransformProxy.java   |   9 +-
 .../python/processor/PythonProcessorProxy.java     | 103 +++++++-
 .../python/processor/RecordTransformProxy.java     |   9 +-
 .../nifi-py4j-integration-tests/pom.xml            |  12 +-
 .../PythonControllerInteractionIT.java             | 294 +++++++--------------
 .../apache/nifi/python/DisabledPythonBridge.java   |   4 +-
 .../java/org/apache/nifi/python/PythonBridge.java  |   7 +-
 .../python/processor/PythonProcessorBridge.java    |  10 +-
 15 files changed, 309 insertions(+), 368 deletions(-)

diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java
index 14f7e31cb8..68ce2953bf 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.components;
 
-public interface AsyncLoadedProcessor {
+import org.apache.nifi.processor.Processor;
+
+public interface AsyncLoadedProcessor extends Processor {
     default boolean isLoaded() {
         return getState() == LoadState.FINISHED_LOADING;
     }
@@ -25,6 +27,8 @@ public interface AsyncLoadedProcessor {
     LoadState getState();
 
     enum LoadState {
+        INITIALIZING_ENVIRONMENT,
+
         DOWNLOADING_DEPENDENCIES,
 
         LOADING_PROCESSOR_CODE,
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index c9fc336381..0b6d3bbd72 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -878,6 +878,7 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
             if (component instanceof final AsyncLoadedProcessor 
asyncLoadedProcessor) {
                 if (!asyncLoadedProcessor.isLoaded()) {
                     final String explanation = switch 
(asyncLoadedProcessor.getState()) {
+                        case INITIALIZING_ENVIRONMENT -> "Initializing runtime 
environment for the Processor.";
                         case DEPENDENCY_DOWNLOAD_FAILED -> "Failed to download 
one or more Processor dependencies. See logs for additional details.";
                         case DOWNLOADING_DEPENDENCIES -> "In the process of 
downloading third-party dependencies required by the Processor.";
                         case LOADING_PROCESSOR_CODE -> "In the process of 
loading Processor code";
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
index 06bcd4dc70..8a49feaf92 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
@@ -22,7 +22,6 @@ import org.apache.nifi.python.BoundObjectCounts;
 import org.apache.nifi.python.PythonBridge;
 import org.apache.nifi.python.PythonBridgeInitializationContext;
 import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
 
 import java.io.IOException;
 import java.util.List;
@@ -97,7 +96,7 @@ public class ClassLoaderAwarePythonBridge implements 
PythonBridge {
     }
 
     @Override
-    public PythonProcessorBridge createProcessor(final String identifier, 
final String type, final String version, final boolean preferIsolatedProcess) {
+    public AsyncLoadedProcessor createProcessor(final String identifier, final 
String type, final String version, final boolean preferIsolatedProcess) {
         try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(classLoader)) {
             return delegate.createProcessor(identifier, type, version, 
preferIsolatedProcess);
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 43ee011ea2..13c7609896 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -16,16 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Proxy;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
@@ -77,8 +67,6 @@ import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.python.PythonBridge;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
-import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
 import org.apache.nifi.registry.flow.FlowRegistryClientNode;
@@ -94,6 +82,17 @@ import org.apache.nifi.validation.RuleViolationsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Proxy;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 public class ExtensionBuilder {
    private static final Logger logger = 
LoggerFactory.getLogger(ExtensionBuilder.class);
 
@@ -750,9 +749,9 @@ public class ExtensionBuilder {
 
            final Processor processor = processorComponent.getComponent();
 
-           final ProcessorInitializationContext initiContext = new 
StandardProcessorInitializationContext(identifier, 
processorComponent.getLogger(),
+           final ProcessorInitializationContext initContext = new 
StandardProcessorInitializationContext(identifier, 
processorComponent.getLogger(),
                    serviceProvider, nodeTypeProvider, kerberosConfig);
-           processor.initialize(initiContext);
+           processor.initialize(initContext);
 
            final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
            verifyControllerServiceReferences(processor, 
bundle.getClassLoader());
@@ -867,24 +866,14 @@ public class ExtensionBuilder {
 
            // TODO: This is a hack because there's a bug in the UI that causes 
it to not load extensions that don't have a `.` in the type.
            final String processorType = type.startsWith("python.") ? 
type.substring("python.".length()) : type;
-           final PythonProcessorBridge processorBridge = 
pythonBridge.createProcessor(identifier, processorType, 
bundleCoordinate.getVersion(), true);
-           final Processor processor = processorBridge.getProcessorProxy();
+           final Processor processor = 
pythonBridge.createProcessor(identifier, processorType, 
bundleCoordinate.getVersion(), true);
 
            final ComponentLog componentLog = new 
SimpleProcessLogger(identifier, processor, new StandardLoggingContext(null));
            final TerminationAwareLogger terminationAwareLogger = new 
TerminationAwareLogger(componentLog);
 
-           final PythonProcessorInitializationContext initContext = new 
PythonProcessorInitializationContext() {
-               @Override
-               public String getIdentifier() {
-                   return identifier;
-               }
-
-               @Override
-               public ComponentLog getLogger() {
-                   return terminationAwareLogger;
-               }
-           };
-           processorBridge.initialize(initContext);
+           final ProcessorInitializationContext initContext = new 
StandardProcessorInitializationContext(identifier, terminationAwareLogger,
+               serviceProvider, nodeTypeProvider, kerberosConfig);
+           processor.initialize(initContext);
 
            return new LoggableComponent<>(processor, bundleCoordinate, 
terminationAwareLogger);
        } finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
index 24058fd045..bed296e3c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
@@ -16,29 +16,6 @@
  */
 package org.apache.nifi.nar;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.authentication.LoginIdentityProvider;
 import org.apache.nifi.authorization.AccessPolicyProvider;
@@ -63,16 +40,12 @@ import org.apache.nifi.flowanalysis.FlowAnalysisRule;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.init.ConfigurableComponentInitializer;
 import org.apache.nifi.init.ConfigurableComponentInitializerFactory;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.mock.MockComponentLogger;
 import org.apache.nifi.nar.ExtensionDefinition.ExtensionRuntime;
 import org.apache.nifi.parameter.ParameterProvider;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.provenance.ProvenanceRepository;
 import org.apache.nifi.python.PythonBridge;
 import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
-import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingTask;
@@ -80,6 +53,30 @@ import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
 /**
  * Scans through the classpath to load all FlowFileProcessors, 
FlowFileComparators, and ReportingTasks using the service provider API and 
running through all classloaders (root, NARs).
  *
@@ -711,23 +708,7 @@ public class StandardExtensionDiscoveringManager 
implements ExtensionDiscovering
                 final String type = classType.startsWith(PYTHON_TYPE_PREFIX) ? 
classType.substring(PYTHON_TYPE_PREFIX.length()) : classType;
 
                 final String procId = "temp-component-" + type;
-                final PythonProcessorBridge processorBridge = 
pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), 
false);
-                tempComponent = processorBridge.getProcessorProxy();
-
-                final ComponentLog componentLog = new MockComponentLogger();
-                final PythonProcessorInitializationContext initContext = new 
PythonProcessorInitializationContext() {
-                    @Override
-                    public String getIdentifier() {
-                        return procId;
-                    }
-
-                    @Override
-                    public ComponentLog getLogger() {
-                        return componentLog;
-                    }
-                };
-
-                processorBridge.initialize(initContext);
+                tempComponent = pythonBridge.createProcessor(procId, type, 
bundleCoordinate.getVersion(), false);
             } else {
                 final Class<?> componentClass = Class.forName(classType, true, 
bundleClassLoader);
                 tempComponent = (ConfigurableComponent) 
componentClass.getDeclaredConstructor().newInstance();
diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
index faa1afdd2d..64392717c8 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.py4j;
 
+import org.apache.nifi.components.AsyncLoadedProcessor;
 import org.apache.nifi.python.BoundObjectCounts;
 import org.apache.nifi.python.ControllerServiceTypeLookup;
 import org.apache.nifi.python.PythonBridge;
@@ -24,8 +25,12 @@ import 
org.apache.nifi.python.PythonBridgeInitializationContext;
 import org.apache.nifi.python.PythonController;
 import org.apache.nifi.python.PythonProcessConfig;
 import org.apache.nifi.python.PythonProcessorDetails;
+import org.apache.nifi.python.processor.FlowFileTransform;
+import org.apache.nifi.python.processor.FlowFileTransformProxy;
 import org.apache.nifi.python.processor.PythonProcessorAdapter;
 import org.apache.nifi.python.processor.PythonProcessorBridge;
+import org.apache.nifi.python.processor.RecordTransform;
+import org.apache.nifi.python.processor.RecordTransformProxy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class StandardPythonBridge implements PythonBridge {
@@ -89,8 +95,7 @@ public class StandardPythonBridge implements PythonBridge {
         controllerProcess.getController().discoverExtensions(extensionsDirs, 
workDirPath);
     }
 
-    @Override
-    public PythonProcessorBridge createProcessor(final String identifier, 
final String type, final String version, final boolean preferIsolatedProcess) {
+    private PythonProcessorBridge createProcessorBridge(final String 
identifier, final String type, final String version, final boolean 
preferIsolatedProcess) {
         ensureStarted();
 
         logger.debug("Creating Python Processor of type {}", type);
@@ -127,6 +132,25 @@ public class StandardPythonBridge implements PythonBridge {
         return processorBridge;
     }
 
+    @Override
+    public AsyncLoadedProcessor createProcessor(final String identifier, final 
String type, final String version, final boolean preferIsolatedProcess) {
+        final PythonProcessorDetails processorDetails = 
getProcessorTypes().stream()
+            .filter(details -> details.getProcessorType().equals(type))
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("Unknown Python 
Processor type: " + type));
+
+        final String implementedInterface = processorDetails.getInterface();
+        final Supplier<PythonProcessorBridge> processorBridgeFactory = () -> 
createProcessorBridge(identifier, type, version, preferIsolatedProcess);
+
+        if (FlowFileTransform.class.getName().equals(implementedInterface)) {
+            return new FlowFileTransformProxy(type, processorBridgeFactory);
+        }
+        if (RecordTransform.class.getName().equals(implementedInterface)) {
+            return new RecordTransformProxy(type, processorBridgeFactory);
+        }
+        return null;
+    }
+
     @Override
     public synchronized void onProcessorRemoved(final String identifier, final 
String type, final String version) {
         final ExtensionId extensionId = new ExtensionId(type, version);
diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
index 7ae9cef081..ba9d95fe01 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
@@ -17,24 +17,16 @@
 
 package org.apache.nifi.py4j;
 
-import org.apache.nifi.processor.Processor;
 import org.apache.nifi.python.PythonController;
 import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.FlowFileTransform;
-import org.apache.nifi.python.processor.FlowFileTransformProxy;
 import org.apache.nifi.python.processor.PythonProcessorAdapter;
 import org.apache.nifi.python.processor.PythonProcessorBridge;
 import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
-import org.apache.nifi.python.processor.PythonProcessorProxy;
-import org.apache.nifi.python.processor.RecordTransform;
-import org.apache.nifi.python.processor.RecordTransformProxy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
 
 import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
 
@@ -45,7 +37,6 @@ public class StandardPythonProcessorBridge implements 
PythonProcessorBridge {
     private final ProcessorCreationWorkflow creationWorkflow;
     private final PythonProcessorDetails processorDetails;
     private volatile PythonProcessorAdapter adapter;
-    private final PythonProcessorProxy proxy;
     private final File workingDir;
     private final File moduleFile;
     private volatile long lastModified;
@@ -60,8 +51,6 @@ public class StandardPythonProcessorBridge implements 
PythonProcessorBridge {
         this.workingDir = builder.workDir;
         this.moduleFile = builder.moduleFile;
         this.lastModified = this.moduleFile.lastModified();
-
-        this.proxy = createProxy();
     }
 
     @Override
@@ -70,46 +59,35 @@ public class StandardPythonProcessorBridge implements 
PythonProcessorBridge {
     }
 
     @Override
-    public Future<Void> initialize(final PythonProcessorInitializationContext 
context) {
+    public void initialize(final PythonProcessorInitializationContext context) 
{
         this.initializationContext = context;
 
         final String threadName = "Initialize Python Processor %s 
(%s)".formatted(initializationContext.getIdentifier(), getProcessorType());
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-
-        Thread.ofVirtual().name(threadName).start(() -> 
initializePythonSide(future));
-        return future;
+        Thread.ofVirtual().name(threadName).start(this::initializePythonSide);
     }
 
     public LoadState getLoadState() {
         return loadState;
     }
 
-    private void initializePythonSide(final CompletableFuture<Void> future) {
+    private void initializePythonSide() {
         try {
-            try {
-                creationWorkflow.downloadDependencies();
-                loadState = LoadState.LOADING_PROCESSOR_CODE;
-            } catch (final Exception e) {
-                loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
-                throw e;
-            }
-
-            final PythonProcessorAdapter pythonProcessorAdapter;
-            try {
-                pythonProcessorAdapter = creationWorkflow.createProcessor();
-                pythonProcessorAdapter.initialize(initializationContext);
-                this.adapter = pythonProcessorAdapter;
-                this.proxy.onPythonSideInitialized(pythonProcessorAdapter);
-
-                loadState = LoadState.FINISHED_LOADING;
-            } catch (final Exception e) {
-                loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
-                throw e;
-            }
+            creationWorkflow.downloadDependencies();
+            loadState = LoadState.LOADING_PROCESSOR_CODE;
+        } catch (final Exception e) {
+            loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
+            throw e;
+        }
 
-            future.complete(null);
-        } catch (final Throwable t) {
-            future.completeExceptionally(t);
+        final PythonProcessorAdapter pythonProcessorAdapter;
+        try {
+            pythonProcessorAdapter = creationWorkflow.createProcessor();
+            pythonProcessorAdapter.initialize(initializationContext);
+            this.adapter = pythonProcessorAdapter;
+            loadState = LoadState.FINISHED_LOADING;
+        } catch (final Exception e) {
+            loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
+            throw e;
         }
     }
 
@@ -118,11 +96,6 @@ public class StandardPythonProcessorBridge implements 
PythonProcessorBridge {
         return processorDetails.getProcessorType();
     }
 
-    @Override
-    public Processor getProcessorProxy() {
-        return proxy;
-    }
-
     @Override
     public boolean reload() {
         if (moduleFile.lastModified() <= lastModified) {
@@ -131,24 +104,12 @@ public class StandardPythonProcessorBridge implements 
PythonProcessorBridge {
         }
 
         controller.reloadProcessor(getProcessorType(), 
processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
-        initializePythonSide(new CompletableFuture<>());
+        initializePythonSide();
         lastModified = moduleFile.lastModified();
 
         return true;
     }
 
-    private PythonProcessorProxy createProxy() {
-        final String implementedInterface = processorDetails.getInterface();
-        if (FlowFileTransform.class.getName().equals(implementedInterface)) {
-            return new FlowFileTransformProxy(this);
-        }
-        if (RecordTransform.class.getName().equals(implementedInterface)) {
-            return new RecordTransformProxy(this);
-        }
-
-        throw new IllegalArgumentException("Python Processor does not 
implement any of the valid interfaces. Interface implemented: " + 
implementedInterface);
-    }
-
 
     public static class Builder {
         private PythonController controller;
diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
index d7f46cda67..0af61d738a 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
@@ -29,21 +29,20 @@ import py4j.Py4JNetworkException;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class FlowFileTransformProxy extends PythonProcessorProxy {
 
-    private final PythonProcessorBridge bridge;
     private volatile FlowFileTransform transform;
 
-    public FlowFileTransformProxy(final PythonProcessorBridge bridge) {
-        super(bridge);
-        this.bridge = bridge;
+    public FlowFileTransformProxy(final String processorType, final 
Supplier<PythonProcessorBridge> bridgeFactory) {
+        super(processorType, bridgeFactory);
     }
 
-
     @OnScheduled
     public void setContext(final ProcessContext context) {
+        final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new 
IllegalStateException(this + " is not finished initializing"));
         final Optional<PythonProcessorAdapter> optionalAdapter = 
bridge.getProcessorAdapter();
         if (optionalAdapter.isEmpty()) {
             throw new IllegalStateException(this + " is not finished 
initializing");
diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
index dfcf5fd8c5..1b9383819e 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
@@ -26,8 +26,10 @@ import org.apache.nifi.components.AsyncLoadedProcessor;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 
 import java.util.Collection;
@@ -38,15 +40,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Supplier;
 
 @SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
 @SupportsSensitiveDynamicProperties
 public abstract class PythonProcessorProxy extends AbstractProcessor 
implements AsyncLoadedProcessor {
-    private final PythonProcessorBridge bridge;
+    private final String processorType;
+    private volatile PythonProcessorInitializationContext initContext;
+    private volatile PythonProcessorBridge bridge;
     private volatile Set<Relationship> cachedRelationships = null;
     private volatile List<PropertyDescriptor> cachedPropertyDescriptors = null;
     private volatile Map<String, PropertyDescriptor> cachedDynamicDescriptors 
= null;
-    private volatile boolean supportsDynamicProperties;
+    private volatile Boolean supportsDynamicProperties;
 
     protected static final Relationship REL_ORIGINAL = new 
Relationship.Builder()
         .name("original")
@@ -61,16 +66,55 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
         REL_ORIGINAL,
         REL_FAILURE);
 
-    public PythonProcessorProxy(final PythonProcessorBridge bridge) {
-        this.bridge = bridge;
+    public PythonProcessorProxy(final String processorType, final 
Supplier<PythonProcessorBridge> bridgeFactory) {
+        this.processorType = processorType;
+
+        Thread.ofVirtual().name("Initialize " + processorType).start(() -> {
+            this.bridge = bridgeFactory.get();
+
+            // If initialization context has already been set, initialize 
bridge.
+            final PythonProcessorInitializationContext pythonInitContext = 
initContext;
+            if (pythonInitContext != null) {
+                this.bridge.initialize(pythonInitContext);
+            }
+        });
     }
 
-    public void onPythonSideInitialized(final PythonProcessorAdapter adapter) {
-        supportsDynamicProperties = adapter.isDynamicPropertySupported();
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        final PythonProcessorInitializationContext initContext = new 
PythonProcessorInitializationContext() {
+            @Override
+            public String getIdentifier() {
+                return context.getIdentifier();
+            }
+
+            @Override
+            public ComponentLog getLogger() {
+                return context.getLogger();
+            }
+        };
+
+        this.initContext = initContext;
+
+        // If Bridge has already been set, initialize it.
+        final PythonProcessorBridge bridge = this.bridge;
+        if (bridge != null) {
+            bridge.initialize(initContext);
+        }
+    }
+
+    protected Optional<PythonProcessorBridge> getBridge() {
+        return Optional.ofNullable(this.bridge);
     }
 
     @Override
     public LoadState getState() {
+        if (bridge == null) {
+            return LoadState.INITIALIZING_ENVIRONMENT;
+        }
+
         return bridge.getLoadState();
     }
 
@@ -80,6 +124,10 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
             return this.cachedPropertyDescriptors;
         }
 
+        if (bridge == null) {
+            return Collections.emptyList();
+        }
+
         final Optional<PythonProcessorAdapter> optionalAdapter = 
bridge.getProcessorAdapter();
         if (optionalAdapter.isEmpty()) {
             // If we don't have the adapter yet, use whatever is cached, even 
if it's old, or an empty List if we have nothing cached.
@@ -99,6 +147,14 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
 
     @Override
     protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        if (bridge == null) {
+            return List.of(new ValidationResult.Builder()
+                .subject("Processor")
+                .explanation("Python environment is not yet initialized")
+                .valid(false)
+                .build());
+        }
+
         final LoadState loadState = bridge.getLoadState();
         if (loadState == LoadState.LOADING_PROCESSOR_CODE || loadState == 
LoadState.DOWNLOADING_DEPENDENCIES) {
             return List.of(new ValidationResult.Builder()
@@ -144,6 +200,10 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
             return cachedDynamicDescriptors.get(propertyDescriptorName);
         }
 
+        if (bridge == null) {
+            return null;
+        }
+
         try {
             final Optional<PythonProcessorAdapter> optionalAdapter = 
bridge.getProcessorAdapter();
             return optionalAdapter.map(adapter -> 
adapter.getSupportedDynamicPropertyDescriptor(propertyDescriptorName))
@@ -155,7 +215,18 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
     }
 
     protected boolean isSupportsDynamicPropertyDescriptor() {
-        return supportsDynamicProperties;
+        if (this.supportsDynamicProperties != null) {
+            return supportsDynamicProperties;
+        }
+
+        if (bridge == null) {
+            return false;
+        }
+
+        // Cache only a definitive answer; a missing adapter means "not known yet", so ask again next time.
+        final Optional<Boolean> supported = bridge.getProcessorAdapter().map(PythonProcessorAdapter::isDynamicPropertySupported);
+        supported.ifPresent(known -> supportsDynamicProperties = known);
+        return supported.orElse(false);
     }
 
     @OnScheduled
@@ -193,6 +264,10 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
     }
 
     private Set<Relationship> fetchRelationshipsFromPythonProcessor() {
+        if (bridge == null) {
+            return Collections.emptySet();
+        }
+
         Set<Relationship> processorRelationships;
         try {
             processorRelationships = bridge.getProcessorAdapter()
@@ -210,6 +285,10 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
+        if (bridge == null) {
+            throw new IllegalStateException("Processor is not yet 
initialized");
+        }
+
         reload();
         bridge.getProcessorAdapter()
             .orElseThrow(() -> new IllegalStateException("Processor has not 
finished initializing"))
@@ -218,6 +297,10 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
 
     @OnStopped
     public void onStopped(final ProcessContext context) {
+        if (bridge == null) {
+            throw new IllegalStateException("Processor is not yet 
initialized");
+        }
+
         bridge.getProcessorAdapter()
             .orElseThrow(() -> new IllegalStateException("Processor has not 
finished initializing"))
             .onStopped(context);
@@ -225,10 +308,14 @@ public abstract class PythonProcessorProxy extends 
AbstractProcessor implements
 
     @Override
     public String toString() {
-        return "PythonProcessor[type=" + bridge.getProcessorType() + ", id=" + 
getIdentifier() + "]";
+        return "PythonProcessor[type=" + processorType + ", id=" + 
getIdentifier() + "]";
     }
 
     private void reload() {
+        if (bridge == null) {
+            return;
+        }
+
         final boolean reloaded = bridge.reload();
         if (reloaded) {
             getLogger().info("Successfully reloaded Processor");
diff --git 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
index 93f140b052..ecae852962 100644
--- 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
+++ 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
@@ -56,10 +56,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Supplier;
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class RecordTransformProxy extends PythonProcessorProxy {
-    private final PythonProcessorBridge bridge;
     private volatile RecordTransform transform;
 
     static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
@@ -78,9 +78,8 @@ public class RecordTransformProxy extends 
PythonProcessorProxy {
         .build();
 
 
-    public RecordTransformProxy(final PythonProcessorBridge bridge) {
-        super(bridge);
-        this.bridge = bridge;
+    public RecordTransformProxy(final String processorType, final 
Supplier<PythonProcessorBridge> bridgeFactory) {
+        super(processorType, bridgeFactory);
     }
 
     @Override
@@ -94,6 +93,8 @@ public class RecordTransformProxy extends 
PythonProcessorProxy {
 
     @OnScheduled
     public void setProcessContext(final ProcessContext context) {
+        final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new 
IllegalStateException(this + " is not finished initializing"));
+
         final Optional<PythonProcessorAdapter> adapterOptional = 
bridge.getProcessorAdapter();
         if (adapterOptional.isEmpty()) {
             throw new IllegalStateException(this + " is not finished 
initializing");
diff --git 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml
index 188ad20cfc..2e272e6451 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml
@@ -82,7 +82,17 @@
             <artifactId>nifi-record-serialization-service-api</artifactId>
             <scope>test</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index 5660fb277d..30b7e9e1b4 100644
--- 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++ 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -17,34 +17,28 @@
 
 package org.apache.nifi.py4j;
 
+import org.apache.nifi.components.AsyncLoadedProcessor;
+import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.mock.MockComponentLogger;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.mock.MockProcessorInitializationContext;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.python.ControllerServiceTypeLookup;
 import org.apache.nifi.python.PythonBridge;
 import org.apache.nifi.python.PythonBridgeInitializationContext;
 import org.apache.nifi.python.PythonProcessConfig;
 import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.EmptyAttributeMap;
 import org.apache.nifi.python.processor.FlowFileTransformProxy;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
-import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
-import org.apache.nifi.python.processor.RecordTransform;
-import org.apache.nifi.python.processor.RecordTransformResult;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockPropertyValue;
 import org.apache.nifi.util.TestRunner;
@@ -54,7 +48,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -74,13 +67,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
@@ -166,7 +156,7 @@ public class PythonControllerInteractionIT {
         final List<PythonProcessorDetails> extensionDetails = 
bridge.getProcessorTypes();
         final List<String> types = extensionDetails.stream()
             .map(PythonProcessorDetails::getProcessorType)
-            .collect(Collectors.toList());
+            .toList();
 
         assertTrue(types.contains(PRETTY_PRINT_JSON));
         assertTrue(types.contains("ConvertCsvToExcel"));
@@ -186,10 +176,7 @@ public class PythonControllerInteractionIT {
         // Create a PrettyPrintJson Processor
         final byte[] jsonContent = 
Files.readAllBytes(Paths.get("src/test/resources/json/input/simple-person.json"));
         for (int i=0; i < 3; i++) {
-            final PythonProcessorBridge prettyPrintJson = 
createProcessor(PRETTY_PRINT_JSON);
-            assertNotNull(prettyPrintJson);
-
-            final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(prettyPrintJson);
+            final FlowFileTransformProxy wrapper = 
createFlowFileTransform(PRETTY_PRINT_JSON);
             final TestRunner runner = TestRunners.newTestRunner(wrapper);
 
             runner.enqueue(jsonContent);
@@ -203,11 +190,7 @@ public class PythonControllerInteractionIT {
     @Disabled("Just for manual testing...")
     public void runPrettyPrintJsonManyThreads() throws IOException {
         // Create a PrettyPrintJson Processor
-        final PythonProcessorBridge prettyPrintJson = 
createProcessor(PRETTY_PRINT_JSON);
-        assertNotNull(prettyPrintJson);
-
-        // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(prettyPrintJson);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform(PRETTY_PRINT_JSON);
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
 
         final int flowFileCount = 100_000;
@@ -226,12 +209,8 @@ public class PythonControllerInteractionIT {
 
     @Test
     public void testSimplePrettyPrint() throws IOException {
-        // Discover extensions so that they can be created
-        final PythonProcessorBridge prettyPrintJson = 
createProcessor(PRETTY_PRINT_JSON);
-        assertNotNull(prettyPrintJson);
-
         // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(prettyPrintJson);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform(PRETTY_PRINT_JSON);
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
         
runner.enqueue(Paths.get("src/test/resources/json/input/simple-person.json"));
         runner.setProperty("Indentation", "2");
@@ -250,8 +229,7 @@ public class PythonControllerInteractionIT {
 
     @Test
     public void testValidator() {
-        final PythonProcessorBridge prettyPrintJson = 
createProcessor(PRETTY_PRINT_JSON);
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(prettyPrintJson);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform(PRETTY_PRINT_JSON);
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
 
         runner.setProperty("Indentation", "-1");
@@ -273,11 +251,7 @@ public class PythonControllerInteractionIT {
     @Test
     public void testCsvToExcel() {
         // Create a PrettyPrintJson Processor
-        final PythonProcessorBridge csvToExcel = 
createProcessor("ConvertCsvToExcel");
-        assertNotNull(csvToExcel);
-
-        // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(csvToExcel);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform("ConvertCsvToExcel");
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
         runner.enqueue("name, number\nJohn Doe, 500");
 
@@ -289,11 +263,8 @@ public class PythonControllerInteractionIT {
 
     @Test
     public void testExpressionLanguageWithAttributes() {
-        final PythonProcessorBridge writeProperty = 
createProcessor("WritePropertyToFlowFile");
-        assertNotNull(writeProperty);
-
         // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(writeProperty);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform("WritePropertyToFlowFile");
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
         runner.setProperty("Message", "Hola Mundo");
         runner.enqueue("Hello World");
@@ -308,11 +279,7 @@ public class PythonControllerInteractionIT {
     @Test
     public void testPythonPackage() {
         // Create a WriteNumber Processor
-        final PythonProcessorBridge procBridge = 
createProcessor("WriteNumber");
-        assertNotNull(procBridge);
-
-        // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(procBridge);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform("WriteNumber");
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
         runner.enqueue("");
 
@@ -326,6 +293,14 @@ public class PythonControllerInteractionIT {
         assertTrue(resultNum <= 1000);
     }
 
+    private FlowFileTransformProxy createFlowFileTransform(final String type) {
+        final Processor processor = createProcessor(type);
+        assertNotNull(processor);
+
+        processor.initialize(new MockProcessorInitializationContext());
+        return (FlowFileTransformProxy) processor;
+    }
+
     @Test
     public void testImportRequirements() {
         // Discover extensions so that they can be created
@@ -340,12 +315,8 @@ public class PythonControllerInteractionIT {
         assertEquals(1, dependencies.size());
         assertEquals("numpy==1.25.0", dependencies.get(0));
 
-        // Create a PrettyPrintJson Processor
-        final PythonProcessorBridge writeNumPyVersion = 
createProcessor("WriteNumpyVersion");
-        assertNotNull(writeNumPyVersion);
-
         // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(writeNumPyVersion);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform("WriteNumpyVersion");
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
         runner.enqueue("Hello World");
 
@@ -359,13 +330,9 @@ public class PythonControllerInteractionIT {
 
     @Test
     public void testControllerService() throws InitializationException {
-        final PythonProcessorBridge processor = 
createProcessor("LookupAddress");
-        assertNotNull(processor);
-
-        controllerServiceMap.put("StringLookupService", 
TestLookupService.class);
-
         // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(processor);
+        controllerServiceMap.put("StringLookupService", 
TestLookupService.class);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform("LookupAddress");
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
         final StringLookupService lookupService = new 
TestLookupService((Collections.singletonMap("John Doe", "123 My Street")));
         runner.addControllerService("lookup", lookupService);
@@ -392,13 +359,8 @@ public class PythonControllerInteractionIT {
         // Ensure that we started with "Hello, World" because if the test is 
run multiple times, we may already be starting with the modified version
         replaceFileText(sourceFile, replacement, originalMessage);
 
-        // Create a PrettyPrintJson Processor
-        final PythonProcessorBridge processor = 
createProcessor("WriteMessage");
-        processor.initialize(createInitContext());
-        assertNotNull(processor);
-
         // Setup
-        final FlowFileTransformProxy wrapper = new 
FlowFileTransformProxy(processor);
+        final FlowFileTransformProxy wrapper = 
createFlowFileTransform("WriteMessage");
         final TestRunner runner = TestRunners.newTestRunner(wrapper);
         runner.enqueue("");
 
@@ -465,8 +427,7 @@ public class PythonControllerInteractionIT {
         assertEquals(1, v2Count);
 
         // Create a WriteMessage Processor, version 0.0.1-SNAPSHOT
-        final PythonProcessorBridge procV1 = createProcessor("WriteMessage");
-        final FlowFileTransformProxy wrapperV1 = new 
FlowFileTransformProxy(procV1);
+        final FlowFileTransformProxy wrapperV1 = 
createFlowFileTransform("WriteMessage");
         final TestRunner runnerV1 = TestRunners.newTestRunner(wrapperV1);
         runnerV1.enqueue("");
 
@@ -477,9 +438,8 @@ public class PythonControllerInteractionIT {
         
runnerV1.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello,
 World");
 
         // Create an instance of WriteMessage V2
-        final PythonProcessorBridge procV2 = createProcessor("WriteMessage", 
"0.0.2-SNAPSHOT");
-        final FlowFileTransformProxy wrapperV2 = new 
FlowFileTransformProxy(procV2);
-        final TestRunner runnerV2 = TestRunners.newTestRunner(wrapperV2);
+        final Processor procV2 = createProcessor("WriteMessage", 
"0.0.2-SNAPSHOT");
+        final TestRunner runnerV2 = TestRunners.newTestRunner(procV2);
         runnerV2.enqueue("");
 
         // Trigger the processor
@@ -490,40 +450,23 @@ public class PythonControllerInteractionIT {
     }
 
     @Test
-    public void testRecordTransformWithDynamicProperties() {
-        // Create a PrettyPrintJson Processor
-        final PythonProcessorBridge processor = 
createProcessor("SetRecordField");
-        assertNotNull(processor);
-
-        // Mock out ProcessContext to reflect that the processor should set 
the 'name' field to 'Jane Doe'
-        final PropertyDescriptor nameDescriptor = new 
PropertyDescriptor.Builder()
-            .name("name")
-            .dynamic(true)
-            .addValidator(Validator.VALID)
-            .build();
-        final PropertyDescriptor numberDescriptor = new 
PropertyDescriptor.Builder()
-            .name("number")
-            .dynamic(true)
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .build();
-
-        final Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
-        propertyMap.put(nameDescriptor, "Jane Doe");
-        propertyMap.put(numberDescriptor, "8");
-
-        final ProcessContext context = createContext(propertyMap);
+    public void testRecordTransformWithDynamicProperties() throws 
InitializationException {
+        // Create a SetRecordField Processor
+        final TestRunner runner = 
createRecordTransformRunner("SetRecordField");
+        runner.setProperty("name", "Jane Doe");
+        runner.setProperty("number", "8");
 
         // Create a Record to transform and transform it
         final String json = "[{ \"name\": \"John Doe\" }]";
-        final RecordSchema schema = createSimpleRecordSchema("name");
-        final RecordTransform recordTransform = (RecordTransform) 
processor.getProcessorAdapter().get().getProcessor();
-        recordTransform.setContext(context);
-        final RecordTransformResult result = 
recordTransform.transformRecord(json, schema, new EmptyAttributeMap()).get(0);
+        runner.enqueue(json);
+        runner.run();
+        runner.assertTransferCount("original", 1);
+        runner.assertTransferCount("success", 1);
 
         // Verify the results
-        assertEquals("success", result.getRelationship());
-        assertNull(result.getSchema());
-        assertEquals("{\"name\": \"Jane Doe\", \"number\": \"8\"}", 
result.getRecordJson());
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship("success").get(0);
+        out.assertContentEquals("""
+            [{"name":"Jane Doe","number":"8"}]""");
     }
 
     private ProcessContext createContext(final Map<PropertyDescriptor, String> 
propertyValues) {
@@ -543,72 +486,44 @@ public class PythonControllerInteractionIT {
         return context;
     }
 
-    @Test
-    public void testRecordTransformWithInnerRecord() {
-        // Create a PrettyPrintJson Processor
-        final PythonProcessorBridge processor = 
createProcessor("SetRecordField");
+    private TestRunner createRecordTransformRunner(final String type) throws 
InitializationException {
+        final Processor processor = createProcessor(type);
         assertNotNull(processor);
 
-        // Mock out ProcessContext to reflect that the processor should set 
the 'name' field to 'Jane Doe'
-        final PropertyDescriptor nameDescriptor = new 
PropertyDescriptor.Builder()
-            .name("name")
-            .dynamic(true)
-            .addValidator(Validator.VALID)
-            .build();
-
-        final Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
-        propertyMap.put(nameDescriptor, "Jane Doe");
-        final ProcessContext context = createContext(propertyMap);
-
-        // Create a Record to transform and transform it
-        final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\": 
\"John Doe\" }}]";
-        final RecordSchema recordSchema = createTwoLevelRecord().getSchema();
-        final RecordTransform recordTransform = (RecordTransform) 
processor.getProcessorAdapter().get().getProcessor();
-        recordTransform.setContext(context);
-        final RecordTransformResult result = 
recordTransform.transformRecord(json, recordSchema, new 
EmptyAttributeMap()).get(0);
+        final JsonTreeReader reader = new JsonTreeReader();
+        final JsonRecordSetWriter writer = new JsonRecordSetWriter();
 
-        // Verify the results
-        assertEquals("success", result.getRelationship());
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("reader", reader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(reader);
+        runner.enableControllerService(writer);
+        runner.setProperty("Record Reader", "reader");
+        runner.setProperty("Record Writer", "writer");
 
-        assertEquals("{\"name\": \"Jane Doe\", \"father\": {\"name\": \"John 
Doe\"}}", result.getRecordJson());
+        return runner;
     }
 
-
     @Test
-    public void testLogger() throws ExecutionException, InterruptedException {
-        bridge.discoverExtensions();
-
-        final String procId = createId();
-        // Create the Processor, but we do not use this.createProcessor() 
because we want to explicitly have access to the
-        // initialization context to inject in the logger that we want.
-        final PythonProcessorBridge logContentsBridge = 
bridge.createProcessor(procId, "LogContents", VERSION, true);
-
-        final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        final PythonProcessorInitializationContext initContext = new 
PythonProcessorInitializationContext() {
-            @Override
-            public String getIdentifier() {
-                return procId;
-            }
-
-            @Override
-            public ComponentLog getLogger() {
-                return logger;
-            }
-        };
+    public void testRecordTransformWithInnerRecord() throws 
InitializationException {
+        // Create a SetRecordField Processor
+        final TestRunner runner = 
createRecordTransformRunner("SetRecordField");
+        runner.setProperty("name", "Jane Doe");
 
-        logContentsBridge.initialize(initContext).get();
-
-        final TestRunner runner = 
TestRunners.newTestRunner(logContentsBridge.getProcessorProxy());
-        runner.enqueue("Hello World");
+        // Create a Record to transform and transform it
+        final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\": 
\"John Doe\" }}]";
+        runner.enqueue(json);
         runner.run();
 
-        runner.assertTransferCount("original", 1);
+        // Verify the results
         runner.assertTransferCount("success", 1);
-        final ArgumentCaptor<String> argumentCaptor = 
ArgumentCaptor.forClass(String.class);
-        Mockito.verify(logger).info(argumentCaptor.capture());
-        assertEquals("Hello World", argumentCaptor.getValue());
+        runner.assertTransferCount("original", 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship("success").get(0);
+        out.assertContentEquals("""
+            [{"name":"Jane Doe","father":{"name":"John Doe"}}]""");
     }
 
+
     private RecordSchema createSimpleRecordSchema(final String... fieldNames) {
         return createSimpleRecordSchema(Arrays.asList(fieldNames));
     }
@@ -623,30 +538,6 @@ public class PythonControllerInteractionIT {
         return schema;
     }
 
-    private Record createSimpleRecord(final Map<String, Object> values) {
-        final List<RecordField> recordFields = new ArrayList<>();
-        for (final Map.Entry<String, Object> entry : values.entrySet()) {
-            final DataType dataType = 
DataTypeUtils.inferDataType(entry.getValue(), 
RecordFieldType.STRING.getDataType());
-            recordFields.add(new RecordField(entry.getKey(), dataType, true));
-        }
-
-        final RecordSchema schema = new SimpleRecordSchema(recordFields);
-        return new MapRecord(schema, values);
-    }
-
-    private Record createTwoLevelRecord() {
-        final Map<String, Object> innerPersonValues = new HashMap<>();
-        innerPersonValues.put("name", "Jake Doe");
-        final Record innerPersonRecord = createSimpleRecord(innerPersonValues);
-
-        final Map<String, Object> outerPersonValues = new HashMap<>();
-        outerPersonValues.put("name", "John Doe");
-        outerPersonValues.put("father", innerPersonRecord);
-        final Record outerPersonRecord = createSimpleRecord(outerPersonValues);
-
-        return outerPersonRecord;
-    }
-
 
     public interface StringLookupService extends ControllerService {
         Optional<String> lookup(Map<String, String> coordinates);
@@ -670,37 +561,40 @@ public class PythonControllerInteractionIT {
         return UUID.randomUUID().toString();
     }
 
-    private PythonProcessorBridge createProcessor(final String type) {
+    private Processor createProcessor(final String type) {
         return createProcessor(type, VERSION);
     }
 
-    private PythonProcessorBridge createProcessor(final String type, final 
String version) {
+    private Processor createProcessor(final String type, final String version) 
{
         bridge.discoverExtensions();
-        final PythonProcessorBridge processor = 
bridge.createProcessor(createId(), type, version, true);
-        final Future<Void> future = processor.initialize(createInitContext());
-
-        try {
-            future.get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException(e.getCause());
-        }
+        final AsyncLoadedProcessor processor = 
bridge.createProcessor(createId(), type, version, true);
 
-        return processor;
-    }
+        final ProcessorInitializationContext initContext = new 
MockProcessorInitializationContext();
+        processor.initialize(initContext);
 
-    private PythonProcessorInitializationContext createInitContext() {
-        return new PythonProcessorInitializationContext() {
-            @Override
-            public String getIdentifier() {
-                return "unit-test-id";
+        final long maxInitTime = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(30L);
+        while (true) {
+            final LoadState state = processor.getState();
+            if (state == LoadState.FINISHED_LOADING) {
+                break;
+            }
+            if (state == LoadState.DEPENDENCY_DOWNLOAD_FAILED || state == 
LoadState.LOADING_PROCESSOR_CODE_FAILED) {
+                throw new RuntimeException("Failed to initialize processor of 
type %s version %s".formatted(type, version));
             }
 
-            @Override
-            public ComponentLog getLogger() {
-                return new MockComponentLogger();
+            if (System.currentTimeMillis() > maxInitTime) {
+                throw new RuntimeException("Timed out waiting for processor of 
type %s version %s to initialize".formatted(type, version));
             }
-        };
+
+            try {
+                Thread.sleep(10L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while initializing 
processor of type %s version %s".formatted(type, version), ie);
+            }
+        }
+        processor.initialize(new MockProcessorInitializationContext());
+        return processor;
     }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
index f33b62bc81..2911a84839 100644
--- 
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
+++ 
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
@@ -17,7 +17,7 @@
 
 package org.apache.nifi.python;
 
-import org.apache.nifi.python.processor.PythonProcessorBridge;
+import org.apache.nifi.components.AsyncLoadedProcessor;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -64,7 +64,7 @@ public class DisabledPythonBridge implements PythonBridge {
     }
 
     @Override
-    public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
+    public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
         throw new UnsupportedOperationException("Cannot create Processor of type " + type + " because Python extensions are disabled");
     }
 
diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
index dd15d75740..20d2c1f180 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
@@ -17,7 +17,7 @@
 
 package org.apache.nifi.python;
 
-import org.apache.nifi.python.processor.PythonProcessorBridge;
+import org.apache.nifi.components.AsyncLoadedProcessor;
 
 import java.io.IOException;
 import java.util.List;
@@ -88,8 +88,7 @@ public interface PythonBridge {
     void discoverExtensions();
 
     /**
-     * Creates a Processor with the given identifier, type, and version. Then returns a PythonProcessorBridge that provides access to all
-     * necessary information and objects for interacting with this Processor from the Java side.
+     * Creates a Processor with the given identifier, type, and version.
      *
      * @param identifier the Processor's identifier
      * @param type the Processor's type
@@ -97,7 +96,7 @@ public interface PythonBridge {
      * @param preferIsolatedProcess whether or not to prefer launching a Python Process that is isolated for just this one instance of the Processor
      * @return a PythonProcessorBridge that can be used for interacting with the Processor
      */
-    PythonProcessorBridge createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess);
+    AsyncLoadedProcessor createProcessor(String identifier, String type, String version, boolean preferIsolatedProcess);
 
     /**
      * A notification that the Processor with the given identifier, type, and version was removed from the flow. This triggers the bridge
diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java
index 7d08bdfae3..0a5b1af2c9 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java
@@ -18,10 +18,8 @@
 package org.apache.nifi.python.processor;
 
 import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
-import org.apache.nifi.processor.Processor;
 
 import java.util.Optional;
-import java.util.concurrent.Future;
 
 /**
  * A model object that is used to bridge the gap between what is necessary for the Framework to interact
@@ -34,12 +32,6 @@ public interface PythonProcessorBridge {
      */
     Optional<PythonProcessorAdapter> getProcessorAdapter();
 
-    /**
-     * @return a proxy for the actual Processor implementation that will trigger the appropriate method on the Python side, or an empty Optional
-     * if the processor/adapter have not yet been initialized
-     */
-    Processor getProcessorProxy();
-
     /**
      * @return the name of the Processor implementation. This will not contain a 'python.' prefix.
      */
@@ -57,7 +49,7 @@ public interface PythonProcessorBridge {
      * Initializes the Processor
      * @param context the initialization context
      */
-    Future<Void> initialize(PythonProcessorInitializationContext context);
+    void initialize(PythonProcessorInitializationContext context);
 
     /**
      * @return the current state of the Processor loading

Reply via email to