http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index f57343e..3e07995 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller; import static java.util.Objects.requireNonNull; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,13 +28,16 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -46,6 +50,9 @@ import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.Connectable; @@ -57,23 +64,34 @@ import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepositoryFactory; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.SchedulingContext; +import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.processor.StandardSchedulingContext; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.ReflectionUtils; import org.quartz.CronExpression; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists within a controlled flow. This node keeps track of the processor, its scheduling information and its relationships to - * other processors and whatever scheduled futures exist for it. Must be thread safe. + * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists + * within a controlled flow. This node keeps track of the processor, its + * scheduling information and its relationships to other processors and whatever + * scheduled futures exist for it. Must be thread safe. * */ public class StandardProcessorNode extends ProcessorNode implements Connectable { + private static final Logger LOG = LoggerFactory.getLogger(StandardProcessorNode.class); + public static final String BULLETIN_OBSERVER_ID = "bulletin-observer"; public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; @@ -86,16 +104,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final Map<Relationship, Set<Connection>> connections; private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate; private final AtomicReference<List<Connection>> incomingConnectionsRef; - private final ReentrantReadWriteLock rwLock; - private final Lock readLock; - private final Lock writeLock; private final AtomicBoolean isolated; private final AtomicBoolean lossTolerant; private final AtomicReference<ScheduledState> scheduledState; private final AtomicReference<String> comments; private final AtomicReference<Position> position; private final AtomicReference<String> annotationData; - private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it + private final AtomicReference<String> schedulingPeriod; // stored as string + // so it's presented + // to user as they + // entered it private final AtomicReference<String> yieldPeriod; private final AtomicReference<String> penalizationPeriod; private final AtomicReference<Map<String, String>> style; @@ -114,10 +132,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private long runNanos = 0L; private SchedulingStrategy schedulingStrategy; // guarded by read/write lock + // ??????? NOT any more @SuppressWarnings("deprecation") - public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, - final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) { + public StandardProcessorNode(final Processor processor, final String uuid, + final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, + final ControllerServiceProvider controllerServiceProvider) { super(processor, uuid, validationContextFactory, controllerServiceProvider); this.processor = processor; @@ -126,9 +146,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable connections = new HashMap<>(); incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>()); scheduledState = new AtomicReference<>(ScheduledState.STOPPED); - rwLock = new ReentrantReadWriteLock(false); - readLock = rwLock.readLock(); - writeLock = rwLock.writeLock(); lossTolerant = new AtomicBoolean(false); final Set<Relationship> emptySetOfRelationships = new HashSet<>(); undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); @@ -147,15 +164,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); final Class<?> procClass = processor.getClass(); - triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class); - sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class); - batchSupported = procClass.isAnnotationPresent(SupportsBatching.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class); - triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class); + triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class) + || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class); + sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) + || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class); + batchSupported = procClass.isAnnotationPresent(SupportsBatching.class) + || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class); + triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class) + || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class); triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class) - || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); + || procClass.isAnnotationPresent( + org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); this.validationContextFactory = validationContextFactory; eventDrivenSupported = (procClass.isAnnotationPresent(EventDriven.class) - || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class)) && !triggeredSerially && !triggerWhenEmpty; + || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class)) + && !triggeredSerially && !triggerWhenEmpty; final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class); if (inputRequirementPresent) { @@ -176,26 +199,23 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * Provides and opportunity to retain information about this particular processor instance + * Provides and opportunity to retain information about this particular + * processor instance * - * @param comments new comments + * @param comments + * new comments */ @Override public void setComments(final String comments) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.comments.set(comments); - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } + this.comments.set(comments); } @Override public ScheduledState getScheduledState() { - return scheduledState.get(); + return this.scheduledState.get(); } @Override @@ -226,7 +246,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return if true flow file content generated by this processor is considered loss tolerant + * @return if true flow file content generated by this processor is + * considered loss tolerant */ @Override public boolean isLossTolerant() { @@ -239,7 +260,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return true if the processor has the {@link TriggerWhenEmpty} annotation, false otherwise. + * @return true if the processor has the {@link TriggerWhenEmpty} + * annotation, false otherwise. */ @Override public boolean isTriggerWhenEmpty() { @@ -247,7 +269,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return true if the processor has the {@link SideEffectFree} annotation, false otherwise. + * @return true if the processor has the {@link SideEffectFree} annotation, + * false otherwise. */ @Override public boolean isSideEffectFree() { @@ -260,7 +283,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return true if the processor has the {@link TriggerWhenAnyDestinationAvailable} annotation, false otherwise. + * @return true if the processor has the + * {@link TriggerWhenAnyDestinationAvailable} annotation, false + * otherwise. */ @Override public boolean isTriggerWhenAnyDestinationAvailable() { @@ -268,38 +293,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * Indicates whether flow file content made by this processor must be persisted + * Indicates whether flow file content made by this processor must be + * persisted * - * @param lossTolerant tolerant + * @param lossTolerant + * tolerant */ @Override public void setLossTolerant(final boolean lossTolerant) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.lossTolerant.set(lossTolerant); - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } + this.lossTolerant.set(lossTolerant); } /** * Indicates whether the processor runs on only the primary node. * - * @param isolated isolated + * @param isolated + * isolated */ public void setIsolated(final boolean isolated) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.isolated.set(isolated); - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } + this.isolated.set(isolated); } @Override @@ -308,33 +326,28 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return true; } final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get(); - if (terminatable == null) { - return false; - } - return terminatable.contains(relationship); + return terminatable == null ? false : terminatable.contains(relationship); } @Override public void setAutoTerminatedRelationships(final Set<Relationship> terminate) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } - for (final Relationship rel : terminate) { - if (!getConnections(rel).isEmpty()) { - throw new IllegalStateException("Cannot mark relationship '" + rel.getName() + "' as auto-terminated because Connection already exists with this relationship"); - } + for (final Relationship rel : terminate) { + if (!getConnections(rel).isEmpty()) { + throw new IllegalStateException("Cannot mark relationship '" + rel.getName() + + "' as auto-terminated because Connection already exists with this relationship"); } - undefinedRelationshipsToTerminate.set(new HashSet<>(terminate)); - } finally { - writeLock.unlock(); } + undefinedRelationshipsToTerminate.set(new HashSet<>(terminate)); } /** - * @return an unmodifiable Set that contains all of the ProcessorRelationship objects that are configured to be auto-terminated + * @return an unmodifiable Set that contains all of the + * ProcessorRelationship objects that are configured to be + * auto-terminated */ @Override public Set<Relationship> getAutoTerminatedRelationships() { @@ -346,7 +359,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return the value of the processor's {@link CapabilityDescription} annotation, if one exists, else <code>null</code>. + * @return the value of the processor's {@link CapabilityDescription} + * annotation, if one exists, else <code>null</code>. */ @SuppressWarnings("deprecation") public String getProcessorDescription() { @@ -355,7 +369,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable if (capDesc != null) { description = capDesc.value(); } else { - final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc = processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class); + final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc = processor.getClass() + .getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class); if (deprecatedCapDesc != null) { description = deprecatedCapDesc.value(); } @@ -366,20 +381,19 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void setName(final String name) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - super.setName(name); - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } + super.setName(name); } /** - * @param timeUnit determines the unit of time to represent the scheduling period. If null will be reported in units of {@link #DEFAULT_SCHEDULING_TIME_UNIT} - * @return the schedule period that should elapse before subsequent cycles of this processor's tasks + * @param timeUnit + * determines the unit of time to represent the scheduling + * period. If null will be reported in units of + * {@link #DEFAULT_SCHEDULING_TIME_UNIT} + * @return the schedule period that should elapse before subsequent cycles + * of this processor's tasks */ @Override public long getSchedulingPeriod(final TimeUnit timeUnit) { @@ -388,37 +402,32 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public boolean isEventDrivenSupported() { - readLock.lock(); - try { - return this.eventDrivenSupported; - } finally { - readLock.unlock(); - } + return this.eventDrivenSupported; } /** * Updates the Scheduling Strategy used for this Processor * - * @param schedulingStrategy strategy + * @param schedulingStrategy + * strategy * - * @throws IllegalArgumentException if the SchedulingStrategy is not not applicable for this Processor + * @throws IllegalArgumentException + * if the SchedulingStrategy is not not applicable for this + * Processor */ @Override public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { - writeLock.lock(); - try { - if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) { - // not valid. Just ignore it. We don't throw an Exception because if a developer changes a Processor so that - // it no longer supports EventDriven mode, we don't want the app to fail to startup if it was already in Event-Driven - // Mode. Instead, we will simply leave it in Timer-Driven mode - return; - } - - this.schedulingStrategy = schedulingStrategy; - setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); - } finally { - writeLock.unlock(); + if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) { + // not valid. Just ignore it. We don't throw an Exception because if + // a developer changes a Processor so that + // it no longer supports EventDriven mode, we don't want the app to + // fail to startup if it was already in Event-Driven + // Mode. Instead, we will simply leave it in Timer-Driven mode + return; } + + this.schedulingStrategy = schedulingStrategy; + setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); } /** @@ -426,12 +435,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public SchedulingStrategy getSchedulingStrategy() { - readLock.lock(); - try { - return this.schedulingStrategy; - } finally { - readLock.unlock(); - } + return this.schedulingStrategy; } @Override @@ -441,63 +445,51 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void setScheduldingPeriod(final String schedulingPeriod) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } - switch (schedulingStrategy) { - case CRON_DRIVEN: { - try { - new CronExpression(schedulingPeriod); - } catch (final Exception e) { - throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + schedulingPeriod); - } - } - break; - case PRIMARY_NODE_ONLY: - case TIMER_DRIVEN: { - final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); - if (schedulingNanos < 0) { - throw new IllegalArgumentException("Scheduling Period must be positive"); - } - this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); - } - break; - case EVENT_DRIVEN: - default: - return; + switch (schedulingStrategy) { + case CRON_DRIVEN: { + try { + new CronExpression(schedulingPeriod); + } catch (final Exception e) { + throw new IllegalArgumentException( + "Scheduling Period is not a valid cron expression: " + schedulingPeriod); } - - this.schedulingPeriod.set(schedulingPeriod); - } finally { - writeLock.unlock(); } + break; + case PRIMARY_NODE_ONLY: + case TIMER_DRIVEN: { + final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), + TimeUnit.NANOSECONDS); + if (schedulingNanos < 0) { + throw new IllegalArgumentException("Scheduling Period must be positive"); + } + this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); + } + break; + case EVENT_DRIVEN: + default: + return; + } + + this.schedulingPeriod.set(schedulingPeriod); } @Override public long getRunDuration(final TimeUnit timeUnit) { - readLock.lock(); - try { - return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); - } finally { - readLock.unlock(); - } + return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); } @Override public void setRunDuration(final long duration, final TimeUnit timeUnit) { - writeLock.lock(); - try { - if (duration < 0) { - throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " seconds"); - } - - this.runNanos = timeUnit.toNanos(duration); - } finally { - writeLock.unlock(); + if (duration < 0) { + throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " + + timeUnit.toSeconds(duration) + " seconds"); } + + this.runNanos = timeUnit.toNanos(duration); } @Override @@ -512,32 +504,32 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void setYieldPeriod(final String yieldPeriod) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); - if (yieldMillis < 0) { - throw new IllegalArgumentException("Yield duration must be positive"); - } - this.yieldPeriod.set(yieldPeriod); - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } + final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); + if (yieldMillis < 0) { + throw new IllegalArgumentException("Yield duration must be positive"); + } + this.yieldPeriod.set(yieldPeriod); } /** - * Causes the processor not to be scheduled for some period of time. This duration can be obtained and set via the {@link #getYieldPeriod(TimeUnit)} and {@link #setYieldPeriod(long, TimeUnit)} - * methods. + * Causes the processor not to be scheduled for some period of time. This + * duration can be obtained and set via the + * {@link #getYieldPeriod(TimeUnit)} and + * {@link #setYieldPeriod(long, TimeUnit)} methods. */ @Override public void yield() { final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); yield(yieldMillis, TimeUnit.MILLISECONDS); - final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 1000) + " seconds" : yieldMillis + " milliseconds"; - LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, yieldDuration); + final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 1000) + " seconds" + : yieldMillis + " milliseconds"; + LoggerFactory.getLogger(processor.getClass()).debug( + "{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, + yieldDuration); } @Override @@ -549,7 +541,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return the number of milliseconds since Epoch at which time this processor is to once again be scheduled. + * @return the number of milliseconds since Epoch at which time this + * processor is to once again be scheduled. */ @Override public long getYieldExpiration() { @@ -568,42 +561,36 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void setPenalizationPeriod(final String penalizationPeriod) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS); - if (penalizationMillis < 0) { - throw new IllegalArgumentException("Penalization duration must be positive"); - } - this.penalizationPeriod.set(penalizationPeriod); - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), + TimeUnit.MILLISECONDS); + if (penalizationMillis < 0) { + throw new IllegalArgumentException("Penalization duration must be positive"); } + this.penalizationPeriod.set(penalizationPeriod); } /** - * Determines the number of concurrent tasks that may be running for this processor. + * Determines the number of concurrent tasks that may be running for this + * processor. * - * @param taskCount a number of concurrent tasks this processor may have running - * @throws IllegalArgumentException if the given value is less than 1 + * @param taskCount + * a number of concurrent tasks this processor may have running + * @throws IllegalArgumentException + * if the given value is less than 1 */ @Override public void setMaxConcurrentTasks(final int taskCount) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { - throw new IllegalArgumentException(); - } - if (!triggeredSerially) { - concurrentTaskCount.set(taskCount); - } - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { + throw new IllegalArgumentException(); + } + if (!triggeredSerially) { + concurrentTaskCount.set(taskCount); } } @@ -613,7 +600,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return the number of tasks that may execute concurrently for this processor + * @return the number of tasks that may execute concurrently for this + * processor */ @Override public int getMaxConcurrentTasks() { @@ -633,13 +621,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public Set<Connection> getConnections() { final Set<Connection> allConnections = new HashSet<>(); - readLock.lock(); - try { - for (final Set<Connection> connectionSet : connections.values()) { - allConnections.addAll(connectionSet); - } - } finally { - readLock.unlock(); + for (final Set<Connection> connectionSet : connections.values()) { + allConnections.addAll(connectionSet); } return allConnections; @@ -652,14 +635,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public Set<Connection> getConnections(final Relationship relationship) { - final Set<Connection> applicableConnections; - readLock.lock(); - try { - applicableConnections = connections.get(relationship); - } finally { - readLock.unlock(); - } - return (applicableConnections == null) ? Collections.<Connection> emptySet() : Collections.unmodifiableSet(applicableConnections); + Set<Connection> applicableConnections = connections.get(relationship); + return (applicableConnections == null) ? Collections.<Connection> emptySet() + : Collections.unmodifiableSet(applicableConnections); } @Override @@ -667,52 +645,52 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable Objects.requireNonNull(connection, "connection cannot be null"); if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) { - throw new IllegalStateException("Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination"); + throw new IllegalStateException( + "Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination"); } - writeLock.lock(); - try { - List<Connection> updatedIncoming = null; - if (connection.getDestination().equals(this)) { - // don't add the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. - final List<Connection> incomingConnections = incomingConnectionsRef.get(); - updatedIncoming = new ArrayList<>(incomingConnections); - if (!updatedIncoming.contains(connection)) { - updatedIncoming.add(connection); - } + List<Connection> updatedIncoming = null; + if (connection.getDestination().equals(this)) { + // don't add the connection twice. This may occur if we have a + // self-loop because we will be told + // to add the connection once because we are the source and again + // because we are the destination. + final List<Connection> incomingConnections = incomingConnectionsRef.get(); + updatedIncoming = new ArrayList<>(incomingConnections); + if (!updatedIncoming.contains(connection)) { + updatedIncoming.add(connection); } + } - if (connection.getSource().equals(this)) { - // don't add the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. - if (!destinations.containsKey(connection)) { - for (final Relationship relationship : connection.getRelationships()) { - final Relationship rel = getRelationship(relationship.getName()); - Set<Connection> set = connections.get(rel); - if (set == null) { - set = new HashSet<>(); - connections.put(rel, set); - } + if (connection.getSource().equals(this)) { + // don't add the connection twice. This may occur if we have a + // self-loop because we will be told + // to add the connection once because we are the source and again + // because we are the destination. + if (!destinations.containsKey(connection)) { + for (final Relationship relationship : connection.getRelationships()) { + final Relationship rel = getRelationship(relationship.getName()); + Set<Connection> set = connections.get(rel); + if (set == null) { + set = new HashSet<>(); + connections.put(rel, set); + } - set.add(connection); + set.add(connection); - destinations.put(connection, connection.getDestination()); - } + destinations.put(connection, connection.getDestination()); + } - final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); - if (autoTerminated != null) { - autoTerminated.removeAll(connection.getRelationships()); - this.undefinedRelationshipsToTerminate.set(autoTerminated); - } + final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); + if (autoTerminated != null) { + autoTerminated.removeAll(connection.getRelationships()); + this.undefinedRelationshipsToTerminate.set(autoTerminated); } } + } - if (updatedIncoming != null) { - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); - } - } finally { - writeLock.unlock(); + if (updatedIncoming != null) { + incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); } } @@ -724,74 +702,68 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void updateConnection(final Connection connection) throws IllegalStateException { if (requireNonNull(connection).getSource().equals(this)) { - writeLock.lock(); - try { - // - // update any relationships - // - // first check if any relations were removed. - final List<Relationship> existingRelationships = new ArrayList<>(); - for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { - if (entry.getValue().contains(connection)) { - existingRelationships.add(entry.getKey()); - } + // update any relationships + // + // first check if any relations were removed. + final List<Relationship> existingRelationships = new ArrayList<>(); + for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { + if (entry.getValue().contains(connection)) { + existingRelationships.add(entry.getKey()); } + } - for (final Relationship rel : connection.getRelationships()) { - if (!existingRelationships.contains(rel)) { - // relationship was removed. Check if this is legal. - final Set<Connection> connectionsForRelationship = getConnections(rel); - if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() && !isAutoTerminated(rel) && getRelationships().contains(rel)) { - // if we are running and we do not terminate undefined relationships and this is the only - // connection that defines the given relationship, and that relationship is required, - // then it is not legal to remove this relationship from this connection. - throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " - + this + ", which is currently running"); - } + for (final Relationship rel : connection.getRelationships()) { + if (!existingRelationships.contains(rel)) { + // relationship was removed. Check if this is legal. + final Set<Connection> connectionsForRelationship = getConnections(rel); + if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() + && !isAutoTerminated(rel) && getRelationships().contains(rel)) { + // if we are running and we do not terminate undefined + // relationships and this is the only + // connection that defines the given relationship, and + // that relationship is required, + // then it is not legal to remove this relationship from + // this connection. + throw new IllegalStateException("Cannot remove relationship " + rel.getName() + + " from Connection because doing so would invalidate Processor " + this + + ", which is currently running"); } } + } - // remove the connection from any list that currently contains - for (final Set<Connection> list : connections.values()) { - list.remove(connection); - } + // remove the connection from any list that currently contains + for (final Set<Connection> list : connections.values()) { + list.remove(connection); + } - // add the connection in for all relationships listed. - for (final Relationship rel : connection.getRelationships()) { - Set<Connection> set = connections.get(rel); - if (set == null) { - set = new HashSet<>(); - connections.put(rel, set); - } - set.add(connection); + // add the connection in for all relationships listed. + for (final Relationship rel : connection.getRelationships()) { + Set<Connection> set = connections.get(rel); + if (set == null) { + set = new HashSet<>(); + connections.put(rel, set); } + set.add(connection); + } - // update to the new destination - destinations.put(connection, connection.getDestination()); + // update to the new destination + destinations.put(connection, connection.getDestination()); - final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); - if (autoTerminated != null) { - autoTerminated.removeAll(connection.getRelationships()); - this.undefinedRelationshipsToTerminate.set(autoTerminated); - } - } finally { - writeLock.unlock(); + final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); + if (autoTerminated != null) { + autoTerminated.removeAll(connection.getRelationships()); + this.undefinedRelationshipsToTerminate.set(autoTerminated); } } if (connection.getDestination().equals(this)) { - writeLock.lock(); - try { - // update our incoming connections -- we can just remove & re-add the connection to - // update the list. - final List<Connection> incomingConnections = incomingConnectionsRef.get(); - final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); - updatedIncoming.remove(connection); - updatedIncoming.add(connection); - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); - } finally { - writeLock.unlock(); - } + // update our incoming connections -- we can just remove & re-add + // the connection to update the list. + final List<Connection> incomingConnections = incomingConnectionsRef.get(); + final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); + updatedIncoming.remove(connection); + updatedIncoming.add(connection); + incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); } } @@ -803,45 +775,39 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable for (final Relationship relationship : connection.getRelationships()) { final Set<Connection> connectionsForRelationship = getConnections(relationship); if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) { - throw new IllegalStateException("This connection cannot be removed because its source is running and removing it will invalidate this processor"); + throw new IllegalStateException( + "This connection cannot be removed because its source is running and removing it will invalidate this processor"); } } - writeLock.lock(); - try { - for (final Set<Connection> connectionList : this.connections.values()) { - connectionList.remove(connection); - } - - connectionRemoved = (destinations.remove(connection) != null); - } finally { - writeLock.unlock(); + for (final Set<Connection> connectionList : this.connections.values()) { + connectionList.remove(connection); } + + connectionRemoved = (destinations.remove(connection) != null); } if (connection.getDestination().equals(this)) { - writeLock.lock(); - try { - final List<Connection> incomingConnections = incomingConnectionsRef.get(); - if (incomingConnections.contains(connection)) { - final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); - updatedIncoming.remove(connection); - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); - return; - } - } finally { - writeLock.unlock(); + final List<Connection> incomingConnections = incomingConnectionsRef.get(); + if (incomingConnections.contains(connection)) { + final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); + updatedIncoming.remove(connection); + incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); + return; } } if (!connectionRemoved) { - throw new IllegalArgumentException("Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source"); + throw new IllegalArgumentException( + "Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source"); } } /** - * @param relationshipName name - * @return the relationship for this nodes processor for the given name or creates a new relationship for the given name + * @param relationshipName + * name + * @return the relationship for this nodes processor for the given name or + * creates a new relationship for the given name */ @Override public Relationship getRelationship(final String relationshipName) { @@ -868,59 +834,45 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return the Set of destination processors for all relationships excluding any destinations that are this processor itself (self-loops) + * @return the Set of destination processors for all relationships excluding + * any destinations that are this processor itself (self-loops) */ public Set<Connectable> getDestinations() { final Set<Connectable> nonSelfDestinations = new HashSet<>(); - readLock.lock(); - try { - for (final Connectable connectable : destinations.values()) { - if (connectable != this) { - nonSelfDestinations.add(connectable); - } + for (final Connectable connectable : destinations.values()) { + if (connectable != this) { + nonSelfDestinations.add(connectable); } - } finally { - readLock.unlock(); } return nonSelfDestinations; } public Set<Connectable> getDestinations(final Relationship relationship) { - readLock.lock(); - try { - final Set<Connectable> destinationSet = new HashSet<>(); - final Set<Connection> relationshipConnections = connections.get(relationship); - if (relationshipConnections != null) { - for (final Connection connection : relationshipConnections) { - destinationSet.add(destinations.get(connection)); - } + final Set<Connectable> destinationSet = new HashSet<>(); + final Set<Connection> relationshipConnections = connections.get(relationship); + if (relationshipConnections != null) { + for (final Connection connection : relationshipConnections) { + destinationSet.add(destinations.get(connection)); } - return destinationSet; - } finally { - readLock.unlock(); } + return destinationSet; } public Set<Relationship> getUndefinedRelationships() { final Set<Relationship> undefined = new HashSet<>(); - readLock.lock(); - try { - final Set<Relationship> relationships; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - relationships = processor.getRelationships(); - } + final Set<Relationship> relationships; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + relationships = processor.getRelationships(); + } - if (relationships == null) { - return undefined; - } - for (final Relationship relation : relationships) { - final Set<Connection> connectionSet = this.connections.get(relation); - if (connectionSet == null || connectionSet.isEmpty()) { - undefined.add(relation); - } + if (relationships == null) { + return undefined; + } + for (final Relationship relation : relationships) { + final Set<Connection> connectionSet = this.connections.get(relation); + if (connectionSet == null || connectionSet.isEmpty()) { + undefined.add(relation); } - } finally { - readLock.unlock(); } return undefined; } @@ -928,36 +880,22 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable /** * Determines if the given node is a destination for this node * - * @param node node + * @param node + * node * @return true if is a direct destination node; false otherwise */ boolean isRelated(final ProcessorNode node) { - readLock.lock(); - try { - return this.destinations.containsValue(node); - } finally { - readLock.unlock(); - } + return this.destinations.containsValue(node); } @Override public boolean isRunning() { - readLock.lock(); - try { - return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; - } finally { - readLock.unlock(); - } + return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; } @Override public int getActiveThreadCount() { - readLock.lock(); - try { - return processScheduler.getActiveThreadCount(this); - } finally { - readLock.unlock(); - } + return processScheduler.getActiveThreadCount(this); } List<Connection> getIncomingNonLoopConnections() { @@ -974,14 +912,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public boolean isValid() { - readLock.lock(); try { - final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); + final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), + getAnnotationData()); final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - validationResults = getProcessor().validate(validationContext); - } + validationResults = getProcessor().validate(validationContext); for (final ValidationResult result : validationResults) { if (!result.isValid()) { @@ -996,36 +932,34 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } switch (getInputRequirement()) { - case INPUT_ALLOWED: - break; - case INPUT_FORBIDDEN: { - if (!getIncomingNonLoopConnections().isEmpty()) { - return false; - } - break; + case INPUT_ALLOWED: + break; + case INPUT_FORBIDDEN: { + if (!getIncomingNonLoopConnections().isEmpty()) { + return false; } - case INPUT_REQUIRED: { - if (getIncomingNonLoopConnections().isEmpty()) { - return false; - } - break; + break; + } + case INPUT_REQUIRED: { + if (getIncomingNonLoopConnections().isEmpty()) { + return false; } + break; + } } } catch (final Throwable t) { + LOG.warn("Failed during validation", t); return false; - } finally { - readLock.unlock(); } - return true; } @Override public Collection<ValidationResult> getValidationErrors() { final List<ValidationResult> results = new ArrayList<>(); - readLock.lock(); try { - final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); + final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), + getAnnotationData()); final Collection<ValidationResult> validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -1041,43 +975,37 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable for (final Relationship relationship : getUndefinedRelationships()) { if (!isAutoTerminated(relationship)) { final ValidationResult error = new ValidationResult.Builder() - .explanation("Relationship '" + relationship.getName() + "' is not connected to any component and is not auto-terminated") - .subject("Relationship " + relationship.getName()) - .valid(false) - .build(); + .explanation("Relationship '" + relationship.getName() + + "' is not connected to any component and is not auto-terminated") + .subject("Relationship " + relationship.getName()).valid(false).build(); results.add(error); } } switch (getInputRequirement()) { - case INPUT_ALLOWED: - break; - case INPUT_FORBIDDEN: { - final int incomingConnCount = getIncomingNonLoopConnections().size(); - if (incomingConnCount != 0) { - results.add(new ValidationResult.Builder() - .explanation("Processor does not allow upstream connections but currently has " + incomingConnCount) - .subject("Upstream Connections") - .valid(false) - .build()); - } - break; + case INPUT_ALLOWED: + break; + case INPUT_FORBIDDEN: { + final int incomingConnCount = getIncomingNonLoopConnections().size(); + if (incomingConnCount != 0) { + results.add(new ValidationResult.Builder().explanation( + "Processor does not allow upstream connections but currently has " + incomingConnCount) + .subject("Upstream Connections").valid(false).build()); } - case INPUT_REQUIRED: { - if (getIncomingNonLoopConnections().isEmpty()) { - results.add(new ValidationResult.Builder() + break; + } + case INPUT_REQUIRED: { + if (getIncomingNonLoopConnections().isEmpty()) { + results.add(new ValidationResult.Builder() .explanation("Processor requires an upstream connection but currently has none") - .subject("Upstream Connections") - .valid(false) - .build()); - } - break; + .subject("Upstream Connections").valid(false).build()); } + break; + } } } catch (final Throwable t) { - results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build()); - } finally { - readLock.unlock(); + results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()) + .valid(false).build()); } return results; } @@ -1090,7 +1018,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable /** * Establishes node equality (based on the processor's identifier) * - * @param other node + * @param other + * node * @return true if equal */ @Override @@ -1128,18 +1057,15 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void setProcessGroup(final ProcessGroup group) { - writeLock.lock(); - try { - this.processGroup.set(group); - } finally { - writeLock.unlock(); - } + this.processGroup.set(group); } @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { processor.onTrigger(context, sessionFactory); + } catch (Exception ex) { + ex.printStackTrace(); } } @@ -1149,25 +1075,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setScheduledState(final ScheduledState scheduledState) { - this.scheduledState.set(scheduledState); - if (!scheduledState.equals(ScheduledState.RUNNING)) { // if user stops processor, clear yield expiration - yieldExpiration.set(0L); - } - } - - @Override public void setAnnotationData(final String data) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot set AnnotationData while processor is running"); - } - - this.annotationData.set(data); - } finally { - writeLock.unlock(); + if (isRunning()) { + throw new IllegalStateException("Cannot set AnnotationData while processor is running"); } + + this.annotationData.set(data); } @Override @@ -1187,75 +1100,55 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void verifyCanDelete(final boolean ignoreConnections) { - readLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException(this + " is running"); - } + if (isRunning()) { + throw new IllegalStateException(this + " is running"); + } - if (!ignoreConnections) { - for (final Set<Connection> connectionSet : connections.values()) { - for (final Connection connection : connectionSet) { - connection.verifyCanDelete(); - } + if (!ignoreConnections) { + for (final Set<Connection> connectionSet : connections.values()) { + for (final Connection connection : connectionSet) { + connection.verifyCanDelete(); } + } - for (final Connection connection : incomingConnectionsRef.get()) { - if (connection.getSource().equals(this)) { - connection.verifyCanDelete(); - } else { - throw new IllegalStateException(this + " is the destination of another component"); - } + for (final Connection connection : incomingConnectionsRef.get()) { + if (connection.getSource().equals(this)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException(this + " is the destination of another component"); } } - } finally { - readLock.unlock(); } } @Override public void verifyCanStart() { - readLock.lock(); - try { - switch (getScheduledState()) { - case DISABLED: - throw new IllegalStateException(this + " cannot be started because it is disabled"); - case RUNNING: - throw new IllegalStateException(this + " cannot be started because it is already running"); - case STOPPED: - break; - } - verifyNoActiveThreads(); - - if (!isValid()) { - throw new IllegalStateException(this + " is not in a valid state"); - } - } finally { - readLock.unlock(); - } + this.verifyCanStart(null); } @Override public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { - switch (getScheduledState()) { - case DISABLED: - throw new IllegalStateException(this + " cannot be started because it is disabled"); - case RUNNING: - throw new IllegalStateException(this + " cannot be started because it is already running"); - case STOPPED: - break; + if (this.getScheduledState() == ScheduledState.RUNNING) { + throw new IllegalStateException(this + " cannot be started because it is already running"); } + verifyNoActiveThreads(); - final Set<String> ids = new HashSet<>(); - for (final ControllerServiceNode node : ignoredReferences) { - ids.add(node.getIdentifier()); - } + if (ignoredReferences != null) { + final Set<String> ids = new HashSet<>(); + for (final ControllerServiceNode node : ignoredReferences) { + ids.add(node.getIdentifier()); + } - final Collection<ValidationResult> validationResults = getValidationErrors(ids); - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - throw new IllegalStateException(this + " cannot be started because it is not valid: " + result); + final Collection<ValidationResult> validationResults = getValidationErrors(ids); + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { + throw new IllegalStateException(this + " cannot be started because it is not valid: " + result); + } + } + } else { + if (!isValid()) { + throw new IllegalStateException(this + " is not in a valid state"); } } } @@ -1269,41 +1162,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void verifyCanUpdate() { - readLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException(this + " is not stopped"); - } - } finally { - readLock.unlock(); + if (isRunning()) { + throw new IllegalStateException(this + " is not stopped"); } } @Override public void verifyCanEnable() { - readLock.lock(); - try { - if (getScheduledState() != ScheduledState.DISABLED) { - throw new IllegalStateException(this + " is not disabled"); - } - - verifyNoActiveThreads(); - } finally { - readLock.unlock(); + if (getScheduledState() != ScheduledState.DISABLED) { + throw new IllegalStateException(this + " is not disabled"); } + + verifyNoActiveThreads(); } @Override public void verifyCanDisable() { - readLock.lock(); - try { - if (getScheduledState() != ScheduledState.STOPPED) { - throw new IllegalStateException(this + " is not stopped"); - } - verifyNoActiveThreads(); - } finally { - readLock.unlock(); + if (getScheduledState() != ScheduledState.STOPPED) { + throw new IllegalStateException(this + " is not stopped"); } + verifyNoActiveThreads(); } @Override @@ -1324,4 +1202,195 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } } + + /** + * Will idempotently start the processor using the following sequence: <i> + * <ul> + * <li>Validate Processor's state (e.g., PropertyDescriptors, + * ControllerServices etc.)</li> + * <li>Transition (atomically) Processor's scheduled state form STOPPED to + * STARTING. If the above state transition succeeds, then execute the start + * task (asynchronously) which will be re-tried until @OnScheduled is + * executed successfully and "schedulingAgentCallback' is invoked, or until + * STOP operation is initiated on this processor. If state transition fails + * it means processor is already being started and WARN message will be + * logged explaining it.</li> + * </ul> + * </i> + * <p> + * Any exception thrown while invoking operations annotated with @OnSchedule + * will be caught and logged after which @OnUnscheduled operation will be + * invoked (quietly) and the start sequence will be repeated (re-try) after + * delay provided by 'administrativeYieldMillis'. + * </p> + * <p> + * Upon successful completion of start sequence (@OnScheduled -> + * 'schedulingAgentCallback') the attempt will be made to transition + * processor's scheduling state to RUNNING at which point processor is + * considered to be fully started and functioning. If upon successful + * invocation of @OnScheduled operation the processor can not be + * transitioned to RUNNING state (e.g., STOP operation was invoked on the + * processor while it's @OnScheduled operation was executing), the + * processor's @OnUnscheduled operation will be invoked and its scheduling + * state will be set to STOPPED at which point the processor is considered + * to be fully stopped. + * </p> + */ + @Override + public <T extends ProcessContext & ControllerServiceLookup> void start(final ScheduledExecutorService taskScheduler, + final long administrativeYieldMillis, final T processContext, final Runnable schedulingAgentCallback) { + if (!this.isValid()) { + throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors()); + } + + if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING)) { // will ensure that the Processor represented by this node can only be started once + final Runnable startProcRunnable = new Runnable() { + @Override + public void run() { + try { + SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, getControllerServiceProvider(), + StandardProcessorNode.this, processContext.getStateManager()); + invokeOnScheduleAsync(taskScheduler, schedulingContext); + if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { + schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle + } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + scheduledState.set(ScheduledState.STOPPED); + } + } catch (final Exception e) { + final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; + final ProcessorLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); + + procLog.error( "{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}", + new Object[] { StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis + " milliseconds" }, cause); + LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); + + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated + taskScheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS); + } else { + scheduledState.set(ScheduledState.STOPPED); + } + } + } + }; + taskScheduler.execute(startProcRunnable); + } else { + LOG.warn("Can not start Processor since it's already in the process of being started"); + } + } + + /** + * Will idempotently stop the processor using the following sequence: <i> + * <ul> + * <li>Transition (atomically) Processor's scheduled state form RUNNING to + * STOPPING. If the above state transition succeeds, then execute the stop + * task (asynchronously) where 'activeThreadMonitorCallback' provided by the + * {@link ProcessScheduler} will be called to check if this processor still + * has active threads. If it does, the task will be re-scheduled with delay + * of 100 milliseconds until there are no more active threads, at which + * point processor's @OnUnscheduled and @OnStopped operation will be invoked + * and its scheduled state is set to STOPPED which completes processor stop + * sequence.</li> + * </ul> + * </i> + * <p> + * If for some reason processor's scheduled state can not be transitioned to + * STOPPING (e.g., the processor didn't finish @OnScheduled operation when + * stop was called), the attempt will be made to transition processor's + * scheduled state from STARTING to STOPPING which will allow + * {@link #start(ScheduledExecutorService, long, ProcessContext, Runnable)} + * method to initiate processor's shutdown upon exiting @OnScheduled + * operation, otherwise the processor's scheduled state will remain + * unchanged ensuring that multiple calls to this method are idempotent. + * </p> + */ + @Override + public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler, + final T processContext, final Callable<Boolean> activeThreadMonitorCallback) { + if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once + final Runnable stopProcRunnable = new Runnable() { + @Override + public void run() { + try { + if (activeThreadMonitorCallback.call()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); + scheduledState.set(ScheduledState.STOPPED); + } else { + scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); + } + } catch (Exception e) { + LOG.warn("Failed while shutting down processor " + processor, e); + } + } + }; + scheduler.execute(stopProcRunnable); + } else { + /* + * We do compareAndSet() instead of set() to ensure that Processor + * stoppage is handled consistently including a condition where + * Processor never got a chance to transition to RUNNING state + * before stop() was called. If that happens the stop processor + * routine will be initiated in start() method, otherwise the IF + * part will handle the stop processor routine. + */ + this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING); + } + } + + /** + * Will invoke processor's methods annotated with @OnSchedule asynchronously + * to ensure that it could be interrupted if stop action was initiated on + * the processor that may be sitting in the infinitely blocking @OnSchedule + * operation. While this approach paves the way for further enhancements + * related to managing processor'slife-cycle operation at the moment the + * interrupt will not happen automatically. This is primarily to preserve + * the existing behavior or the NiFi where stop operation can only be + * invoked once the processor is started. Unfortunately that could mean that + * the processor may be blocking indefinitely in the @Oncheduled call. To + * deal with that a new NiFi property has been introduced + * <i>nifi.processor.start.timeout</i> which allows one to set the time (in + * milliseconds) of how long to wait before canceling the @OnScheduled task + * allowing processor's stop sequence to proceed. The default value for this + * property is {@link Long#MAX_VALUE}. + * <p> + * NOTE: Canceling the task does not guarantee that the task will actually + * completes (successfully or otherwise), since cancellation of the task + * will issue a simple Thread.interrupt(). However code inside + * of @OnScheduled operation is written purely and will ignore thread + * interrupts you may end up with runaway thread which may eventually + * require NiFi reboot. In any event, the above explanation will be logged + * (WARN) informing a user so further actions could be taken. + * </p> + */ + private void invokeOnScheduleAsync(ScheduledExecutorService taskScheduler, final SchedulingContext schedulingContext) throws ExecutionException { + Future<Void> executionResult = taskScheduler.submit(new Callable<Void>() { + @SuppressWarnings("deprecation") + @Override + public Void call() throws Exception { + ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext); + return null; + } + }); + + long onScheduleTimeout = Long.parseLong(NiFiProperties.getInstance() + .getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, String.valueOf(Long.MAX_VALUE))); + try { + executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() + + "' @OnSchedule operation to finish."); + Thread.currentThread().interrupt(); + } catch (TimeoutException e) { + executionResult.cancel(true); + LOG.warn("Timed out while waiting for the task executing @OnSchedule operation for '" + + this.processor.getClass().getSimpleName() + + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " + + "guarantee that the task will be canceled since the code inside @OnSchedule method may " + + "have been written to ignore interrupts which may result in runaway thread which could lead to more issues " + + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + + this.processor + "' that needs to be documented, reported and eventually fixed."); + } + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java new file mode 100644 index 0000000..b931c64 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.ReportingTaskNode; + +/** + * Base implementation of the {@link SchedulingAgent} which encapsulates the + * updates to the {@link ScheduleState} based on invoked operation and then + * delegates to the corresponding 'do' methods. For example; By invoking + * {@link #schedule(Connectable, ScheduleState)} the the + * {@link ScheduleState#setScheduled(boolean)} with value 'true' will be + * invoked. + * + * @see EventDrivenSchedulingAgent + * @see TimerDrivenSchedulingAgent + * @see QuartzSchedulingAgent + */ +abstract class AbstractSchedulingAgent implements SchedulingAgent { + + @Override + public void schedule(Connectable connectable, ScheduleState scheduleState) { + scheduleState.setScheduled(true); + this.doSchedule(connectable, scheduleState); + } + + @Override + public void unschedule(Connectable connectable, ScheduleState scheduleState) { + scheduleState.setScheduled(false); + this.doUnschedule(connectable, scheduleState); + } + + @Override + public void schedule(ReportingTaskNode taskNode, ScheduleState scheduleState) { + scheduleState.setScheduled(true); + this.doSchedule(taskNode, scheduleState); + } + + @Override + public void unschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) { + scheduleState.setScheduled(false); + this.doUnschedule(taskNode, scheduleState); + } + + /** + * Schedules the provided {@link Connectable}. Its {@link ScheduleState} + * will be set to <i>true</i> + * + * @param connectable + * the instance of {@link Connectable} + * @param scheduleState + * the instance of {@link ScheduleState} + */ + protected abstract void doSchedule(Connectable connectable, ScheduleState scheduleState); + + /** + * Unschedules the provided {@link Connectable}. Its {@link ScheduleState} + * will be set to <i>false</i> + * + * @param connectable + * the instance of {@link Connectable} + * @param scheduleState + * the instance of {@link ScheduleState} + */ + protected abstract void doUnschedule(Connectable connectable, ScheduleState scheduleState); + + /** + * Schedules the provided {@link ReportingTaskNode}. Its + * {@link ScheduleState} will be set to <i>true</i> + * + * @param connectable + * the instance of {@link ReportingTaskNode} + * @param scheduleState + * the instance of {@link ScheduleState} + */ + protected abstract void doSchedule(ReportingTaskNode connectable, ScheduleState scheduleState); + + /** + * Unschedules the provided {@link ReportingTaskNode}. Its + * {@link ScheduleState} will be set to <i>false</i> + * + * @param connectable + * the instance of {@link ReportingTaskNode} + * @param scheduleState + * the instance of {@link ScheduleState} + */ + protected abstract void doUnschedule(ReportingTaskNode connectable, ScheduleState scheduleState); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 76d7c08..37cab01 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -51,7 +51,7 @@ import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EventDrivenSchedulingAgent implements SchedulingAgent { +public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class); @@ -94,24 +94,24 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } @Override - public void schedule(final ReportingTaskNode taskNode, ScheduleState scheduleState) { + public void doSchedule(final ReportingTaskNode taskNode, ScheduleState scheduleState) { throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode"); } @Override - public void unschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) { + public void doUnschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) { throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode"); } @Override - public void schedule(final Connectable connectable, final ScheduleState scheduleState) { + public void doSchedule(final Connectable connectable, final ScheduleState scheduleState) { workerQueue.resumeWork(connectable); logger.info("Scheduled {} to run in Event-Driven mode", connectable); scheduleStates.put(connectable, scheduleState); } @Override - public void unschedule(final Connectable connectable, final ScheduleState scheduleState) { + public void doUnschedule(final Connectable connectable, final ScheduleState scheduleState) { workerQueue.suspendWork(connectable); logger.info("Stopped scheduling {} to run", connectable); }
