NIFI-2909 Adding per-instance class loading capability through @RequiresInstanceClassLoading annotation NIFI-1712 Applying per-instance class loading to HBaseClientService to allow specifying Phoenix Client JAR
-Refactoring the ClassLoading so that every processor, controller service, and reporting task gets an InstanceClassLoader with a parent of the NAR ClassLoader, and only components with @RequiresInstanceClassLoading will make a copy of the NAR ClassLoader resources, and addressing some review feedback This closes #1156 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1d05372 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1d05372 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1d05372 Branch: refs/heads/master Commit: d1d053725b72d91fbfca1a2e86691e9c9a1a3f2f Parents: 2f0d9a3 Author: Bryan Bende <bbe...@apache.org> Authored: Mon Oct 10 09:27:57 2016 -0400 Committer: Oleg Zhurakousky <o...@suitcase.io> Committed: Tue Nov 8 13:21:27 2016 -0500 ---------------------------------------------------------------------- .../behavior/RequiresInstanceClassLoading.java | 42 +++ .../nifi/components/PropertyDescriptor.java | 28 ++ .../org/apache/nifi/util/file/FileUtils.java | 1 + .../util/file/classloader/ClassLoaderUtils.java | 79 ++++-- .../file/classloader/TestClassLoaderUtils.java | 41 +++ .../src/main/asciidoc/developer-guide.adoc | 52 ++++ .../init/ControllerServiceInitializer.java | 9 +- .../init/ProcessorInitializer.java | 8 +- .../init/ReportingTaskingInitializer.java | 8 +- .../controller/AbstractConfiguredComponent.java | 209 +++++++++----- .../nifi/controller/ConfiguredComponent.java | 21 +- .../apache/nifi/controller/ProcessorNode.java | 9 +- .../apache/nifi/controller/FlowController.java | 60 ++-- .../controller/StandardFlowSynchronizer.java | 23 +- .../nifi/controller/StandardProcessorNode.java | 39 +-- .../reporting/AbstractReportingTaskNode.java | 33 +-- .../reporting/StandardReportingTaskNode.java | 11 +- .../scheduling/EventDrivenSchedulingAgent.java | 8 +- .../scheduling/StandardProcessScheduler.java | 6 +- .../service/ControllerServiceLoader.java | 15 +- .../service/StandardControllerServiceNode.java | 36 +-- .../StandardControllerServiceProvider.java | 16 +- .../tasks/ContinuallyRunConnectableTask.java | 4 +- .../tasks/ContinuallyRunProcessorTask.java | 2 +- .../controller/tasks/ReportingTaskWrapper.java | 4 +- .../nifi/groups/StandardProcessGroup.java | 8 +- .../controller/TestStandardProcessorNode.java | 275 ++++++++++++++++++- .../scheduling/TestProcessorLifecycle.java | 37 +-- .../TestStandardProcessScheduler.java | 18 +- .../TestStandardControllerServiceProvider.java | 76 ++--- .../service/util/TestControllerService.java | 2 +- .../ModifiesClasspathNoAnnotationProcessor.java | 50 ++++ .../processors/ModifiesClasspathProcessor.java | 50 ++++ .../org.apache.nifi.processor.Processor | 16 ++ .../TestClasspathResources/resource1.txt | 15 + .../TestClasspathResources/resource2.txt | 15 + .../TestClasspathResources/resource3.txt | 15 + .../src/test/resources/logback-test.xml | 7 +- .../org/apache/nifi/nar/ExtensionManager.java | 82 ++++++ .../apache/nifi/nar/InstanceClassLoader.java | 147 ++++++++++ .../java/org/apache/nifi/nar/NarCloseable.java | 20 +- .../nifi/web/controller/ControllerFacade.java | 2 +- .../dao/impl/StandardControllerServiceDAO.java | 10 +- .../nifi/web/dao/impl/StandardProcessorDAO.java | 10 +- .../web/dao/impl/StandardReportingTaskDAO.java | 10 +- .../nifi/controller/MonitorMemoryTest.java | 14 +- .../apache/nifi/hbase/HBaseClientService.java | 8 + .../nifi/hbase/HBase_1_1_2_ClientService.java | 5 +- 48 files changed, 1280 insertions(+), 376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java new file mode 100644 index 0000000..f7566a6 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a component can use to indicate that the framework should create a new ClassLoader + * for each instance of the component, copying all resources from the component's NARClassLoader to a + * new ClassLoader which will only be used by a given instance of the component. + * + * This annotation is typically used when a component has one or more PropertyDescriptors which set + * dynamicallyModifiesClasspath(boolean) to true. + * + * When this annotation is used it is important to note that each added instance of the component will increase + * the overall memory footprint more than that of a component without this annotation. + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface RequiresInstanceClassLoading { +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index 532a034..1299d3d 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -79,6 +79,11 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> * Language */ private final boolean expressionLanguageSupported; + /** + * indicates whether or not this property represents resources that should be added + * to the classpath for this instance of the component + */ + private final boolean dynamicallyModifiesClasspath; /** * the interface of the {@link ControllerService} that this Property refers @@ -102,6 +107,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> this.required = builder.required; this.sensitive = builder.sensitive; this.dynamic = builder.dynamic; + this.dynamicallyModifiesClasspath = builder.dynamicallyModifiesClasspath; this.expressionLanguageSupported = builder.expressionLanguageSupported; this.controllerServiceDefinition = builder.controllerServiceDefinition; this.validators = new ArrayList<>(builder.validators); @@ -232,6 +238,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> private boolean sensitive = false; private boolean expressionLanguageSupported = false; private boolean dynamic = false; + private boolean dynamicallyModifiesClasspath = false; private Class<? extends ControllerService> controllerServiceDefinition; private List<Validator> validators = new ArrayList<>(); @@ -244,6 +251,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> this.required = specDescriptor.required; this.sensitive = specDescriptor.sensitive; this.dynamic = specDescriptor.dynamic; + this.dynamicallyModifiesClasspath = specDescriptor.dynamicallyModifiesClasspath; this.expressionLanguageSupported = specDescriptor.expressionLanguageSupported; this.controllerServiceDefinition = specDescriptor.getControllerServiceDefinition(); this.validators = new ArrayList<>(specDescriptor.validators); @@ -332,6 +340,22 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> } /** + * Specifies that the value of this property represents one or more resources that the + * framework should add to the classpath of the given component. + * + * NOTE: If a component contains a PropertyDescriptor where dynamicallyModifiesClasspath is set to true, + * the component must also be annotated with @RequiresInstanceClassloading, otherwise the component will be + * considered invalid. + * + * @param dynamicallyModifiesClasspath whether or not this property should be used by the framework to modify the classpath + * @return the builder + */ + public Builder dynamicallyModifiesClasspath(final boolean dynamicallyModifiesClasspath) { + this.dynamicallyModifiesClasspath = dynamicallyModifiesClasspath; + return this; + } + + /** * @param values contrained set of values * @return the builder */ @@ -492,6 +516,10 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> return expressionLanguageSupported; } + public boolean isDynamicClasspathModifier() { + return dynamicallyModifiesClasspath; + } + public Class<? extends ControllerService> getControllerServiceDefinition() { return controllerServiceDefinition; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java index ff4da8e..960bc40 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java @@ -591,4 +591,5 @@ public class FileUtils { return digest.digest(); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java index bc6728c..318d0a7 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java @@ -17,6 +17,8 @@ package org.apache.nifi.util.file.classloader; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FilenameFilter; @@ -24,23 +26,54 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; -import java.util.stream.Collectors; +import java.util.Set; public class ClassLoaderUtils { - public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException { - // Split and trim the module path(s) - List<String> modules = (modulePath == null) - ? null - : Arrays.stream(modulePath.split(",")).filter(StringUtils::isNotBlank).map(String::trim).collect(Collectors.toList()); + static final Logger logger = LoggerFactory.getLogger(ClassLoaderUtils.class); - URL[] classpaths = getURLsForClasspath(modules, filenameFilter); + public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException { + URL[] classpaths = getURLsForClasspath(modulePath, filenameFilter, false); return createModuleClassLoader(classpaths, parentClassLoader); } - protected static URL[] getURLsForClasspath(List<String> modulePaths, FilenameFilter filenameFilter) throws MalformedURLException { + /** + * + * @param modulePath a module path to get URLs from, the module path may be a comma-separated list of paths + * @param filenameFilter a filter to apply when a module path is a directory and performs a listing, a null filter will return all matches + * @return an array of URL instances representing all of the modules resolved from processing modulePath + * @throws MalformedURLException if a module path does not exist + */ + public static URL[] getURLsForClasspath(String modulePath, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException { + return getURLsForClasspath(modulePath == null ? Collections.emptySet() : Collections.singleton(modulePath), filenameFilter, suppressExceptions); + } + + /** + * + * @param modulePaths one or modules paths to get URLs from, each module path may be a comma-separated list of paths + * @param filenameFilter a filter to apply when a module path is a directory and performs a listing, a null filter will return all matches + * @param suppressExceptions if true then all modules will attempt to be resolved even if some throw an exception, if false the first exception will be thrown + * @return an array of URL instances representing all of the modules resolved from processing modulePaths + * @throws MalformedURLException if a module path does not exist + */ + public static URL[] getURLsForClasspath(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException { + // use LinkedHashSet to maintain the ordering that the incoming paths are processed + Set<String> modules = new LinkedHashSet<>(); + if (modulePaths != null) { + modulePaths.stream() + .flatMap(path -> Arrays.stream(path.split(","))) + .filter(StringUtils::isNotBlank) + .map(String::trim) + .forEach(m -> modules.add(m)); + } + return toURLs(modules, filenameFilter, suppressExceptions); + } + + protected static URL[] toURLs(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException { List<URL> additionalClasspath = new LinkedList<>(); if (modulePaths != null) { for (String modulePathString : modulePaths) { @@ -52,23 +85,33 @@ public class ClassLoaderUtils { isUrl = false; } if (!isUrl) { - File modulePath = new File(modulePathString); + try { + File modulePath = new File(modulePathString); - if (modulePath.exists()) { + if (modulePath.exists()) { - additionalClasspath.add(modulePath.toURI().toURL()); + additionalClasspath.add(modulePath.toURI().toURL()); - if (modulePath.isDirectory()) { - File[] files = modulePath.listFiles(filenameFilter); + if (modulePath.isDirectory()) { + File[] files = modulePath.listFiles(filenameFilter); - if (files != null) { - for (File jarFile : files) { - additionalClasspath.add(jarFile.toURI().toURL()); + if (files != null) { + for (File classpathResource : files) { + if (classpathResource.isDirectory()) { + logger.warn("Recursive directories are not supported, skipping " + classpathResource.getAbsolutePath()); + } else { + additionalClasspath.add(classpathResource.toURI().toURL()); + } + } } } + } else { + throw new MalformedURLException("Path specified does not exist"); + } + } catch (MalformedURLException e) { + if (!suppressExceptions) { + throw e; } - } else { - throw new MalformedURLException("Path specified does not exist"); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java index d2826e3..ba85e07 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java @@ -18,9 +18,13 @@ package org.apache.nifi.util.file.classloader; import java.io.FilenameFilter; import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashSet; +import java.util.Set; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -79,6 +83,43 @@ public class TestClassLoaderUtils { assertNotNull(ClassLoaderUtils.getCustomClassLoader(jarFilePath, this.getClass().getClassLoader(), getJarFilenameFilter())); } + @Test + public void testGetURLsForClasspathWithDirectory() throws MalformedURLException { + final String jarFilePath = "src/test/resources/TestClassLoaderUtils"; + URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false); + assertEquals(2, urls.length); + } + + @Test + public void testGetURLsForClasspathWithSingleJAR() throws MalformedURLException { + final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar"; + URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false); + assertEquals(1, urls.length); + } + + @Test(expected = MalformedURLException.class) + public void testGetURLsForClasspathWithSomeNonExistentAndNoSuppression() throws MalformedURLException { + final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest.jar"; + ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false); + } + + @Test + public void testGetURLsForClasspathWithSomeNonExistentAndSuppression() throws MalformedURLException { + final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest.jar"; + URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, true); + assertEquals(1, urls.length); + } + + @Test + public void testGetURLsForClasspathWithSetAndSomeNonExistentAndSuppression() throws MalformedURLException { + final Set<String> modules = new HashSet<>(); + modules.add("src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest1.jar"); + modules.add("src/test/resources/TestClassLoaderUtils/FakeTest2.jar,src/test/resources/TestClassLoaderUtils/FakeTest3.jar"); + + URL[] urls = ClassLoaderUtils.getURLsForClasspath(modules, null, true); + assertEquals(1, urls.length); + } + protected FilenameFilter getJarFilenameFilter(){ return (dir, name) -> name != null && name.endsWith(".jar"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-docs/src/main/asciidoc/developer-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc index 299f510..195b4f1 100644 --- a/nifi-docs/src/main/asciidoc/developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc @@ -2269,6 +2269,58 @@ API artifacts into the same NAR is often acceptable. +[[per-instance-classloading]] +== Per-Instance ClassLoading + +A component developer may wish to add additional resources to the componentâs classpath at runtime. +For example, you may want to provide the location of a JDBC driver to a processor that interacts with a +relational database, thus allowing the processor to work with any driver rather than trying to bundle a +driver into the NAR. + +This may be accomplished by declaring one or more PropertyDescriptor instances with +`dynamicallyModifiesClasspath` set to true. For example: + + +[source,java] +---- +PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder() + .name("Extra Resources") + .description("The path to one or more resources to add to the classpath.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamicallyModifiesClasspath(true) + .build(); +---- + +When these properties are set on a component, the framework identifies all properties where +`dynamicallyModifiesClasspath` is set to true. For each of these properties, the framework +attempts to resolve filesystem resources from the value of the property. The value may be a +comma-separated list of one or more directories or files, where any paths that do not exist are +skipped. If the resource represents a directory, the directory is listed, and all of the files +in that directory are added to the classpath individually. + +Each property may impose further restrictions on the format of the value through the validators. +For example, using StandardValidators.FILE_EXISTS_VALIDATOR restricts the property to accepting a +single file. Using StandardValidators.NON_EMPTY_VALIDATOR allows any combination of comma-separated +files or directories. + +Resources are added to the instance ClassLoader by adding them to an inner ClassLoader that is always +checked first. Anytime the value of these properties change, the inner ClassLoader is closed and +re-created with the new resources. + +NiFi provides the `@RequiresInstanceClassLoading` annotation to further expand and isolate the libraries +available on a componentâs classpath. You can annotate a component with `@RequiresInstanceClassLoading` +to indicate that the instance ClassLoader for the component requires a copy of all the resources in the +component's NAR ClassLoader. When `@RequiresInstanceClassLoading` is not present, the +instance ClassLoader simply has it's parent ClassLoader set to the NAR ClassLoader, rather than +copying resources. + +Because @RequiresInstanceClassLoading copies resources from the NAR ClassLoader for each instance of the +component, use this capability judiciously. If ten instances of one component are created, all classes +from the component's NAR ClassLoader are loaded into memory ten times. This could eventually increase the +memory footprint significantly when enough instances of the component are created. + + == How to contribute to Apache NiFi http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java index c641afe..90c1e24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java @@ -19,6 +19,7 @@ package org.apache.nifi.documentation.init; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.documentation.ConfigurableComponentInitializer; import org.apache.nifi.documentation.mock.MockConfigurationContext; import org.apache.nifi.documentation.mock.MockControllerServiceInitializationContext; @@ -38,15 +39,15 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia @Override public void initialize(ConfigurableComponent component) throws InitializationException { ControllerService controllerService = (ControllerService) component; - - try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { - controllerService.initialize(new MockControllerServiceInitializationContext()); + ControllerServiceInitializationContext context = new MockControllerServiceInitializationContext(); + try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) { + controllerService.initialize(context); } } @Override public void teardown(ConfigurableComponent component) { - try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { ControllerService controllerService = (ControllerService) component; final ComponentLog logger = new MockComponentLogger(); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java index 221f9e5..ae28299 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java @@ -26,6 +26,7 @@ import org.apache.nifi.documentation.util.ReflectionUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; /** * Initializes a Procesor using a MockProcessorInitializationContext @@ -37,15 +38,16 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer { @Override public void initialize(ConfigurableComponent component) { Processor processor = (Processor) component; - try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { - processor.initialize(new MockProcessorInitializationContext()); + ProcessorInitializationContext initializationContext = new MockProcessorInitializationContext(); + try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), initializationContext.getIdentifier())) { + processor.initialize(initializationContext); } } @Override public void teardown(ConfigurableComponent component) { Processor processor = (Processor) component; - try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { final ComponentLog logger = new MockComponentLogger(); final MockProcessContext context = new MockProcessContext(); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java index 8233e2e..041ff3e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java @@ -25,6 +25,7 @@ import org.apache.nifi.documentation.mock.MockReportingInitializationContext; import org.apache.nifi.documentation.util.ReflectionUtils; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; /** @@ -37,15 +38,16 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial @Override public void initialize(ConfigurableComponent component) throws InitializationException { ReportingTask reportingTask = (ReportingTask) component; - try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { - reportingTask.initialize(new MockReportingInitializationContext()); + ReportingInitializationContext context = new MockReportingInitializationContext(); + try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) { + reportingTask.initialize(context); } } @Override public void teardown(ConfigurableComponent component) { ReportingTask reportingTask = (ReportingTask) component; - try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { final MockConfigurationContext context = new MockConfigurationContext(); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, reportingTask, new MockComponentLogger(), context); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 6460050..cc9404e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -16,10 +16,27 @@ */ package org.apache.nifi.controller; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.InstanceClassLoader; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; + +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -30,14 +47,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.nifi.components.ConfigurableComponent; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.nar.NarCloseable; - public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent { private final String id; @@ -48,13 +57,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone private final AtomicReference<String> annotationData = new AtomicReference<>(); private final String componentType; private final String componentCanonicalClass; + private final VariableRegistry variableRegistry; + private final ComponentLog logger; + private final Lock lock = new ReentrantLock(); private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>(); public AbstractConfiguredComponent(final ConfigurableComponent component, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass) { + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, + final ComponentLog logger) { this.id = id; this.component = component; this.validationContextFactory = validationContextFactory; @@ -62,6 +75,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone this.name = new AtomicReference<>(component.getClass().getSimpleName()); this.componentType = componentType; this.componentCanonicalClass = componentCanonicalClass; + this.variableRegistry = variableRegistry; + this.logger = logger; } @Override @@ -90,44 +105,69 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } @Override - public void setProperty(final String name, final String value) { - if (null == name || null == value) { - throw new IllegalArgumentException(); + public void setProperties(Map<String, String> properties) { + if (properties == null) { + return; } lock.lock(); try { verifyModifiable(); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { - final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), id)) { + final Set<String> modulePaths = new LinkedHashSet<>(); + for (final Map.Entry<String, String> entry : properties.entrySet()) { + if (entry.getKey() != null && entry.getValue() == null) { + removeProperty(entry.getKey()); + } else if (entry.getKey() != null) { + setProperty(entry.getKey(), entry.getValue()); + + // for any properties that dynamically modify the classpath, attempt to evaluate them for expression language + final PropertyDescriptor descriptor = component.getPropertyDescriptor(entry.getKey()); + if (descriptor.isDynamicClasspathModifier() && !StringUtils.isEmpty(entry.getValue())) { + final StandardPropertyValue propertyValue = new StandardPropertyValue(entry.getValue(), null, variableRegistry); + modulePaths.add(propertyValue.evaluateAttributeExpressions().getValue()); + } + } + } - final String oldValue = properties.put(descriptor, value); - if (!value.equals(oldValue)) { + processClasspathModifiers(modulePaths); + } + } finally { + lock.unlock(); + } + } - if (descriptor.getControllerServiceDefinition() != null) { - if (oldValue != null) { - final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue); - if (oldNode != null) { - oldNode.removeReference(this); - } - } + // Keep setProperty/removeProperty private so that all calls go through setProperties + private void setProperty(final String name, final String value) { + if (null == name || null == value) { + throw new IllegalArgumentException("Name or Value can not be null"); + } - final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value); - if (newNode != null) { - newNode.addReference(this); - } - } + final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); - try { - component.onPropertyModified(descriptor, oldValue, value); - } catch (final Exception e) { - // nothing really to do here... + final String oldValue = properties.put(descriptor, value); + if (!value.equals(oldValue)) { + + if (descriptor.getControllerServiceDefinition() != null) { + if (oldValue != null) { + final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue); + if (oldNode != null) { + oldNode.removeReference(this); } } + + final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value); + if (newNode != null) { + newNode.addReference(this); + } + } + + try { + component.onPropertyModified(descriptor, oldValue, value); + } catch (final Exception e) { + // nothing really to do here... } - } finally { - lock.unlock(); } } @@ -141,48 +181,74 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone * @return true if removed; false otherwise * @throws java.lang.IllegalArgumentException if the name is null */ - @Override - public boolean removeProperty(final String name) { + private boolean removeProperty(final String name) { if (null == name) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Name can not be null"); } - lock.lock(); - try { - verifyModifiable(); + final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); + String value = null; + if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { - final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); - String value = null; - if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) { - - if (descriptor.getControllerServiceDefinition() != null) { - if (value != null) { - final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value); - if (oldNode != null) { - oldNode.removeReference(this); - } - } + if (descriptor.getControllerServiceDefinition() != null) { + if (value != null) { + final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value); + if (oldNode != null) { + oldNode.removeReference(this); } + } + } - try { - component.onPropertyModified(descriptor, value, null); - } catch (final Exception e) { - // nothing really to do here... - } + try { + component.onPropertyModified(descriptor, value, null); + } catch (final Exception e) { + logger.error(e.getMessage(), e); + } + + return true; + } + + return false; + } - return true; + /** + * Adds all of the modules identified by the given module paths to the InstanceClassLoader for this component. + * + * @param modulePaths a list of module paths where each entry can be a comma-separated list of multiple module paths + */ + private void processClasspathModifiers(final Set<String> modulePaths) { + try { + final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true); + + if (logger.isDebugEnabled()) { + logger.debug("Adding {} resources to the classpath for {}", new Object[] {urls.length, name}); + for (URL url : urls) { + logger.debug(url.getFile()); } } - } finally { - lock.unlock(); + + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + if (!(classLoader instanceof InstanceClassLoader)) { + // Really shouldn't happen, but if we somehow got here and don't have an InstanceClassLoader then log a warning and move on + final String classLoaderName = classLoader == null ? "null" : classLoader.getClass().getName(); + if (logger.isWarnEnabled()) { + logger.warn(String.format("Unable to modify the classpath for %s, expected InstanceClassLoader, but found %s", name, classLoaderName)); + } + return; + } + + final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) classLoader; + instanceClassLoader.setInstanceResources(urls); + } catch (MalformedURLException e) { + // Shouldn't get here since we are suppressing errors + logger.warn("Error processing classpath resources", e); } - return false; } @Override public Map<PropertyDescriptor, String> getProperties() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { final List<PropertyDescriptor> supported = component.getPropertyDescriptors(); if (supported == null || supported.isEmpty()) { return Collections.unmodifiableMap(properties); @@ -226,35 +292,35 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone @Override public String toString() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { return component.toString(); } } @Override public Collection<ValidationResult> validate(final ValidationContext context) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { return component.validate(context); } } @Override public PropertyDescriptor getPropertyDescriptor(final String name) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { return component.getPropertyDescriptor(name); } } @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { component.onPropertyModified(descriptor, oldValue, newValue); } } @Override public List<PropertyDescriptor> getPropertyDescriptors() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { return component.getPropertyDescriptors(); } } @@ -286,7 +352,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) { validationResults = component.validate(validationContext); } @@ -327,4 +393,9 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone protected ValidationContextFactory getValidationContextFactory() { return this.validationContextFactory; } + + protected VariableRegistry getVariableRegistry() { + return this.variableRegistry; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java index f1ee11e..91119ec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java @@ -35,25 +35,7 @@ public interface ConfiguredComponent extends Authorizable { public void setAnnotationData(String data); - /** - * Sets the property with the given name to the given value - * - * @param name the name of the property to update - * @param value the value to update the property to - */ - public void setProperty(String name, String value); - - /** - * Removes the property and value for the given property name if a - * descriptor and value exists for the given name. If the property is - * optional its value might be reset to default or will be removed entirely - * if was a dynamic property. - * - * @param name the property to remove - * @return true if removed; false otherwise - * @throws java.lang.IllegalArgumentException if the name is null - */ - public boolean removeProperty(String name); + public void setProperties(Map<String, String> properties); public Map<PropertyDescriptor, String> getProperties(); @@ -75,4 +57,5 @@ public interface ConfiguredComponent extends Authorizable { * @return the Canonical Class Name of the component */ String getCanonicalClassName(); + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 0fe306c..08b4abe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -28,10 +28,12 @@ import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.controller.scheduling.SchedulingAgent; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.scheduling.SchedulingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +45,10 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen protected final AtomicReference<ScheduledState> scheduledState; public ProcessorNode(final Processor processor, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass) { - super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass); + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, + final ComponentLog logger) { + super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, logger); this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED); } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 89a4379..ba5ed36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -731,7 +731,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private void notifyComponentsConfigurationRestored() { for (final ProcessorNode procNode : getGroup(getRootGroupId()).findAllProcessors()) { final Processor processor = procNode.getProcessor(); - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor); } } @@ -739,7 +739,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final ControllerServiceNode serviceNode : getAllControllerServices()) { final ControllerService service = serviceNode.getControllerServiceImplementation(); - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service); } } @@ -747,7 +747,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final ReportingTaskNode taskNode : getAllReportingTasks()) { final ReportingTask task = taskNode.getReportingTask(); - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(task.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(task.getClass(), task.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task); } } @@ -1046,21 +1046,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R creationSuccessful = false; } + final ComponentLog logger = new SimpleProcessLogger(id, processor); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ProcessorNode procNode; if (creationSuccessful) { - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry, logger); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, logger); } final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); if (firstTimeAdded) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); } catch (final Exception e) { logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); @@ -1068,7 +1069,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } if (firstTimeAdded) { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), processor.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor()); } } @@ -1082,14 +1083,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { - final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); + final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type, identifier); final Class<?> rawClass; if (detectedClassLoaderForType == null) { // try to find from the current class loader rawClass = Class.forName(type); } else { // try to find from the registered classloader for that type - rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); + rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type, identifier)); } Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); @@ -1328,7 +1329,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // invoke any methods annotated with @OnShutdown on Controller Services for (final ControllerServiceNode serviceNode : getAllControllerServices()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) { final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext); } @@ -1337,7 +1338,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // invoke any methods annotated with @OnShutdown on Reporting Tasks for (final ReportingTaskNode taskNode : getAllReportingTasks()) { final ConfigurationContext configContext = taskNode.getConfigurationContext(); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext); } } @@ -1609,12 +1610,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) { final String serviceId = controllerServiceDTO.getId(); final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId); - - for (final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet()) { - if (entry.getValue() != null) { - serviceNode.setProperty(entry.getKey(), entry.getValue()); - } - } + serviceNode.setProperties(controllerServiceDTO.getProperties()); } // @@ -1728,11 +1724,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } if (config.getProperties() != null) { - for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) { - if (entry.getValue() != null) { - procNode.setProperty(entry.getKey(), entry.getValue()); - } - } + procNode.setProperties(config.getProperties()); } group.addProcessor(procNode); @@ -2826,7 +2818,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R boolean creationSuccessful = true; final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { - final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type); + final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type, id); final Class<?> rawClass; if (detectedClassLoader == null) { rawClass = Class.forName(type); @@ -2851,15 +2843,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + final ComponentLog logger = new SimpleProcessLogger(id, task); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ReportingTaskNode taskNode; if (creationSuccessful) { - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry, logger); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, logger); } taskNode.setName(task.getClass().getSimpleName()); @@ -2875,7 +2868,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie); } - try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask()); } catch (final Exception e) { @@ -2929,7 +2922,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R reportingTaskNode.verifyCanDelete(); - try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getReportingTask().getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext()); } @@ -2947,6 +2940,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } reportingTasks.remove(reportingTaskNode.getIdentifier()); + ExtensionManager.removeInstanceClassLoaderIfExists(reportingTaskNode.getIdentifier()); } @Override @@ -2966,7 +2960,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (firstTimeAdded) { final ControllerService service = serviceNode.getControllerServiceImplementation(); - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service); } } @@ -3085,7 +3079,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R service.verifyCanDelete(); - try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass(), service.getIdentifier())) { final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); } @@ -3106,6 +3100,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R rootControllerServices.remove(service.getIdentifier()); getStateManagerProvider().onComponentRemoved(service.getIdentifier()); + ExtensionManager.removeInstanceClassLoaderIfExists(service.getIdentifier()); + LOG.info("{} removed from Flow Controller", service, this); } @@ -3451,17 +3447,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; final ProcessGroup rootGroup = getGroup(getRootGroupId()); for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); } } for (final ControllerServiceNode serviceNode : getAllControllerServices()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); } } for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index eb9bcac..8cfb3f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -408,11 +408,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { .filter(e -> controllerServiceMapping.containsKey(e.getValue())) .collect(Collectors.toSet()); + final Map<String,String> controllerServiceProps = new HashMap<>(); + for (Map.Entry<PropertyDescriptor, String> propEntry : propertyDescriptors) { final PropertyDescriptor propertyDescriptor = propEntry.getKey(); final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue()); - reportingTask.setProperty(propertyDescriptor.getName(), clone.getIdentifier()); + controllerServiceProps.put(propertyDescriptor.getName(), clone.getIdentifier()); } + + reportingTask.setProperties(controllerServiceProps); } } } @@ -514,14 +518,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy())); reportingTask.setAnnotationData(dto.getAnnotationData()); - - for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { - if (entry.getValue() == null) { - reportingTask.removeProperty(entry.getKey()); - } else { - reportingTask.setProperty(entry.getKey(), entry.getValue()); - } - } + reportingTask.setProperties(dto.getProperties()); final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), @@ -922,13 +919,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { procNode.setAutoTerminatedRelationships(relationships); } - for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) { - if (entry.getValue() == null) { - procNode.removeProperty(entry.getKey()); - } else { - procNode.setProperty(entry.getKey(), entry.getValue()); - } - } + procNode.setProperties(config.getProperties()); final ScheduledState scheduledState = ScheduledState.valueOf(processorDTO.getState()); if (ScheduledState.RUNNING.equals(scheduledState)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 42790fd..5ff9f22 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -54,6 +54,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; @@ -135,19 +136,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable // ??????? NOT any more public StandardProcessorNode(final Processor processor, final String uuid, - final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, - final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties) { + final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, + final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties, + final VariableRegistry variableRegistry, final ComponentLog logger) { this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, - processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties); + processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties, variableRegistry, logger); } public StandardProcessorNode(final Processor processor, final String uuid, - final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, - final ControllerServiceProvider controllerServiceProvider, - final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties) { + final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, + final ControllerServiceProvider controllerServiceProvider, + final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties, + final VariableRegistry variableRegistry, final ComponentLog logger) { - super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass); + super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger); this.processor = processor; identifier = new AtomicReference<>(uuid); @@ -811,7 +814,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable Relationship returnRel = specRel; final Set<Relationship> relationships; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { relationships = processor.getRelationships(); } @@ -857,7 +860,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public Set<Relationship> getUndefinedRelationships() { final Set<Relationship> undefined = new HashSet<>(); final Set<Relationship> relationships; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { relationships = processor.getRelationships(); } @@ -913,7 +916,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { validationResults = getProcessor().validate(validationContext); } @@ -960,7 +963,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier()); final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { validationResults = getProcessor().validate(validationContext); } @@ -1036,14 +1039,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public Collection<Relationship> getRelationships() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { return getProcessor().getRelationships(); } } @Override public String toString() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { return getProcessor().toString(); } } @@ -1060,7 +1063,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { processor.onTrigger(context, sessionFactory); } } @@ -1240,7 +1243,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() { @Override public Void call() throws Exception { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); return null; } @@ -1250,7 +1253,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); } scheduledState.set(ScheduledState.STOPPED); @@ -1325,7 +1328,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable try { if (scheduleState.isScheduled()) { schedulingAgent.unschedule(StandardProcessorNode.this, scheduleState); - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); } } @@ -1334,7 +1337,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable // performing the lifecycle actions counts as 1 thread. final boolean allThreadsComplete = scheduleState.getActiveThreadCount() == 1; if (allThreadsComplete) { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); }