http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java deleted file mode 100644 index 071be4d..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ /dev/null @@ -1,1243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller; - -import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.controller.ProcessorNode; -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Position; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.logging.LogLevel; -import org.apache.nifi.logging.LogRepositoryFactory; -import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.TriggerSerially; -import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable; 
-import org.apache.nifi.processor.annotation.TriggerWhenEmpty; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.FormatUtils; - -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.quartz.CronExpression; -import org.slf4j.LoggerFactory; - -/** - * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists - * within a controlled flow. This node keeps track of the processor, its - * scheduling information and its relationships to other processors and whatever - * scheduled futures exist for it. Must be thread safe. - * - * @author none - */ -public class StandardProcessorNode extends ProcessorNode implements Connectable { - - public static final String BULLETIN_OBSERVER_ID = "bulletin-observer"; - - public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; - public static final String DEFAULT_YIELD_PERIOD = "1 sec"; - public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec"; - private final AtomicReference<ProcessGroup> processGroup; - private final Processor processor; - private final AtomicReference<String> identifier; - private final Map<Connection, Connectable> destinations; - private final Map<Relationship, Set<Connection>> connections; - private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate; - private final AtomicReference<List<Connection>> incomingConnectionsRef; - private final ReentrantReadWriteLock rwLock; - private final Lock readLock; - private final Lock writeLock; - private final AtomicBoolean isolated; - private final AtomicBoolean lossTolerant; - private final AtomicReference<ScheduledState> scheduledState; - private final AtomicReference<String> comments; - private final AtomicReference<String> name; - private final AtomicReference<Position> position; - private final AtomicReference<String> annotationData; - private final AtomicReference<String> schedulingPeriod; // stored 
as string so it's presented to user as they entered it - private final AtomicReference<String> yieldPeriod; - private final AtomicReference<String> penalizationPeriod; - private final AtomicReference<Map<String, String>> style; - private final AtomicInteger concurrentTaskCount; - private final AtomicLong yieldExpiration; - private final AtomicLong schedulingNanos; - private final boolean triggerWhenEmpty; - private final boolean sideEffectFree; - private final boolean triggeredSerially; - private final boolean triggerWhenAnyDestinationAvailable; - private final boolean eventDrivenSupported; - private final boolean batchSupported; - private final ValidationContextFactory validationContextFactory; - private final ProcessScheduler processScheduler; - private long runNanos = 0L; - - private SchedulingStrategy schedulingStrategy; // guarded by read/write lock - - StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, - final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) { - super(processor, uuid, validationContextFactory, controllerServiceProvider); - - this.processor = processor; - identifier = new AtomicReference<>(uuid); - destinations = new HashMap<>(); - connections = new HashMap<>(); - incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>()); - scheduledState = new AtomicReference<>(ScheduledState.STOPPED); - rwLock = new ReentrantReadWriteLock(false); - readLock = rwLock.readLock(); - writeLock = rwLock.writeLock(); - lossTolerant = new AtomicBoolean(false); - final Set<Relationship> emptySetOfRelationships = new HashSet<>(); - undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); - comments = new AtomicReference<>(""); - name = new AtomicReference<>(processor.getClass().getSimpleName()); - schedulingPeriod = new AtomicReference<>("0 sec"); - schedulingNanos = new 
AtomicLong(MINIMUM_SCHEDULING_NANOS); - yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD); - yieldExpiration = new AtomicLong(0L); - concurrentTaskCount = new AtomicInteger(1); - position = new AtomicReference<>(new Position(0D, 0D)); - style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>())); - this.processGroup = new AtomicReference<>(); - processScheduler = scheduler; - annotationData = new AtomicReference<>(); - isolated = new AtomicBoolean(false); - penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); - - triggerWhenEmpty = processor.getClass().isAnnotationPresent(TriggerWhenEmpty.class); - sideEffectFree = processor.getClass().isAnnotationPresent(SideEffectFree.class); - batchSupported = processor.getClass().isAnnotationPresent(SupportsBatching.class); - triggeredSerially = processor.getClass().isAnnotationPresent(TriggerSerially.class); - triggerWhenAnyDestinationAvailable = processor.getClass().isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class); - this.validationContextFactory = validationContextFactory; - eventDrivenSupported = processor.getClass().isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty; - schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; - } - - /** - * @return comments about this specific processor instance - */ - @Override - public String getComments() { - return comments.get(); - } - - /** - * Provides and opportunity to retain information about this particular - * processor instance - * - * @param comments - */ - @Override - public void setComments(final String comments) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.comments.set(comments); - } finally { - writeLock.unlock(); - } - } - - @Override - public ScheduledState getScheduledState() { - return scheduledState.get(); - } - - @Override - public Position 
getPosition() { - return position.get(); - } - - @Override - public void setPosition(Position position) { - this.position.set(position); - } - - public Map<String, String> getStyle() { - return style.get(); - } - - public void setStyle(final Map<String, String> style) { - if (style != null) { - this.style.set(Collections.unmodifiableMap(new HashMap<>(style))); - } - } - - @Override - public String getIdentifier() { - return identifier.get(); - } - - /** - * @return if true flow file content generated by this processor is - * considered loss tolerant - */ - @Override - public boolean isLossTolerant() { - return lossTolerant.get(); - } - - /** - * @return if true processor runs only on the primary node - */ - public boolean isIsolated() { - return isolated.get(); - } - - /** - * @return true if the processor has the {@link TriggerWhenEmpty} - * annotation, false otherwise. - */ - @Override - public boolean isTriggerWhenEmpty() { - return triggerWhenEmpty; - } - - /** - * @return true if the processor has the {@link SideEffectFree} annotation, - * false otherwise. - */ - public boolean isSideEffectFree() { - return sideEffectFree; - } - - @Override - public boolean isHighThroughputSupported() { - return batchSupported; - } - - /** - * @return true if the processor has the - * {@link TriggerWhenAnyDestinationAvailable} annotation, false otherwise. - */ - public boolean isTriggerWhenAnyDestinationAvailable() { - return triggerWhenAnyDestinationAvailable; - } - - /** - * Indicates whether flow file content made by this processor must be - * persisted - * - * @param lossTolerant - */ - @Override - public void setLossTolerant(final boolean lossTolerant) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.lossTolerant.set(lossTolerant); - } finally { - writeLock.unlock(); - } - } - - /** - * Indicates whether the processor runs on only the primary node. 
- * - * @param isolated - */ - public void setIsolated(final boolean isolated) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.isolated.set(isolated); - } finally { - writeLock.unlock(); - } - } - - @Override - public boolean isAutoTerminated(final Relationship relationship) { - final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get(); - if (terminatable == null) { - return false; - } - return terminatable.contains(relationship); - } - - /** - * Indicates whether flow files transferred to undefined relationships - * should be terminated - * - * @param terminate - */ - @Override - public void setAutoTerminatedRelationships(final Set<Relationship> terminate) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - - for (final Relationship rel : terminate) { - if (!getConnections(rel).isEmpty()) { - throw new IllegalStateException("Cannot mark relationship '" + rel.getName() + "' as auto-terminated because Connection already exists with this relationship"); - } - } - undefinedRelationshipsToTerminate.set(new HashSet<>(terminate)); - } finally { - writeLock.unlock(); - } - } - - /** - * @return an unmodifiable Set that contains all of the - * ProcessorRelationship objects that are configured to be auto-terminated - */ - public Set<Relationship> getAutoTerminatedRelationships() { - Set<Relationship> relationships = undefinedRelationshipsToTerminate.get(); - if (relationships == null) { - relationships = new HashSet<>(); - } - return Collections.unmodifiableSet(relationships); - } - - @Override - public String getName() { - return name.get(); - } - - /** - * @return the value of the processor's {@link CapabilityDescription} - * annotation, if one exists, else <code>null</code>. 
- */ - public String getProcessorDescription() { - final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); - return (capDesc == null) ? null : capDesc.value(); - } - - @Override - public void setName(final String name) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.name.set(name); - } finally { - writeLock.unlock(); - } - } - - /** - * @param timeUnit determines the unit of time to represent the scheduling - * period. If null will be reported in units of - * {@link #DEFAULT_SCHEDULING_TIME_UNIT} - * @return the schedule period that should elapse before subsequent cycles - * of this processor's tasks - */ - @Override - public long getSchedulingPeriod(final TimeUnit timeUnit) { - return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); - } - - public boolean isEventDrivenSupported() { - readLock.lock(); - try { - return this.eventDrivenSupported; - } finally { - readLock.unlock(); - } - } - - /** - * Updates the Scheduling Strategy used for this Processor - * - * @param schedulingStrategy - * - * @throws IllegalArgumentException if the SchedulingStrategy is not not - * applicable for this Processor - */ - public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { - writeLock.lock(); - try { - if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) { - // not valid. Just ignore it. We don't throw an Exception because if a developer changes a Processor so that - // it no longer supports EventDriven mode, we don't want the app to fail to startup if it was already in Event-Driven - // Mode. 
Instead, we will simply leave it in Timer-Driven mode - return; - } - - this.schedulingStrategy = schedulingStrategy; - setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); - } finally { - writeLock.unlock(); - } - } - - /** - * Returns the currently configured scheduling strategy - * - * @return - */ - public SchedulingStrategy getSchedulingStrategy() { - readLock.lock(); - try { - return this.schedulingStrategy; - } finally { - readLock.unlock(); - } - } - - @Override - public String getSchedulingPeriod() { - return schedulingPeriod.get(); - } - - /** - * @param value the number of <code>timeUnit</code>s between scheduling - * intervals. - * @param timeUnit determines the unit of time to represent the scheduling - * period. - */ - @Override - public void setScheduldingPeriod(final String schedulingPeriod) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - - switch (schedulingStrategy) { - case CRON_DRIVEN: { - try { - new CronExpression(schedulingPeriod); - } catch (final Exception e) { - throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + schedulingPeriod); - } - } - break; - case PRIMARY_NODE_ONLY: - case TIMER_DRIVEN: { - final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); - if (schedulingNanos < 0) { - throw new IllegalArgumentException("Scheduling Period must be positive"); - } - this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); - } - break; - case EVENT_DRIVEN: - default: - return; - } - - this.schedulingPeriod.set(schedulingPeriod); - } finally { - writeLock.unlock(); - } - } - - @Override - public long getRunDuration(final TimeUnit timeUnit) { - readLock.lock(); - try { - return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); - } finally { - readLock.unlock(); - } - } - - @Override - public 
void setRunDuration(final long duration, final TimeUnit timeUnit) { - writeLock.lock(); - try { - if (duration < 0) { - throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " seconds"); - } - - this.runNanos = timeUnit.toNanos(duration); - } finally { - writeLock.unlock(); - } - } - - /** - * @param timeUnit determines the unit of time to represent the yield - * period. If null will be reported in units of - * {@link #DEFAULT_SCHEDULING_TIME_UNIT}. - * @return - */ - @Override - public long getYieldPeriod(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); - } - - public String getYieldPeriod() { - return yieldPeriod.get(); - } - - /** - * Updates the amount of time that this processor should avoid being - * scheduled when the processor calls - * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} - * - * @param yieldPeriod - */ - @Override - public void setYieldPeriod(final String yieldPeriod) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); - if (yieldMillis < 0) { - throw new IllegalArgumentException("Yield duration must be positive"); - } - this.yieldPeriod.set(yieldPeriod); - } finally { - writeLock.unlock(); - } - } - - /** - * Causes the processor not to be scheduled for some period of time. This - * duration can be obtained and set via the - * {@link #getYieldPeriod(TimeUnit)} and - * {@link #setYieldPeriod(long, TimeUnit)} methods. - */ - @Override - public void yield() { - final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); - yield(yieldMillis, TimeUnit.MILLISECONDS); - - final String yieldDuration = (yieldMillis > 1000) ? 
(yieldMillis / 1000) + " seconds" : yieldMillis + " milliseconds"; - LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, yieldDuration); - } - - public void yield(final long period, final TimeUnit timeUnit) { - final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit); - yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); - - processScheduler.yield(this); - } - - /** - * @return the number of milliseconds since Epoch at which time this - * processor is to once again be scheduled. - */ - @Override - public long getYieldExpiration() { - return yieldExpiration.get(); - } - - @Override - public long getPenalizationPeriod(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); - } - - @Override - public String getPenalizationPeriod() { - return penalizationPeriod.get(); - } - - @Override - public void setPenalizationPeriod(final String penalizationPeriod) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS); - if (penalizationMillis < 0) { - throw new IllegalArgumentException("Penalization duration must be positive"); - } - this.penalizationPeriod.set(penalizationPeriod); - } finally { - writeLock.unlock(); - } - } - - /** - * Determines the number of concurrent tasks that may be running for this - * processor. 
- * - * @param taskCount a number of concurrent tasks this processor may have - * running - * @throws IllegalArgumentException if the given value is less than 1 - */ - @Override - public void setMaxConcurrentTasks(final int taskCount) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { - throw new IllegalArgumentException(); - } - if (!triggeredSerially) { - concurrentTaskCount.set(taskCount); - } - } finally { - writeLock.unlock(); - } - } - - public boolean isTriggeredSerially() { - return triggeredSerially; - } - - /** - * @return the number of tasks that may execute concurrently for this - * processor - */ - @Override - public int getMaxConcurrentTasks() { - return concurrentTaskCount.get(); - } - - public LogLevel getBulletinLevel() { - return LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID); - } - - public void setBulletinLevel(final LogLevel level) { - LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level); - } - - @Override - public Set<Connection> getConnections() { - final Set<Connection> allConnections = new HashSet<>(); - readLock.lock(); - try { - for (final Set<Connection> connectionSet : connections.values()) { - allConnections.addAll(connectionSet); - } - } finally { - readLock.unlock(); - } - - return allConnections; - } - - @Override - public List<Connection> getIncomingConnections() { - return incomingConnectionsRef.get(); - } - - @Override - public Set<Connection> getConnections(final Relationship relationship) { - final Set<Connection> applicableConnections; - readLock.lock(); - try { - applicableConnections = connections.get(relationship); - } finally { - readLock.unlock(); - } - return (applicableConnections == null) ? 
Collections.<Connection>emptySet() : Collections.unmodifiableSet(applicableConnections); - } - - @Override - public void addConnection(final Connection connection) { - Objects.requireNonNull(connection, "connection cannot be null"); - - if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) { - throw new IllegalStateException("Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination"); - } - - writeLock.lock(); - try { - List<Connection> updatedIncoming = null; - if (connection.getDestination().equals(this)) { - // don't add the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. - final List<Connection> incomingConnections = incomingConnectionsRef.get(); - updatedIncoming = new ArrayList<>(incomingConnections); - if (!updatedIncoming.contains(connection)) { - updatedIncoming.add(connection); - } - } - - if (connection.getSource().equals(this)) { - // don't add the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. 
- if (!destinations.containsKey(connection)) { - for (final Relationship relationship : connection.getRelationships()) { - final Relationship rel = getRelationship(relationship.getName()); - Set<Connection> set = connections.get(rel); - if (set == null) { - set = new HashSet<>(); - connections.put(rel, set); - } - - set.add(connection); - - destinations.put(connection, connection.getDestination()); - } - - final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); - if (autoTerminated != null) { - autoTerminated.removeAll(connection.getRelationships()); - this.undefinedRelationshipsToTerminate.set(autoTerminated); - } - } - } - - if (updatedIncoming != null) { - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); - } - } finally { - writeLock.unlock(); - } - } - - @Override - public boolean hasIncomingConnection() { - return !incomingConnectionsRef.get().isEmpty(); - } - - @Override - public void updateConnection(final Connection connection) throws IllegalStateException { - if (requireNonNull(connection).getSource().equals(this)) { - writeLock.lock(); - try { - // - // update any relationships - // - // first check if any relations were removed. - final List<Relationship> existingRelationships = new ArrayList<>(); - for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { - if (entry.getValue().contains(connection)) { - existingRelationships.add(entry.getKey()); - } - } - - for (final Relationship rel : connection.getRelationships()) { - if (!existingRelationships.contains(rel)) { - // relationship was removed. Check if this is legal. 
- final Set<Connection> connectionsForRelationship = getConnections(rel); - if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() && !isAutoTerminated(rel) && getRelationships().contains(rel)) { - // if we are running and we do not terminate undefined relationships and this is the only - // connection that defines the given relationship, and that relationship is required, - // then it is not legal to remove this relationship from this connection. - throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " + this + ", which is currently running"); - } - } - } - - // remove the connection from any list that currently contains - for (final Set<Connection> list : connections.values()) { - list.remove(connection); - } - - // add the connection in for all relationships listed. - for (final Relationship rel : connection.getRelationships()) { - Set<Connection> set = connections.get(rel); - if (set == null) { - set = new HashSet<>(); - connections.put(rel, set); - } - set.add(connection); - } - - // update to the new destination - destinations.put(connection, connection.getDestination()); - - final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); - if (autoTerminated != null) { - autoTerminated.removeAll(connection.getRelationships()); - this.undefinedRelationshipsToTerminate.set(autoTerminated); - } - } finally { - writeLock.unlock(); - } - } - - if (connection.getDestination().equals(this)) { - writeLock.lock(); - try { - // update our incoming connections -- we can just remove & re-add the connection to - // update the list. 
- final List<Connection> incomingConnections = incomingConnectionsRef.get(); - final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); - updatedIncoming.remove(connection); - updatedIncoming.add(connection); - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); - } finally { - writeLock.unlock(); - } - } - } - - @Override - public void removeConnection(final Connection connection) { - boolean connectionRemoved = false; - - if (requireNonNull(connection).getSource().equals(this)) { - for (final Relationship relationship : connection.getRelationships()) { - final Set<Connection> connectionsForRelationship = getConnections(relationship); - if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) { - throw new IllegalStateException("This connection cannot be removed because its source is running and removing it will invalidate this processor"); - } - } - - writeLock.lock(); - try { - for (final Set<Connection> connectionList : this.connections.values()) { - connectionList.remove(connection); - } - - connectionRemoved = (destinations.remove(connection) != null); - } finally { - writeLock.unlock(); - } - } - - if (connection.getDestination().equals(this)) { - writeLock.lock(); - try { - final List<Connection> incomingConnections = incomingConnectionsRef.get(); - if (incomingConnections.contains(connection)) { - final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); - updatedIncoming.remove(connection); - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); - return; - } - } finally { - writeLock.unlock(); - } - } - - if (!connectionRemoved) { - throw new IllegalArgumentException("Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source"); - } - } - - /** - * Gets the relationship for this nodes processor for the given name or - * creates a new relationship for the given name. 
- * - * @param relationshipName - * @return - */ - @Override - public Relationship getRelationship(final String relationshipName) { - final Relationship specRel = new Relationship.Builder().name(relationshipName).build(); - Relationship returnRel = specRel; - - final Set<Relationship> relationships; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - relationships = processor.getRelationships(); - } - - for (final Relationship rel : relationships) { - if (rel.equals(specRel)) { - returnRel = rel; - break; - } - } - return returnRel; - } - - public Processor getProcessor() { - return this.processor; - } - - /** - * Obtains the Set of destination processors for all relationships excluding - * any destinations that are this processor itself (self-loops) - * - * @return - */ - public Set<Connectable> getDestinations() { - final Set<Connectable> nonSelfDestinations = new HashSet<>(); - readLock.lock(); - try { - for (final Connectable connectable : destinations.values()) { - if (connectable != this) { - nonSelfDestinations.add(connectable); - } - } - } finally { - readLock.unlock(); - } - return nonSelfDestinations; - } - - public Set<Connectable> getDestinations(final Relationship relationship) { - readLock.lock(); - try { - final Set<Connectable> destinationSet = new HashSet<>(); - final Set<Connection> relationshipConnections = connections.get(relationship); - if (relationshipConnections != null) { - for (final Connection connection : relationshipConnections) { - destinationSet.add(destinations.get(connection)); - } - } - return destinationSet; - } finally { - readLock.unlock(); - } - } - - public Set<Relationship> getUndefinedRelationships() { - final Set<Relationship> undefined = new HashSet<>(); - readLock.lock(); - try { - final Set<Relationship> relationships; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - relationships = processor.getRelationships(); - } - - if (relationships == null) { - return undefined; - } 
- for (final Relationship relation : relationships) { - final Set<Connection> connectionSet = this.connections.get(relation); - if (connectionSet == null || connectionSet.isEmpty()) { - undefined.add(relation); - } - } - } finally { - readLock.unlock(); - } - return undefined; - } - - /** - * Determines if the given node is a destination for this node - * - * @param node - * @return true if is a direct destination node; false otherwise - */ - boolean isRelated(final ProcessorNode node) { - readLock.lock(); - try { - return this.destinations.containsValue(node); - } finally { - readLock.unlock(); - } - } - - @Override - public boolean isRunning() { - readLock.lock(); - try { - return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; - } finally { - readLock.unlock(); - } - } - - @Override - public boolean isValid() { - readLock.lock(); - try { - final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); - - final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - validationResults = getProcessor().validate(validationContext); - } - - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - return false; - } - } - - for (final Relationship undef : getUndefinedRelationships()) { - if (!isAutoTerminated(undef)) { - return false; - } - } - } catch (final Throwable t) { - return false; - } finally { - readLock.unlock(); - } - - return true; - } - - @Override - public Collection<ValidationResult> getValidationErrors() { - final List<ValidationResult> results = new ArrayList<>(); - readLock.lock(); - try { - final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); - - final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) { - validationResults = getProcessor().validate(validationContext); - } - - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - results.add(result); - } - } - - for (final Relationship relationship : getUndefinedRelationships()) { - if (!isAutoTerminated(relationship)) { - final ValidationResult error = new ValidationResult.Builder() - .explanation("Relationship '" + relationship.getName() + "' is not connected to any component and is not auto-terminated") - .subject("Relationship " + relationship.getName()) - .valid(false) - .build(); - results.add(error); - } - } - } catch (final Throwable t) { - results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build()); - } finally { - readLock.unlock(); - } - return results; - } - - /** - * Establishes node equality (based on the processor's identifier) - * - * @param other - * @return - */ - @Override - public boolean equals(final Object other) { - if (!(other instanceof ProcessorNode)) { - return false; - } - final ProcessorNode on = (ProcessorNode) other; - return new EqualsBuilder().append(identifier.get(), on.getIdentifier()).isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(7, 67).append(identifier).toHashCode(); - } - - @Override - public Collection<Relationship> getRelationships() { - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - return getProcessor().getRelationships(); - } - } - - @Override - public String toString() { - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - return getProcessor().toString(); - } - } - - @Override - public ProcessGroup getProcessGroup() { - return processGroup.get(); - } - - @Override - public void setProcessGroup(final ProcessGroup group) { - writeLock.lock(); - try { - this.processGroup.set(group); - } finally { - writeLock.unlock(); - } - } - - @Override - public void 
onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - processor.onTrigger(context, sessionFactory); - } - } - - @Override - public ConnectableType getConnectableType() { - return ConnectableType.PROCESSOR; - } - - public void setScheduledState(final ScheduledState scheduledState) { - this.scheduledState.set(scheduledState); - if (!scheduledState.equals(ScheduledState.RUNNING)) { // if user stops processor, clear yield expiration - yieldExpiration.set(0L); - } - } - - @Override - public void setAnnotationData(final String data) { - writeLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException("Cannot set AnnotationData while processor is running"); - } - - this.annotationData.set(data); - } finally { - writeLock.unlock(); - } - } - - @Override - public String getAnnotationData() { - return annotationData.get(); - } - - @Override - public Collection<ValidationResult> validate(final ValidationContext validationContext) { - return processor.validate(validationContext); - } - - @Override - public void verifyCanDelete() throws IllegalStateException { - verifyCanDelete(false); - } - - @Override - public void verifyCanDelete(final boolean ignoreConnections) { - readLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException(this + " is running"); - } - - if (!ignoreConnections) { - for (final Set<Connection> connectionSet : connections.values()) { - for (final Connection connection : connectionSet) { - connection.verifyCanDelete(); - } - } - - for (final Connection connection : incomingConnectionsRef.get()) { - if (connection.getSource().equals(this)) { - connection.verifyCanDelete(); - } else { - throw new IllegalStateException(this + " is the destination of another component"); - } - } - } - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanStart() { - readLock.lock(); - try { - if (scheduledState.get() 
!= ScheduledState.STOPPED) { - throw new IllegalStateException(this + " is not stopped"); - } - verifyNoActiveThreads(); - - if (!isValid()) { - throw new IllegalStateException(this + " is not in a valid state"); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanStop() { - if (getScheduledState() != ScheduledState.RUNNING) { - throw new IllegalStateException(this + " is not scheduled to run"); - } - } - - @Override - public void verifyCanUpdate() { - readLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException(this + " is not stopped"); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanEnable() { - readLock.lock(); - try { - if (getScheduledState() != ScheduledState.DISABLED) { - throw new IllegalStateException(this + " is not disabled"); - } - - verifyNoActiveThreads(); - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanDisable() { - readLock.lock(); - try { - if (getScheduledState() != ScheduledState.STOPPED) { - throw new IllegalStateException(this + " is not stopped"); - } - verifyNoActiveThreads(); - } finally { - readLock.unlock(); - } - } - - private void verifyNoActiveThreads() throws IllegalStateException { - final int threadCount = processScheduler.getActiveThreadCount(this); - if (threadCount > 0) { - throw new IllegalStateException(this + " has " + threadCount + " threads still active"); - } - } - - @Override - public void verifyModifiable() throws IllegalStateException { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java deleted file mode 100644 index 2c9d85e..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import javax.xml.bind.annotation.XmlRootElement; - -/** - * Represents a data flow snippet. 
- */ -@XmlRootElement(name = "snippet") -public class StandardSnippet implements Snippet { - - private String id; - private String parentGroupId; - private Boolean linked; - - private Set<String> processGroups = new HashSet<>(); - private Set<String> remoteProcessGroups = new HashSet<>(); - private Set<String> processors = new HashSet<>(); - private Set<String> inputPorts = new HashSet<>(); - private Set<String> outputPorts = new HashSet<>(); - private Set<String> connections = new HashSet<>(); - private Set<String> labels = new HashSet<>(); - private Set<String> funnels = new HashSet<>(); - - /** - * The id of this snippet. - * - * @return - */ - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - /** - * Whether or not this snippet is linked to the data flow. - * - * @return - */ - public boolean isLinked() { - if (linked == null) { - return false; - } else { - return linked; - } - } - - public void setLinked(Boolean linked) { - this.linked = linked; - } - - /** - * The parent group id of the components in this snippet. - * - * @return - */ - public String getParentGroupId() { - return parentGroupId; - } - - public void setParentGroupId(String parentGroupId) { - this.parentGroupId = parentGroupId; - } - - /** - * The connections in this snippet. - * - * @return - */ - public Set<String> getConnections() { - return Collections.unmodifiableSet(connections); - } - - public void addConnections(Collection<String> ids) { - connections.addAll(ids); - } - - /** - * The funnels in this snippet. - * - * @return - */ - public Set<String> getFunnels() { - return Collections.unmodifiableSet(funnels); - } - - public void addFunnels(Collection<String> ids) { - funnels.addAll(ids); - } - - /** - * The input ports in this snippet. 
- * - * @return - */ - public Set<String> getInputPorts() { - return Collections.unmodifiableSet(inputPorts); - } - - public void addInputPorts(Collection<String> ids) { - inputPorts.addAll(ids); - } - - /** - * The output ports in this snippet. - * - * @return - */ - public Set<String> getOutputPorts() { - return Collections.unmodifiableSet(outputPorts); - } - - public void addOutputPorts(Collection<String> ids) { - outputPorts.addAll(ids); - } - - /** - * The labels in this snippet. - * - * @return - */ - public Set<String> getLabels() { - return Collections.unmodifiableSet(labels); - } - - public void addLabels(Collection<String> ids) { - labels.addAll(ids); - } - - public Set<String> getProcessGroups() { - return Collections.unmodifiableSet(processGroups); - } - - public void addProcessGroups(Collection<String> ids) { - processGroups.addAll(ids); - } - - public Set<String> getProcessors() { - return Collections.unmodifiableSet(processors); - } - - public void addProcessors(Collection<String> ids) { - processors.addAll(ids); - } - - public Set<String> getRemoteProcessGroups() { - return Collections.unmodifiableSet(remoteProcessGroups); - } - - public void addRemoteProcessGroups(Collection<String> ids) { - remoteProcessGroups.addAll(ids); - } - - /** - * Determines if this snippet is empty. 
- * - * @return - */ - public boolean isEmpty() { - return processors.isEmpty() && processGroups.isEmpty() && remoteProcessGroups.isEmpty() && labels.isEmpty() - && inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty() && funnels.isEmpty(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java deleted file mode 100644 index 26c4cd2..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller; - -import org.apache.nifi.web.api.dto.TemplateDTO; - -public class Template { - - private final TemplateDTO dto; - - public Template(final TemplateDTO dto) { - this.dto = dto; - } - - /** - * Returns a TemplateDTO object that describes the contents of this Template - * - * @return - */ - public TemplateDTO getDetails() { - return dto; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java deleted file mode 100644 index aa095d1..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java +++ /dev/null @@ -1,507 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller; - -import static java.util.Objects.requireNonNull; - -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.stream.io.DataOutputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.persistence.TemplateDeserializer; -import org.apache.nifi.persistence.TemplateSerializer; -import org.apache.nifi.web.api.dto.ConnectableDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO.PropertyDescriptorDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.TemplateDTO; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TemplateManager { - - private static final Logger logger = 
LoggerFactory.getLogger(TemplateManager.class); - - private final Path directory; - private final Map<String, Template> templateMap = new HashMap<>(); - - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - private final FileFilter templateFileFilter = new FileFilter() { - @Override - public boolean accept(File pathname) { - return pathname.getName().toLowerCase().endsWith(".template"); - } - }; - - public TemplateManager(final Path storageLocation) throws IOException { - directory = storageLocation; - if (!Files.exists(directory)) { - Files.createDirectories(directory); - } else { - if (!Files.isDirectory(directory)) { - throw new IllegalArgumentException(directory.toString() + " is not a directory"); - } - // use toFile().canXXX, rather than Files.is... because on Windows 7, we sometimes get the wrong result for Files.is... (running Java 7 update 9) - if (!directory.toFile().canExecute() || !directory.toFile().canWrite()) { - throw new IOException("Invalid permissions for directory " + directory.toString()); - } - } - } - - /** - * Adds a template to this manager. The contents of this template must be - * part of the current flow. This is going create a template based on a - * snippet of this flow. Any sensitive properties in the TemplateDTO will be - * removed. 
- * - * @param dto - * @return a copy of the given DTO - * @throws IOException if an I/O error occurs when persisting the Template - * @throws NullPointerException if the DTO is null - * @throws IllegalArgumentException if does not contain all required - * information, such as the template name or a processor's configuration - * element - */ - public Template addTemplate(final TemplateDTO dto) throws IOException { - scrubTemplate(dto.getSnippet()); - return importTemplate(dto); - } - - private void verifyCanImport(final TemplateDTO dto) { - // ensure the template is specified - if (dto == null || dto.getSnippet() == null) { - throw new IllegalArgumentException("Template details not specified."); - } - - // ensure the name is specified - if (StringUtils.isBlank(dto.getName())) { - throw new IllegalArgumentException("Template name cannot be blank."); - } - - readLock.lock(); - try { - for (final Template template : templateMap.values()) { - final TemplateDTO existingDto = template.getDetails(); - - // ensure a template with this name doesnt already exist - if (dto.getName().equals(existingDto.getName())) { - throw new IllegalStateException(String.format("A template named '%s' already exists.", dto.getName())); - } - } - } finally { - readLock.unlock(); - } - } - - /** - * Clears all Templates from the TemplateManager - */ - public void clear() throws IOException { - writeLock.lock(); - try { - templateMap.clear(); - - final File[] files = directory.toFile().listFiles(templateFileFilter); - if (files == null) { - return; - } - - for (final File file : files) { - boolean successful = false; - - for (int i = 0; i < 10; i++) { - if (file.delete()) { - successful = true; - break; - } - } - - if (!successful && file.exists()) { - throw new IOException("Failed to delete template file " + file.getAbsolutePath()); - } - } - } finally { - writeLock.unlock(); - } - } - - /** - * Returns the template with the given id, if it exists; else, returns null - * - * @param id - * 
@return - */ - public Template getTemplate(final String id) { - readLock.lock(); - try { - return templateMap.get(id); - } finally { - readLock.unlock(); - } - } - - /** - * Loads the templates from disk - * - * @throws IOException - */ - public void loadTemplates() throws IOException { - writeLock.lock(); - try { - final File[] files = directory.toFile().listFiles(templateFileFilter); - if (files == null) { - return; - } - - for (final File file : files) { - try (final FileInputStream fis = new FileInputStream(file); - final BufferedInputStream bis = new BufferedInputStream(fis)) { - final TemplateDTO templateDto = TemplateDeserializer.deserialize(bis); - templateMap.put(templateDto.getId(), new Template(templateDto)); - } - } - } finally { - writeLock.unlock(); - } - } - - public Template importTemplate(final TemplateDTO dto) throws IOException { - // ensure we can add this template - verifyCanImport(dto); - - writeLock.lock(); - try { - if (requireNonNull(dto).getId() == null) { - dto.setId(UUID.randomUUID().toString()); - } - - final Template template = new Template(dto); - persistTemplate(template); - - templateMap.put(dto.getId(), template); - return template; - } finally { - writeLock.unlock(); - } - } - - /** - * Persists the given template to disk - * - * @param dto - * @throws IOException - */ - private void persistTemplate(final Template template) throws IOException { - final Path path = directory.resolve(template.getDetails().getId() + ".template"); - Files.write(path, TemplateSerializer.serialize(template.getDetails()), StandardOpenOption.WRITE, StandardOpenOption.CREATE); - } - - /** - * Scrubs the template prior to persisting in order to remove fields that - * shouldn't be included or are unnecessary. 
- * - * @param snippet - */ - private void scrubTemplate(final FlowSnippetDTO snippet) { - // ensure that contents have been specified - if (snippet != null) { - // go through each processor if specified - if (snippet.getProcessors() != null) { - scrubProcessors(snippet.getProcessors()); - } - - // go through each connection if specified - if (snippet.getConnections() != null) { - scrubConnections(snippet.getConnections()); - } - - // go through each remote process group if specified - if (snippet.getRemoteProcessGroups() != null) { - scrubRemoteProcessGroups(snippet.getRemoteProcessGroups()); - } - - // go through each process group if specified - if (snippet.getProcessGroups() != null) { - scrubProcessGroups(snippet.getProcessGroups()); - } - } - } - - /** - * Scrubs process groups prior to saving. - * - * @param processGroups - */ - private void scrubProcessGroups(final Set<ProcessGroupDTO> processGroups) { - // go through each process group - for (final ProcessGroupDTO processGroupDTO : processGroups) { - scrubTemplate(processGroupDTO.getContents()); - } - } - - /** - * Scrubs processors prior to saving. This includes removing sensitive - * properties, validation errors, property descriptors, etc. 
- * - * @param snippet - */ - private void scrubProcessors(final Set<ProcessorDTO> processors) { - // go through each processor - for (final ProcessorDTO processorDTO : processors) { - final ProcessorConfigDTO processorConfig = processorDTO.getConfig(); - - // ensure that some property configuration have been specified - if (processorConfig != null) { - // if properties have been specified, remove sensitive ones - if (processorConfig.getProperties() != null) { - Map<String, String> processorProperties = processorConfig.getProperties(); - - // look for sensitive properties and remove them - if (processorConfig.getDescriptors() != null) { - final Collection<PropertyDescriptorDTO> descriptors = processorConfig.getDescriptors().values(); - for (PropertyDescriptorDTO descriptor : descriptors) { - if (descriptor.isSensitive()) { - processorProperties.put(descriptor.getName(), null); - } - } - } - } - - processorConfig.setDescriptors(null); - processorConfig.setCustomUiUrl(null); - } - - // remove validation errors - processorDTO.setValidationErrors(null); - } - } - - /** - * Scrubs connections prior to saving. This includes removing available - * relationships. - * - * @param snippet - */ - private void scrubConnections(final Set<ConnectionDTO> connections) { - // go through each connection - for (final ConnectionDTO connectionDTO : connections) { - connectionDTO.setAvailableRelationships(null); - - scrubConnectable(connectionDTO.getSource()); - scrubConnectable(connectionDTO.getDestination()); - } - } - - /** - * Remove unnecessary fields in connectables prior to saving. - * - * @param connectable - */ - private void scrubConnectable(final ConnectableDTO connectable) { - if (connectable != null) { - connectable.setComments(null); - connectable.setExists(null); - connectable.setRunning(null); - connectable.setTransmitting(null); - connectable.setName(null); - } - } - - /** - * Remove unnecessary fields in remote groups prior to saving. 
- * - * @param remoteGroups - */ - private void scrubRemoteProcessGroups(final Set<RemoteProcessGroupDTO> remoteGroups) { - // go through each remote process group - for (final RemoteProcessGroupDTO remoteProcessGroupDTO : remoteGroups) { - remoteProcessGroupDTO.setFlowRefreshed(null); - remoteProcessGroupDTO.setInputPortCount(null); - remoteProcessGroupDTO.setOutputPortCount(null); - remoteProcessGroupDTO.setTransmitting(null); - - // if this remote process group has contents - if (remoteProcessGroupDTO.getContents() != null) { - RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents(); - - // scrub any remote input ports - if (contents.getInputPorts() != null) { - scrubRemotePorts(contents.getInputPorts()); - } - - // scrub and remote output ports - if (contents.getOutputPorts() != null) { - scrubRemotePorts(contents.getOutputPorts()); - } - } - } - } - - /** - * Remove unnecessary fields in remote ports prior to saving. - * - * @param remotePorts - */ - private void scrubRemotePorts(final Set<RemoteProcessGroupPortDTO> remotePorts) { - for (final Iterator<RemoteProcessGroupPortDTO> remotePortIter = remotePorts.iterator(); remotePortIter.hasNext();) { - final RemoteProcessGroupPortDTO remotePortDTO = remotePortIter.next(); - - // if the flow is not connected to this remote port, remove it - if (remotePortDTO.isConnected() == null || !remotePortDTO.isConnected().booleanValue()) { - remotePortIter.remove(); - continue; - } - - remotePortDTO.setExists(null); - remotePortDTO.setTargetRunning(null); - } - } - - /** - * Removes the template with the given ID. 
- * - * @param id the ID of the template to remove - * @throws NullPointerException if the argument is null - * @throws IllegalStateException if no template exists with the given ID - * @throws IOException if template could not be removed - */ - public void removeTemplate(final String id) throws IOException, IllegalStateException { - writeLock.lock(); - try { - final Template removed = templateMap.remove(requireNonNull(id)); - - // ensure the template exists - if (removed == null) { - throw new IllegalStateException("No template with ID " + id + " exists"); - } else { - - try { - // remove the template from the archive directory - final Path path = directory.resolve(removed.getDetails().getId() + ".template"); - Files.delete(path); - } catch (final NoSuchFileException e) { - logger.warn(String.format("Template file for template %s not found when attempting to remove. Continuing...", id)); - } catch (final IOException e) { - logger.error(String.format("Unable to remove template file for template %s.", id)); - - // since the template file existed and we were unable to remove it, rollback - // by returning it to the template map - templateMap.put(id, removed); - - // rethrow - throw e; - } - } - } finally { - writeLock.unlock(); - } - } - - public Set<Template> getTemplates() { - readLock.lock(); - try { - return new HashSet<>(templateMap.values()); - } finally { - readLock.unlock(); - } - } - - public static List<Template> parseBytes(final byte[] bytes) { - final List<Template> templates = new ArrayList<>(); - - try (final InputStream rawIn = new ByteArrayInputStream(bytes); - final DataInputStream in = new DataInputStream(rawIn)) { - - while (isMoreData(in)) { - final int length = in.readInt(); - final byte[] buffer = new byte[length]; - StreamUtils.fillBuffer(in, buffer, true); - final TemplateDTO dto = TemplateDeserializer.deserialize(new ByteArrayInputStream(buffer)); - templates.add(new Template(dto)); - } - } catch (final IOException e) { - throw new 
RuntimeException("Could not parse bytes", e); // won't happen because of the types of streams being used - } - - return templates; - } - - private static boolean isMoreData(final InputStream in) throws IOException { - in.mark(1); - final int nextByte = in.read(); - if (nextByte == -1) { - return false; - } - - in.reset(); - return true; - } - - public byte[] export() { - readLock.lock(); - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final DataOutputStream dos = new DataOutputStream(baos)) { - for (final Template template : templateMap.values()) { - final TemplateDTO dto = template.getDetails(); - final byte[] bytes = TemplateSerializer.serialize(dto); - dos.writeInt(bytes.length); - dos.write(bytes); - } - - return baos.toByteArray(); - } catch (final IOException e) { - // won't happen - return null; - } finally { - readLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java deleted file mode 100644 index ac6fc5f..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
/**
 * Represents the exceptional case when a controller is to be loaded with a flow
 * that is fundamentally different than its existing flow.
 *
 * @author unattributed
 */
public class UninheritableFlowException extends RuntimeException {

    private static final long serialVersionUID = 198234798234794L;

    /**
     * Creates an exception without a message or a cause.
     */
    public UninheritableFlowException() {
        super();
    }

    /**
     * @param message the detail message
     */
    public UninheritableFlowException(final String message) {
        super(message);
    }

    /**
     * @param cause the underlying cause
     */
    public UninheritableFlowException(final Throwable cause) {
        super(cause);
    }

    /**
     * @param message the detail message
     * @param cause the underlying cause
     */
    public UninheritableFlowException(final String message, final Throwable cause) {
        super(message, cause);
    }

    /**
     * Passes all four arguments straight through to the matching
     * {@link RuntimeException} constructor.
     *
     * @param message the detail message
     * @param cause the underlying cause
     * @param enableSuppression whether or not suppression is enabled
     * @param writableStackTrace whether or not the stack trace is writable
     */
    public UninheritableFlowException(final String message, final Throwable cause,
            final boolean enableSuppression, final boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }

}
/**
 * Checked exception with the four conventional constructors (no argument,
 * cause only, explanation only, explanation plus cause).
 */
public class FlowFileConsumptionException extends Exception {

    private static final long serialVersionUID = 18923749824378923L;

    /**
     * Creates an exception without an explanation or a cause.
     */
    public FlowFileConsumptionException() {
    }

    /**
     * @param explanation the detail message
     */
    public FlowFileConsumptionException(final String explanation) {
        super(explanation);
    }

    /**
     * @param cause the underlying cause
     */
    public FlowFileConsumptionException(final Throwable cause) {
        super(cause);
    }

    /**
     * @param explanation the detail message
     * @param cause the underlying cause
     */
    public FlowFileConsumptionException(final String explanation, final Throwable cause) {
        super(explanation, cause);
    }
}
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ReportingTaskInstantiationException.java deleted file mode 100644 index f407048..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ReportingTaskInstantiationException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Checked exception whose constructors forward directly to {@link Exception}.
 *
 * NOTE(review): judging by the name this reports a failure to instantiate a
 * reporting task; confirm against the call sites.
 */
public class ReportingTaskInstantiationException extends Exception {

    private static final long serialVersionUID = 189234789237L;

    /**
     * @param message the detail message
     */
    public ReportingTaskInstantiationException(final String message) {
        super(message);
    }

    /**
     * @param className passed through unchanged as the detail message
     * @param t the underlying failure
     */
    public ReportingTaskInstantiationException(final String className, final Throwable t) {
        super(className, t);
    }

}
/**
 * Unchecked exception that carries a list of validation error messages.
 *
 * <p>The error list is copied at construction time, so later changes to the
 * caller's list do not alter this exception. A {@code null} list is treated
 * as an empty one so that rendering the message can never itself fail.</p>
 */
public class ValidationException extends RuntimeException {

    private static final long serialVersionUID = 198023479823479L;

    /** Unmodifiable snapshot of the errors supplied to the constructor; never null. */
    private final List<String> errors;

    /**
     * @param errors the validation error messages; may be {@code null}, which
     *               is treated as "no errors"
     */
    public ValidationException(final List<String> errors) {
        if (errors == null) {
            this.errors = Collections.<String>emptyList();
        } else {
            // Fully qualified to avoid requiring a new import in this file.
            this.errors = Collections.unmodifiableList(new java.util.ArrayList<String>(errors));
        }
    }

    /**
     * @return an unmodifiable view of the validation errors; never {@code null}
     */
    public List<String> getValidationErrors() {
        return errors;
    }

    /**
     * Builds a summary of the form {@code "<count> validation error"}; the
     * single error text is appended when there is exactly one, otherwise the
     * noun is pluralised and no error text is included.
     *
     * @return the summary message
     */
    @Override
    public String getLocalizedMessage() {
        final int count = errors.size();
        final StringBuilder sb = new StringBuilder();
        sb.append(count).append(" validation error");
        if (count == 1) {
            sb.append(": ").append(errors.get(0));
        } else {
            sb.append("s");
        }
        return sb.toString();
    }
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.label; - -import org.apache.nifi.controller.label.Label; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.nifi.connectable.Position; -import org.apache.nifi.connectable.Size; -import org.apache.nifi.groups.ProcessGroup; - -public class StandardLabel implements Label { - - private final String identifier; - private final AtomicReference<Position> position; - private final AtomicReference<Size> size; - private final AtomicReference<Map<String, String>> style; - private final AtomicReference<String> value; - private final AtomicReference<ProcessGroup> processGroup; - - public StandardLabel(final String identifier, final String value) { - this(identifier, new Position(0D, 0D), new HashMap<String, String>(), value, null); - } - - public StandardLabel(final String identifier, final Position position, final Map<String, String> style, final String value, final ProcessGroup processGroup) { - this.identifier = identifier; - this.position = new AtomicReference<>(position); - this.style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>(style))); - this.size = new AtomicReference<>(new Size(150, 150)); - this.value = new AtomicReference<>(value); - this.processGroup = new AtomicReference<>(processGroup); - } - - @Override - public Position getPosition() { - return position.get(); - } - - @Override - public void setPosition(final Position position) { - if (position != null) { - this.position.set(position); 
- } - } - - @Override - public Size getSize() { - return size.get(); - } - - @Override - public void setSize(final Size size) { - if (size != null) { - this.size.set(size); - } - } - - public String getIdentifier() { - return identifier; - } - - public Map<String, String> getStyle() { - return style.get(); - } - - public void setStyle(final Map<String, String> style) { - if (style != null) { - boolean updated = false; - while (!updated) { - final Map<String, String> existingStyles = this.style.get(); - final Map<String, String> updatedStyles = new HashMap<>(existingStyles); - updatedStyles.putAll(style); - updated = this.style.compareAndSet(existingStyles, Collections.unmodifiableMap(updatedStyles)); - } - } - } - - public String getValue() { - return value.get(); - } - - public void setValue(final String value) { - this.value.set(value); - } - - public void setProcessGroup(final ProcessGroup group) { - this.processGroup.set(group); - } - - public ProcessGroup getProcessGroup() { - return processGroup.get(); - } -}