http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 21747cf..dc8056d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -33,6 +33,7 @@ import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -50,27 +51,26 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon private volatile String comment; private volatile ScheduledState scheduledState = ScheduledState.STOPPED; - protected final VariableRegistry variableRegistry; - public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, - final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, - final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry) { 
+ final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, + final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry, + final ComponentLog logger) { this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory, - reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName(),variableRegistry); + reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName(),variableRegistry, logger); } public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, - final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, - final ValidationContextFactory validationContextFactory, - final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) { + final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, + final ValidationContextFactory validationContextFactory, + final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, + final ComponentLog logger) { - super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass); + super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger); this.reportingTask = reportingTask; this.processScheduler = processScheduler; this.serviceLookup = controllerServiceProvider; - this.variableRegistry = variableRegistry; } @Override @@ -115,7 +115,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon @Override public ConfigurationContext getConfigurationContext() { - return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod(), variableRegistry); + return new StandardConfigurationContext(this, serviceLookup, 
getSchedulingPeriod(), getVariableRegistry()); } @Override @@ -135,17 +135,6 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon this.scheduledState = state; } - @Override - public void setProperty(final String name, final String value) { - super.setProperty(name, value); - } - - @Override - public boolean removeProperty(String name) { - return super.removeProperty(name); - } - - public boolean isDisabled() { return scheduledState == ScheduledState.DISABLED; }
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java index b57faa1..bb58577 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java @@ -24,6 +24,7 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingTask; @@ -34,15 +35,15 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final VariableRegistry variableRegistry) { - super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry); + final VariableRegistry variableRegistry, final ComponentLog logger) { + super(reportingTask, id, controller, processScheduler, 
validationContextFactory, variableRegistry, logger); this.flowController = controller; } public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final String componentType, final String canonicalClassName, VariableRegistry variableRegistry) { - super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,variableRegistry); + final String componentType, final String canonicalClassName, final VariableRegistry variableRegistry, final ComponentLog logger) { + super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,variableRegistry, logger); this.flowController = controller; } @@ -58,6 +59,6 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme @Override public ReportingContext getReportingContext() { - return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask(), variableRegistry); + return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask(), getVariableRegistry()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 860b4da..0c4972b 100644 
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -287,7 +287,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { } try { - try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getClass())) { + try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) { worker.onTrigger(processContext, sessionFactory); } catch (final ProcessException pe) { logger.error("{} failed to process session due to {}", worker, pe.toString()); @@ -305,7 +305,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { } } finally { if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext); } } @@ -328,7 +328,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { } try { - try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass())) { + try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass(), worker.getIdentifier())) { worker.onTrigger(processContext, sessionFactory); } catch (final ProcessException pe) { final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor()); @@ -347,7 +347,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { // if the processor is no longer scheduled to 
run and this is the last thread, // invoke the OnStopped methods if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass(), worker.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker.getProcessor(), processContext); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 305fad0..3cafbfe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -209,7 +209,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { return; } - try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext()); } @@ -262,7 +262,7 @@ public final class 
StandardProcessScheduler implements ProcessScheduler { scheduleState.setScheduled(false); try { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext); } } catch (final Exception e) { @@ -436,7 +436,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) { final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); - try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass(), connectable.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 8b3dcf4..1596c63 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -166,11 +166,11 @@ public class ControllerServiceLoader { clone.setComments(controllerService.getComments()); if (controllerService.getProperties() != null) { + Map<String,String> properties = new HashMap<>(); for (Map.Entry<PropertyDescriptor, String> propEntry : controllerService.getProperties().entrySet()) { - if (propEntry.getValue() != null) { - clone.setProperty(propEntry.getKey().getName(), propEntry.getValue()); - } + properties.put(propEntry.getKey().getName(), propEntry.getValue()); } + clone.setProperties(properties); } return clone; @@ -188,14 +188,7 @@ public class ControllerServiceLoader { private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) { final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); node.setAnnotationData(dto.getAnnotationData()); - - for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { - if (entry.getValue() == null) { - node.removeProperty(entry.getKey()); - } else { - node.setProperty(entry.getKey(), entry.getValue()); - } - } + node.setProperties(dto.getProperties()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 
58671d4..e5a3b83 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -32,6 +32,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.ReflectionUtils; @@ -61,7 +62,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final ControllerService proxedControllerService; private final ControllerService implementation; private final ControllerServiceProvider serviceProvider; - private final VariableRegistry variableRegistry; private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -76,22 +76,22 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final VariableRegistry variableRegistry) { + final VariableRegistry variableRegistry, final ComponentLog logger) { this(proxiedControllerService, implementation, id, validationContextFactory, serviceProvider, - implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName(), variableRegistry); + implementation.getClass().getSimpleName(), 
implementation.getClass().getCanonicalName(), variableRegistry, logger); } public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) { + final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, + final ComponentLog logger) { - super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass); + super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, logger); this.proxedControllerService = proxiedControllerService; this.implementation = implementation; this.serviceProvider = serviceProvider; this.active = new AtomicBoolean(); - this.variableRegistry = variableRegistry; } @@ -203,16 +203,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } @Override - public void setProperty(final String name, final String value) { - super.setProperty(name, value); - } - - @Override - public boolean removeProperty(String name) { - return super.removeProperty(name); - } - - @Override public void verifyCanDelete() { if (getState() != ControllerServiceState.DISABLED) { throw new IllegalStateException(implementation.getIdentifier() + " cannot be deleted because it is not disabled"); @@ -340,12 +330,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) { if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) { this.active.set(true); - final ConfigurationContext configContext = new StandardConfigurationContext(this, 
this.serviceProvider, null, variableRegistry); + final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry()); scheduler.execute(new Runnable() { @Override public void run() { try { - ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext); + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) { + ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext); + } boolean shouldEnable = false; synchronized (active) { shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED); @@ -367,7 +359,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i if (isActive()) { scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS); } else { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext); + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext); + } stateRef.set(ControllerServiceState.DISABLED); } } @@ -403,7 +397,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) { - final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, variableRegistry); + final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry()); scheduler.execute(new Runnable() { @Override public void run() { @@ -423,7 +417,7 
@@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i * */ private void invokeDisable(ConfigurationContext configContext) { - try { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext); } catch (Exception e) { final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index b4d7e26..e4937df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -131,7 +131,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); try { - final ClassLoader cl = ExtensionManager.getClassLoader(type); + final ClassLoader cl = ExtensionManager.getClassLoader(type, id); final Class<?> rawClass; try { @@ -165,7 +165,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi final boolean disabled = state != ControllerServiceState.ENABLED; // only allow method call if service state is ENABLED. if (disabled && !validDisabledMethods.contains(method)) { // Use nar class loader here because we are implicitly calling toString() on the original implementation. - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService.getIdentifier() + " because the Controller Service is disabled"); } catch (final Throwable e) { @@ -173,7 +173,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { return method.invoke(originalService, args); } catch (final InvocationTargetException e) { // If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. 
We want @@ -194,14 +194,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService); originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this, getStateManager(id), nifiProperties)); + final ComponentLog logger = new SimpleProcessLogger(id, originalService); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, variableRegistry); - final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this, variableRegistry); + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this, variableRegistry, logger); serviceNodeHolder.set(serviceNode); serviceNode.setName(rawClass.getSimpleName()); if (firstTimeAdded) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(originalService.getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService); } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e); @@ -264,8 +265,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; + final ComponentLog logger = new SimpleProcessLogger(id, proxiedService); + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id, - new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry); + new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, logger); return serviceNode; } @@ -585,6 +588,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } group.removeControllerService(serviceNode); + ExtensionManager.removeInstanceClassLoaderIfExists(serviceNode.getIdentifier()); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index b5d6a4d..b856f11 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@ -76,7 +76,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { if (shouldRun) { scheduleState.incrementActiveThreadCount(); try { - try (final AutoCloseable ncl = 
NarCloseable.withComponentNarLoader(connectable.getClass())) { + try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(connectable.getClass(), connectable.getIdentifier())) { connectable.onTrigger(processContext, sessionFactory); } catch (final ProcessException pe) { logger.error("{} failed to process session due to {}", connectable, pe.toString()); @@ -93,7 +93,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { } } finally { if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass(), connectable.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index f3e8474..3bc2356 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -130,7 +130,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { final long finishNanos = startNanos + 
batchNanos; int invocationCount = 0; try { - try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) { + try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())) { boolean shouldRun = true; while (shouldRun) { procNode.onTrigger(processContext, sessionFactory); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java index ea93db1..5f14d19 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java @@ -37,7 +37,7 @@ public class ReportingTaskWrapper implements Runnable { @Override public synchronized void run() { scheduleState.incrementActiveThreadCount(); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) { taskNode.getReportingTask().onTrigger(taskNode.getReportingContext()); } catch (final Throwable t) { final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask()); @@ -50,7 +50,7 @@ public class ReportingTaskWrapper 
implements Runnable { // if the reporting task is no longer scheduled to run and this is the last thread, // invoke the OnStopped methods if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 6f0dd84..fcfe838 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -53,6 +53,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.logging.LogRepositoryFactory; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.VariableRegistry; @@ -349,7 +350,7 @@ public 
final class StandardProcessGroup implements ProcessGroup { private void shutdown(final ProcessGroup procGroup) { for (final ProcessorNode node : procGroup.getProcessors()) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass(), node.getIdentifier())) { final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext); } @@ -708,7 +709,7 @@ public final class StandardProcessGroup implements ProcessGroup { conn.verifyCanDelete(); } - try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass(), processor.getIdentifier())) { final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { @@ -745,6 +746,7 @@ public final class StandardProcessGroup implements ProcessGroup { removeConnection(conn); } + ExtensionManager.removeInstanceClassLoaderIfExists(id); LOG.info("{} removed from flow", processor); } finally { writeLock.unlock(); @@ -1847,7 +1849,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.verifyCanDelete(); - try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass(), service.getIdentifier())) { 
final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index f6dc88e..60ff887 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -17,16 +17,6 @@ package org.apache.nifi.controller; -import static org.junit.Assert.assertEquals; - -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; - import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; @@ -35,26 +25,72 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.expression.ExpressionLanguageCompiler; +import org.apache.nifi.logging.ComponentLog; +import 
org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.InstanceClassLoader; +import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.StandardProcessContext; +import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.test.processors.ModifiesClasspathNoAnnotationProcessor; +import org.apache.nifi.test.processors.ModifiesClasspathProcessor; import org.apache.nifi.util.MockPropertyValue; +import org.apache.nifi.util.MockVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestStandardProcessorNode { + private MockVariableRegistry variableRegistry; + + @Before + public void setup() { + variableRegistry = new MockVariableRegistry(); + } + @Test(timeout = 10000) public void testStart() throws InterruptedException 
{ System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessorNode.class.getResource("/conf/nifi.properties").getFile()); final ProcessorThatThrowsExceptionOnScheduled processor = new ProcessorThatThrowsExceptionOnScheduled(); final String uuid = UUID.randomUUID().toString(); - final StandardProcessorNode procNode = new StandardProcessorNode(processor, uuid, createValidationContextFactory(), null, null, NiFiProperties.createBasicNiFiProperties(null, null)); - final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestStandardProcessorNode", true); + ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, null, null, null, null); + processor.initialize(initContext); + + final StandardProcessorNode procNode = new StandardProcessorNode(processor, uuid, createValidationContextFactory(), null, null, + NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY, Mockito.mock(ComponentLog.class)); + final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestClasspathResources", true); final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, null); final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() { @@ -81,6 +117,220 @@ public class TestStandardProcessorNode { assertEquals(1, processor.onStoppedCount); } + @Test + public void testSinglePropertyDynamicallyModifiesClasspath() throws MalformedURLException { + final PropertyDescriptor classpathProp = new PropertyDescriptor.Builder().name("Classpath Resources") + .dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp)); + final StandardProcessorNode procNode = createProcessorNode(processor); + + final Set<ClassLoader> classLoaders = new HashSet<>(); + 
classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); + + // Load all of the extensions in src/test/java of this project + ExtensionManager.discoverExtensions(classLoaders); + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + // Should have an InstanceClassLoader here + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + assertTrue(contextClassLoader instanceof InstanceClassLoader); + + final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) contextClassLoader; + + // Should not have any of the test resources loaded at this point + final URL[] testResources = getTestResources(); + for (URL testResource : testResources) { + if (containsResource(instanceClassLoader.getInstanceResources(), testResource)) { + fail("found resource that should not have been loaded"); + } + } + + // Simulate setting the properties of the processor to point to the test resources directory + final Map<String, String> properties = new HashMap<>(); + properties.put(classpathProp.getName(), "src/test/resources/TestClasspathResources"); + procNode.setProperties(properties); + + // Should have all of the resources loaded into the InstanceClassLoader now + for (URL testResource : testResources) { + assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResource)); + } + + // Should pass validation + assertTrue(procNode.isValid()); + } finally { + ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier()); + } + } + + @Test + public void testMultiplePropertiesDynamicallyModifyClasspathWithExpressionLanguage() throws MalformedURLException { + final PropertyDescriptor classpathProp1 = new PropertyDescriptor.Builder().name("Classpath Resource 1") + .dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + final PropertyDescriptor classpathProp2 = new 
PropertyDescriptor.Builder().name("Classpath Resource 2") + .dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2)); + final StandardProcessorNode procNode = createProcessorNode(processor); + + final Set<ClassLoader> classLoaders = new HashSet<>(); + classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); + + // Load all of the extensions in src/test/java of this project + ExtensionManager.discoverExtensions(classLoaders); + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + // Should have an InstanceClassLoader here + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + assertTrue(contextClassLoader instanceof InstanceClassLoader); + + final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) contextClassLoader; + + // Should not have any of the test resources loaded at this point + final URL[] testResources = getTestResources(); + for (URL testResource : testResources) { + if (containsResource(instanceClassLoader.getInstanceResources(), testResource)) { + fail("found resource that should not have been loaded"); + } + } + + // Simulate setting the properties pointing to two of the resources + final Map<String, String> properties = new HashMap<>(); + properties.put(classpathProp1.getName(), "src/test/resources/TestClasspathResources/resource1.txt"); + properties.put(classpathProp2.getName(), "src/test/resources/TestClasspathResources/${myResource}"); + + variableRegistry.setVariable(new VariableDescriptor("myResource"), "resource3.txt"); + + procNode.setProperties(properties); + + // Should have resources 1 and 3 loaded into the InstanceClassLoader now + assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResources[0])); + 
assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResources[2])); + assertFalse(containsResource(instanceClassLoader.getInstanceResources(), testResources[1])); + + // Should pass validation + assertTrue(procNode.isValid()); + } finally { + ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier()); + } + } + + @Test + public void testSomeNonExistentPropertiesDynamicallyModifyClasspath() throws MalformedURLException { + final PropertyDescriptor classpathProp1 = new PropertyDescriptor.Builder().name("Classpath Resource 1") + .dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + final PropertyDescriptor classpathProp2 = new PropertyDescriptor.Builder().name("Classpath Resource 2") + .dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2)); + final StandardProcessorNode procNode = createProcessorNode(processor); + + final Set<ClassLoader> classLoaders = new HashSet<>(); + classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); + + // Load all of the extensions in src/test/java of this project + ExtensionManager.discoverExtensions(classLoaders); + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + // Should have an InstanceClassLoader here + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + assertTrue(contextClassLoader instanceof InstanceClassLoader); + + final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) contextClassLoader; + + // Should not have any of the test resources loaded at this point + final URL[] testResources = getTestResources(); + for (URL testResource : testResources) { + if (containsResource(instanceClassLoader.getInstanceResources(), 
testResource)) { + fail("found resource that should not have been loaded"); + } + } + + // Simulate setting the properties pointing to one existing resource and one that does not exist + final Map<String, String> properties = new HashMap<>(); + properties.put(classpathProp1.getName(), "src/test/resources/TestClasspathResources/resource1.txt"); + properties.put(classpathProp2.getName(), "src/test/resources/TestClasspathResources/DoesNotExist.txt"); + procNode.setProperties(properties); + + // Should have only resource 1 loaded into the InstanceClassLoader now + assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResources[0])); + assertFalse(containsResource(instanceClassLoader.getInstanceResources(), testResources[1])); + assertFalse(containsResource(instanceClassLoader.getInstanceResources(), testResources[2])); + + // Should pass validation + assertTrue(procNode.isValid()); + } finally { + ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier()); + } + } + + @Test + public void testPropertyModifiesClasspathWhenProcessorMissingAnnotation() throws MalformedURLException { + final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor(); + final StandardProcessorNode procNode = createProcessorNode(processor); + + final Set<ClassLoader> classLoaders = new HashSet<>(); + classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); + + // Load all of the extensions in src/test/java of this project + ExtensionManager.discoverExtensions(classLoaders); + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + // Can't validate the ClassLoader here b/c the class is missing the annotation + + // Simulate setting the property pointing to one of the resources + final Map<String, String> properties = new HashMap<>(); + properties.put(ModifiesClasspathNoAnnotationProcessor.CLASSPATH_RESOURCE.getName(), + 
"src/test/resources/TestClasspathResources/resource1.txt"); + procNode.setProperties(properties); + + // Should not have loaded any of the resources + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + assertTrue(classLoader instanceof URLClassLoader); + + final URL[] testResources = getTestResources(); + final URLClassLoader urlClassLoader = (URLClassLoader) classLoader; + assertFalse(containsResource(urlClassLoader.getURLs(), testResources[0])); + assertFalse(containsResource(urlClassLoader.getURLs(), testResources[1])); + assertFalse(containsResource(urlClassLoader.getURLs(), testResources[2])); + + // Should pass validation + assertTrue(procNode.isValid()); + + } finally { + ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier()); + } + } + + private StandardProcessorNode createProcessorNode(Processor processor) { + final String uuid = UUID.randomUUID().toString(); + final ValidationContextFactory validationContextFactory = createValidationContextFactory(); + final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null); + final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); + final ComponentLog componentLog = Mockito.mock(ComponentLog.class); + + ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, componentLog, null, null, null); + processor.initialize(initContext); + + return new StandardProcessorNode(processor, uuid, validationContextFactory, processScheduler, null, + niFiProperties, variableRegistry, componentLog); + } + + private boolean containsResource(URL[] resources, URL resourceToFind) { + for (URL resource : resources) { + if (resourceToFind.getPath().equals(resource.getPath())) { + return true; + } + } + return false; + } + + private URL[] getTestResources() throws MalformedURLException { + URL resource1 = new File("src/test/resources/TestClasspathResources/resource1.txt").toURI().toURL(); + URL 
resource2 = new File("src/test/resources/TestClasspathResources/resource2.txt").toURI().toURL(); + URL resource3 = new File("src/test/resources/TestClasspathResources/resource3.txt").toURI().toURL(); + return new URL[] { resource1, resource2, resource3 }; + } + private ValidationContextFactory createValidationContextFactory() { return new ValidationContextFactory() { @@ -180,4 +430,5 @@ public class TestStandardProcessorNode { onStoppedCount++; } } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index e1b24a3..46d96be 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -83,10 +83,12 @@ public class TestProcessorLifecycle { private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); private FlowController fc; + private Map<String,String> properties = new HashMap<>(); @Before public void before() throws Exception { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestProcessorLifecycle.class.getResource("/nifi.properties").getFile()); + properties.put("P", "hello"); } @After @@ -124,7 +126,7 @@ public class TestProcessorLifecycle { this.setControllerRootGroup(fc, testGroup); final ProcessorNode 
testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); // validates idempotency @@ -149,7 +151,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -175,7 +177,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); // sets the scenario for the processor to run @@ -198,7 +200,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -241,7 +243,7 @@ public class TestProcessorLifecycle { ProcessGroup 
testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -297,7 +299,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -333,7 +335,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -365,7 +367,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -396,7 +398,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run this.blockingInterruptableOnUnschedule(testProcessor); @@ -424,7 +426,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run this.blockingUninterruptableOnUnschedule(testProcessor); @@ -457,7 +459,7 @@ public class TestProcessorLifecycle { ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -504,8 +506,8 @@ public class TestProcessorLifecycle { ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv", true); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNode.setProperty("P", "hello"); - testProcNode.setProperty("S", testServiceNode.getIdentifier()); + properties.put("S", testServiceNode.getIdentifier()); + testProcNode.setProperties(properties); 
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); testProcessor.withService = true; @@ -529,8 +531,9 @@ public class TestProcessorLifecycle { ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); testGroup.addProcessor(testProcNode); - testProcNode.setProperty("P", "hello"); - testProcNode.setProperty("S", testServiceNode.getIdentifier()); + + properties.put("S", testServiceNode.getIdentifier()); + testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); testProcessor.withService = true; @@ -556,11 +559,11 @@ public class TestProcessorLifecycle { this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNodeA.setProperty("P", "hello"); + testProcNodeA.setProperties(properties); testGroup.addProcessor(testProcNodeA); ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); - testProcNodeB.setProperty("P", "hello"); + testProcNodeB.setProperties(properties); testGroup.addProcessor(testProcNodeB); Collection<String> relationNames = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index e8185cb..ee2b103 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -21,7 +21,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -57,6 +59,7 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.registry.VariableRegistry; @@ -95,7 +98,8 @@ public class TestStandardProcessScheduler { reportingTask.initialize(config); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry); - taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry); + final ComponentLog logger = Mockito.mock(ComponentLog.class); + taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, logger); controller = Mockito.mock(FlowController.class); rootGroup = new MockProcessGroup(); @@ -129,18 +133,24 @@ public class TestStandardProcessScheduler { @Test(timeout = 60000) public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException { + final String uuid = 
UUID.randomUUID().toString(); final Processor proc = new ServiceReferencingProcessor(); + proc.initialize(new StandardProcessorInitializationContext(uuid, null, null, null, null)); final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null, Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties); final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true); rootGroup.addControllerService(service); - final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(), - new StandardValidationContextFactory(serviceProvider, variableRegistry), scheduler, serviceProvider, nifiProperties); + final ProcessorNode procNode = new StandardProcessorNode(proc, uuid, + new StandardValidationContextFactory(serviceProvider, variableRegistry), + scheduler, serviceProvider, nifiProperties, VariableRegistry.EMPTY_REGISTRY, + Mockito.mock(ComponentLog.class)); rootGroup.addProcessor(procNode); - procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); + Map<String,String> procProps = new HashMap<>(); + procProps.put(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); + procNode.setProperties(procProps); scheduler.enableControllerService(service); scheduler.startProcessor(procNode); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index f0e1566..0b338fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; + import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.FlowController; @@ -44,6 +45,7 @@ import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.controller.service.mock.ServiceC; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.StandardProcessGroup; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.NiFiProperties; @@ -88,6 +90,12 @@ public class TestStandardControllerServiceProvider { return new StandardProcessScheduler(null, null, stateManagerProvider, variableRegistry, NiFiProperties.createBasicNiFiProperties(null, null)); } + private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) { + Map<String,String> props = new LinkedHashMap<>(); + props.put(propName, propValue); + serviceNode.setProperties(props); + } + @Test public void testDisableControllerService() { final ProcessGroup procGroup = new MockProcessGroup(); @@ -118,7 +126,7 @@ public class TestStandardControllerServiceProvider { group.addControllerService(serviceNodeA); group.addControllerService(serviceNodeB); - 
serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B"); + setProperty(serviceNodeA, ServiceA.OTHER_SERVICE.getName(), "B"); try { provider.enableControllerService(serviceNodeA); @@ -158,7 +166,7 @@ public class TestStandardControllerServiceProvider { * https://issues.apache.org/jira/browse/NIFI-1143 */ @Test(timeout = 60000) - public void testConcurrencyWithEnablingReferencingServicesGraph() { + public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException { final ProcessScheduler scheduler = createScheduler(); for (int i = 0; i < 10000; i++) { testEnableReferencingServicesGraph(scheduler); @@ -195,10 +203,10 @@ public class TestStandardControllerServiceProvider { procGroup.addControllerService(serviceNode3); procGroup.addControllerService(serviceNode4); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4"); provider.enableControllerService(serviceNode4); provider.enableReferencingServices(serviceNode4); @@ -227,7 +235,7 @@ public class TestStandardControllerServiceProvider { final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2"); final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>(); nodeMap.put("1", serviceNode1); @@ 
-257,7 +265,7 @@ public class TestStandardControllerServiceProvider { // add circular dependency on self. nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1"); + setProperty(serviceNode1, ServiceA.OTHER_SERVICE_2.getName(), "1"); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); @@ -284,8 +292,8 @@ public class TestStandardControllerServiceProvider { // like that. nodeMap.clear(); final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1"); + setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "3"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "1"); nodeMap.put("1", serviceNode1); nodeMap.put("3", serviceNode3); branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); @@ -307,10 +315,10 @@ public class TestStandardControllerServiceProvider { // Add multiple completely disparate branches. 
nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2"); final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5", false); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "4"); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); nodeMap.put("3", serviceNode3); @@ -341,8 +349,8 @@ public class TestStandardControllerServiceProvider { // create 2 branches both dependent on the same service nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2"); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); nodeMap.put("3", serviceNode3); @@ -367,7 +375,9 @@ public class TestStandardControllerServiceProvider { private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) { final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), UUID.randomUUID().toString(), - new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, NiFiProperties.createBasicNiFiProperties(null, null)); + new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, + NiFiProperties.createBasicNiFiProperties(null, null), + VariableRegistry.EMPTY_REGISTRY, Mockito.mock(ComponentLog.class)); final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, null, variableRegistry); group.addProcessor(procNode); @@ -422,12 +432,12 @@ 
public class TestStandardControllerServiceProvider { procGroup.addControllerService(E); procGroup.addControllerService(F); - A.setProperty(ServiceA.OTHER_SERVICE.getName(), "B"); - B.setProperty(ServiceA.OTHER_SERVICE.getName(), "D"); - C.setProperty(ServiceA.OTHER_SERVICE.getName(), "B"); - C.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "D"); - E.setProperty(ServiceA.OTHER_SERVICE.getName(), "A"); - E.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "F"); + setProperty(A, ServiceA.OTHER_SERVICE.getName(), "B"); + setProperty(B, ServiceA.OTHER_SERVICE.getName(), "D"); + setProperty(C, ServiceA.OTHER_SERVICE.getName(), "B"); + setProperty(C, ServiceA.OTHER_SERVICE_2.getName(), "D"); + setProperty(E, ServiceA.OTHER_SERVICE.getName(), "A"); + setProperty(E, ServiceA.OTHER_SERVICE_2.getName(), "F"); provider.enableControllerServices(Arrays.asList(A, B, C, D, E, F)); @@ -465,12 +475,12 @@ public class TestStandardControllerServiceProvider { procGroup.addControllerService(D); procGroup.addControllerService(F); - A.setProperty(ServiceC.REQ_SERVICE_1.getName(), "B"); - A.setProperty(ServiceC.REQ_SERVICE_2.getName(), "D"); - B.setProperty(ServiceA.OTHER_SERVICE.getName(), "C"); + setProperty(A, ServiceC.REQ_SERVICE_1.getName(), "B"); + setProperty(A, ServiceC.REQ_SERVICE_2.getName(), "D"); + setProperty(B, ServiceA.OTHER_SERVICE.getName(), "C"); - F.setProperty(ServiceA.OTHER_SERVICE.getName(), "D"); - D.setProperty(ServiceA.OTHER_SERVICE.getName(), "C"); + setProperty(F, ServiceA.OTHER_SERVICE.getName(), "D"); + setProperty(D, ServiceA.OTHER_SERVICE.getName(), "C"); provider.enableControllerServices(Arrays.asList(C, F, A, B, D)); @@ -506,13 +516,13 @@ public class TestStandardControllerServiceProvider { procGroup.addControllerService(serviceNode6); procGroup.addControllerService(serviceNode7); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); - 
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); - serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "6"); - serviceNode7.setProperty(ServiceC.REQ_SERVICE_1.getName(), "2"); - serviceNode7.setProperty(ServiceC.REQ_SERVICE_2.getName(), "3"); + setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4"); + setProperty(serviceNode5, ServiceA.OTHER_SERVICE.getName(), "6"); + setProperty(serviceNode7, ServiceC.REQ_SERVICE_1.getName(), "2"); + setProperty(serviceNode7, ServiceC.REQ_SERVICE_2.getName(), "3"); provider.enableControllerServices(Arrays.asList( serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5, serviceNode7)); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java index d972dde..11a7a97 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java @@ -50,7 +50,7 @@ public class TestControllerService implements ControllerService { 
@Override public String getIdentifier() { - return null; + return "id"; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/test/processors/ModifiesClasspathNoAnnotationProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/test/processors/ModifiesClasspathNoAnnotationProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/test/processors/ModifiesClasspathNoAnnotationProcessor.java new file mode 100644 index 0000000..aea4f7e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/test/processors/ModifiesClasspathNoAnnotationProcessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.test.processors; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collections; +import java.util.List; + +/** + * A processor with a property descriptor that attempts to modify the classpath, but the processor + * does not have @RequiresInstanceClassLoading. + */ +public class ModifiesClasspathNoAnnotationProcessor extends AbstractProcessor { + + public static final PropertyDescriptor CLASSPATH_RESOURCE = new PropertyDescriptor.Builder() + .name("Classpath Resource") + .dynamicallyModifiesClasspath(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return Collections.singletonList(CLASSPATH_RESOURCE); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } +}