markap14 commented on code in PR #7003:
URL: https://github.com/apache/nifi/pull/7003#discussion_r1164316855


##########
nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py:
##########
@@ -0,0 +1,531 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import importlib
+import sys
+import importlib.util  # Note requires Python 3.4+
+import inspect
+import logging
+import subprocess
+import ast
+import pkgutil
+from pathlib import Path
+
+logger = logging.getLogger("org.apache.nifi.py4j.ExtensionManager")
+
+# A simple wrapper class to encompass a processor type and its version
+class ExtensionId:
+    def __init__(self, classname=None, version=None):
+        self.classname = classname
+        self.version = version
+
+    def __hash__(self):
+        return hash((self.classname, self.version))
+
+    def __eq__(self, other):
+        return (self.classname, self.version) == (other.classname, 
other.version)
+
+
+class ExtensionDetails:
+    class Java:
+        implements = ['org.apache.nifi.python.PythonProcessorDetails']
+
+    def __init__(self, gateway, type, version='Unknown', dependencies=None, 
source_location=None, package_name=None, description=None, tags=None):
+        self.gateway = gateway
+        if dependencies is None:
+            dependencies = []
+        if tags is None:
+            tags = []
+
+        self.type = type
+        self.version = version
+        self.dependencies = dependencies
+        self.source_location = source_location
+        self.package_name = package_name
+        self.description = description
+        self.tags = tags
+
+    def getProcessorType(self):
+        return self.type
+
+    def getProcessorVersion(self):
+        return self.version
+
+    def getSourceLocation(self):
+        return self.source_location
+
+    def getPyPiPackageName(self):
+        return self.package_name
+
+    def getDependencies(self):
+        list = self.gateway.jvm.java.util.ArrayList()
+        for dep in self.dependencies:
+            list.add(dep)
+
+        return list
+
+    def getCapabilityDescription(self):
+        return self.description
+
+    def getTags(self):
+        list = self.gateway.jvm.java.util.ArrayList()
+        for tag in self.tags:
+            list.add(tag)
+
+        return list
+
+
+
+
+class ExtensionManager:
+    """
+    ExtensionManager is responsible for discovery of extensions types and the 
lifecycle management of those extension types.
+    Discovery of extension types includes finding what extension types are 
available
+    (e.g., which Processor types exist on the system), as well as information 
about those extension types, such as
+    the extension's documentation (tags and capability description).
+
+    Lifecycle management includes determining the third-party dependencies 
that an extension has and ensuring that those
+    third-party dependencies have been imported.
+    """
+
+    processorInterfaces = 
['org.apache.nifi.python.processor.FlowFileTransform', 
'org.apache.nifi.python.processor.RecordTransform']
+    processor_details = {}
+    processor_class_by_name = {}
+    module_files_by_extension_type = {}
+    dependency_directories = {}
+
+    def __init__(self, gateway):
+        self.gateway = gateway
+
+
+    def getProcessorTypes(self):
+        """
+        :return: a list of Processor types that have been discovered by the 
#discoverExtensions method
+        """
+        return self.processor_details.values()
+
+    def getProcessorClass(self, type, version, work_dir):
+        """
+        Returns the Python class that can be used to instantiate a processor 
of the given type.
+        Additionally, it ensures that the required third-party dependencies 
are on the system path in order to ensure that
+        the necessary libraries are available to the Processor so that it can 
be instantiated and used.
+
+        :param type: the type of Processor
+        :param version: the version of the Processor
+        :param work_dir: the working directory for extensions
+        :return: the Python class that can be used to instantiate a Processor 
of the given type and version
+
+        :raises ValueError: if there is no known Processor with the given type 
and version
+        """
+        id = ExtensionId(classname=type, version=version)
+        if id in self.processor_class_by_name:
+            return self.processor_class_by_name[id]
+
+        if id not in self.module_files_by_extension_type:
+            raise ValueError('Invalid Processor Type: No module is known to 
contain Processor of type ' + type + ' version ' + version)
+        module_file = self.module_files_by_extension_type[id]
+
+        if id in self.processor_details:
+            extension_working_dir = os.path.join(work_dir, 'extensions', type, 
version)
+            sys.path.insert(0, extension_working_dir)
+
+        details = self.processor_details[id]
+        processor_class = self.__load_extension_module__(module_file, 
details.local_dependencies)
+        self.processor_class_by_name[id] = processor_class
+        return processor_class
+
+
+    def reload_processor(self, processor_type, version, work_dir):
+        """
+        Reloads the class definition for the given processor type. This is 
used in order to ensure that any changes that have
+        been made to the Processor are reloaded and will take effect.
+
+        :param processor_type: the type of the processor whose class 
definition should be reloaded
+        :param version: the version of the processor whose class definition 
should be reloaded
+        :param work_dir: the working directory
+        :return: the new class definition
+        """
+        id = ExtensionId(classname=processor_type, version=version)
+
+        # get the python module file that contains the specified processor
+        module_file = self.get_module_file(processor_type, version)
+
+        # Call load_extension to ensure that we load all necessary 
dependencies, in case they have changed
+        self.__gather_extension_details__(module_file, work_dir)
+
+        # Reload the processor class itself
+        details = self.processor_details[id]
+        processor_class = self.__load_extension_module__(module_file, 
details.local_dependencies)
+
+        # Update our cache so that when the processor is created again, the 
new class will be used
+        self.processor_class_by_name[id] = processor_class
+
+
+    def get_module_file(self, processor_type, version):
+        """
+        Returns the module file that contains the source for the given 
Processor type and version
+        :param processor_type: the Processor type
+        :param version: the version of the Processor
+        :return: the file that contains the source for the given Processor
+
+        :raises ValueError: if no Processor type is known for the given type 
and version
+        """
+        id = ExtensionId(processor_type, version)
+        if id not in self.module_files_by_extension_type:
+            raise ValueError('Invalid Processor Type: No module is known to 
contain Processor of type ' + processor_type + ' version ' + version)
+
+        return self.module_files_by_extension_type[id]
+
+
+    # Discover extensions using the 'prefix' method described in
+    # 
https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins/
+    def discoverExtensions(self, dirs, work_dir):
+        """
+        Discovers any extensions that are available in any of the given 
directories, as well as any extensions that are available
+        from PyPi repositories
+
+        :param dirs: the directories to check for any local extensions
+        :param work_dir: the working directory
+        """
+        self.__discover_local_extensions__(dirs, work_dir)
+        self.__discover_extensions_from_pypi__(work_dir)
+
+    def __discover_extensions_from_pypi__(self, work_dir):

Review Comment:
   Yeah, agreed. I had mistakenly believed that `__method__` was the standard 
convention for private methods, but it should just be `__method` (or `_method`, 
the usual non-public marker), with the `__method__` form reserved for Python's 
"magic" (dunder) methods. Will update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to