http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java new file mode 100644 index 0000000..3d5c75d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.connectable; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.Triggerable; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.scheduling.SchedulingStrategy; + +/** + * Represents a connectable component to which or from which data can flow. + */ +public interface Connectable extends Triggerable { + + /** + * @return the unique identifier for this <code>Connectable</code> + */ + String getIdentifier(); + + /** + * @return a Collection of all relationships for this Connectable + */ + Collection<Relationship> getRelationships(); + + /** + * Returns the ProcessorRelationship whose name is given + * + * @param relationshipName + * @return a ProcessorRelationship whose name is given, or <code>null</code> + * if none exists + */ + Relationship getRelationship(String relationshipName); + + /** + * Adds the given connection to this Connectable. + * + * @param connection the connection to add + * @throws NullPointerException if the argument is null + * @throws IllegalArgumentException if the given Connection is not valid + */ + void addConnection(Connection connection) throws IllegalArgumentException; + + /** + * @return true if the Connectable is the destination of any other + * Connectable, false otherwise. + */ + boolean hasIncomingConnection(); + + /** + * + * @param connection + * @throws IllegalStateException if the given Connection is not registered + * to <code>this</code>. + */ + void removeConnection(Connection connection) throws IllegalStateException; + + /** + * Updates any internal state that depends on the given connection. The + * given connection will share the same ID as the old connection. 
+ * + * @param newConnection + * @throws IllegalStateException + */ + void updateConnection(Connection newConnection) throws IllegalStateException; + + /** + * @return a <code>Set</code> of all <code>Connection</code>s for which this + * <code>Connectable</code> is the destination + */ + List<Connection> getIncomingConnections(); + + /** + * @return a <code>Set</code> of all <code>Connection</code>s for which this + * <code>Connectable</code> is the source; if no connections exist, will + * return an empty Collection. Guaranteed not null. + */ + Set<Connection> getConnections(); + + /** + * @param relationship + * @return a <code>Set</code> of all <code>Connection</code>s that contain + * the given relationship for which this <code>Connectable</code> is the + * source + */ + Set<Connection> getConnections(Relationship relationship); + + /** + * Returns the position on the graph where this Connectable is located + * + * @return + */ + Position getPosition(); + + /** + * Updates this component's position on the graph + * + * @param position + */ + void setPosition(Position position); + + /** + * @return the name of this Connectable + */ + String getName(); + + /** + * Sets the name of this Connectable so that its name will be visible on the + * UI + * @param name + */ + void setName(String name); + + /** + * @return the comments of this Connectable + */ + String getComments(); + + /** + * Sets the comments of this Connectable. 
+ * @param comments + */ + void setComments(String comments); + + /** + * If true, + * {@link #onTrigger(nifi.processor.ProcessContext, nifi.processor.ProcessSessionFactory)} + * should be called even when this Connectable has no FlowFiles queued for + * processing + * + * @return + */ + boolean isTriggerWhenEmpty(); + + /** + * Returns the ProcessGroup to which this <code>Connectable</code> belongs + * + * @return + */ + ProcessGroup getProcessGroup(); + + /** + * Sets the new ProcessGroup to which this <code>Connectable</code> belongs + * + * @param group + */ + void setProcessGroup(ProcessGroup group); + + /** + * + * @param relationship the relationship + * @return true indicates flow files transferred to the given relationship + * should be terminated if the relationship is not connected to another + * FlowFileConsumer; false indicates they will not be terminated and the + * processor will not be valid until specified + */ + boolean isAutoTerminated(Relationship relationship); + + /** + * Indicates whether flow file content made by this connectable must be + * persisted + * + * @return + */ + boolean isLossTolerant(); + + /** + * @param lossTolerant + */ + void setLossTolerant(boolean lossTolerant); + + /** + * @return the type of the Connectable + */ + ConnectableType getConnectableType(); + + /** + * Returns the any validation errors for this connectable. 
+ * + * @return + */ + Collection<ValidationResult> getValidationErrors(); + + /** + * Returns the amount of time for which a FlowFile should be penalized when + * {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called + * + * @param timeUnit + * @return + */ + long getPenalizationPeriod(final TimeUnit timeUnit); + + /** + * Returns a string representation for which a FlowFile should be penalized + * when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called + * + * @return + */ + String getPenalizationPeriod(); + + /** + * @param timeUnit determines the unit of time to represent the yield + * period. + * @return + */ + long getYieldPeriod(TimeUnit timeUnit); + + /** + * returns the string representation for this Connectable's configured yield + * period + * + * @return + */ + String getYieldPeriod(); + + /** + * Updates the amount of time that this Connectable should avoid being + * scheduled when the processor calls + * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} + * + * @param yieldPeriod + */ + void setYieldPeriod(String yieldPeriod); + + /** + * Updates the amount of time that this Connectable will penalize FlowFiles + * when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called + * @param penalizationPeriod + */ + void setPenalizationPeriod(String penalizationPeriod); + + /** + * Causes the processor not to be scheduled for some period of time. This + * duration can be obtained and set via the + * {@link #getYieldPeriod(TimeUnit)} and + * {@link #setYieldPeriod(long, TimeUnit)} methods. + */ + void yield(); + + /** + * Returns the time in milliseconds since Epoch at which this Connectable + * should no longer yield its threads + * + * @return + */ + long getYieldExpiration(); + + /** + * Specifies whether or not this component is considered side-effect free, + * with respect to external systems. 
+ * + * @return + */ + boolean isSideEffectFree(); + + void verifyCanDelete() throws IllegalStateException; + + void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException; + + void verifyCanStart() throws IllegalStateException; + + void verifyCanStop() throws IllegalStateException; + + void verifyCanUpdate() throws IllegalStateException; + + void verifyCanEnable() throws IllegalStateException; + + void verifyCanDisable() throws IllegalStateException; + + SchedulingStrategy getSchedulingStrategy(); +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java new file mode 100644 index 0000000..0334bfb --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.connectable; + +import javax.xml.bind.annotation.XmlEnum; + +@XmlEnum +public enum ConnectableType { + + PROCESSOR, + /** + * Port that lives within an RemoteProcessGroup and is used to send data to + * remote NiFi instances + */ + REMOTE_INPUT_PORT, + /** + * Port that lives within a RemoteProcessGroup and is used to receive data + * from remote NiFi instances + */ + REMOTE_OUTPUT_PORT, + /** + * Root Group Input Ports and Local Input Ports + */ + INPUT_PORT, + /** + * Root Group Output Ports and Local Output Ports + */ + OUTPUT_PORT, + FUNNEL +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java new file mode 100644 index 0000000..0a0089d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.connectable; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.Relationship; + +public interface Connection { + + void enqueue(FlowFileRecord flowFile); + + void enqueue(Collection<FlowFileRecord> flowFiles); + + Connectable getDestination(); + + Collection<Relationship> getRelationships(); + + FlowFileQueue getFlowFileQueue(); + + String getIdentifier(); + + String getName(); + + void setName(String name); + + void setBendPoints(List<Position> position); + + List<Position> getBendPoints(); + + int getLabelIndex(); + + void setLabelIndex(int labelIndex); + + long getZIndex(); + + void setZIndex(long zIndex); + + Connectable getSource(); + + void setRelationships(Collection<Relationship> newRelationships); + + void setDestination(final Connectable newDestination); + + void setProcessGroup(ProcessGroup processGroup); + + ProcessGroup getProcessGroup(); + + void lock(); + + void unlock(); + + List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords); + + void verifyCanUpdate() throws IllegalStateException; + + void verifyCanDelete() throws IllegalStateException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java new file mode 100644 
index 0000000..cceca8f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.connectable; + +import org.apache.nifi.controller.ScheduledState; + +public interface Funnel extends Connectable { + + void setScheduledState(ScheduledState scheduledState); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java new file mode 100644 index 0000000..907dd92 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.connectable; + +public interface Port extends Connectable { + + void shutdown(); + + boolean isValid(); + + /** + * <p> + * This method is called just before a Port is scheduled to run, giving the + * Port a chance to initialize any resources needed.</p> + */ + void onSchedulingStart(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java new file mode 100644 index 0000000..75d04f5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.connectable; + +public class Position { + + private final double x; + private final double y; + + public Position(final double x, final double y) { + this.x = x; + this.y = y; + } + + public double getX() { + return x; + } + + public double getY() { + return y; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java new file mode 100644 index 0000000..cea13d2 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.connectable; + +public class Size { + + private final double width; + private final double height; + + public Size(final double width, final double height) { + this.width = width; + this.height = height; + } + + public double getWidth() { + return width; + } + + public double getHeight() { + return height; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java new file mode 100644 index 0000000..ef4b72a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.nar.NarCloseable; + +public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent { + + private final String id; + private final ConfigurableComponent component; + private final ValidationContextFactory validationContextFactory; + private final ControllerServiceProvider serviceProvider; + + private final AtomicReference<String> name = new AtomicReference<>(); + private final AtomicReference<String> annotationData = new AtomicReference<>(); + + private final Lock lock = new ReentrantLock(); + private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>(); + + public AbstractConfiguredComponent(final ConfigurableComponent component, final 
String id, + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { + this.id = id; + this.component = component; + this.validationContextFactory = validationContextFactory; + this.serviceProvider = serviceProvider; + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public String getName() { + return name.get(); + } + + @Override + public void setName(final String name) { + this.name.set(Objects.requireNonNull(name).intern()); + } + + @Override + public String getAnnotationData() { + return annotationData.get(); + } + + @Override + public void setAnnotationData(final String data) { + annotationData.set(data); + } + + @Override + public void setProperty(final String name, final String value) { + if (null == name || null == value) { + throw new IllegalArgumentException(); + } + + lock.lock(); + try { + verifyModifiable(); + + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); + + final String oldValue = properties.put(descriptor, value); + if (!value.equals(oldValue)) { + + if (descriptor.getControllerServiceDefinition() != null) { + if (oldValue != null) { + final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue); + if (oldNode != null) { + oldNode.removeReference(this); + } + } + + final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value); + if (newNode != null) { + newNode.addReference(this); + } + } + + try { + component.onPropertyModified(descriptor, oldValue, value); + } catch (final Throwable t) { + // nothing really to do here... + } + } + } + } finally { + lock.unlock(); + } + } + + /** + * Removes the property and value for the given property name if a + * descriptor and value exists for the given name. 
If the property is + * optional its value might be reset to default or will be removed entirely + * if was a dynamic property. + * + * @param name the property to remove + * @return true if removed; false otherwise + * @throws java.lang.IllegalArgumentException if the name is null + */ + @Override + public boolean removeProperty(final String name) { + if (null == name) { + throw new IllegalArgumentException(); + } + + lock.lock(); + try { + verifyModifiable(); + + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + final PropertyDescriptor descriptor = component.getPropertyDescriptor(name); + String value = null; + if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) { + component.onPropertyModified(descriptor, value, null); + return true; + } + } + } finally { + lock.unlock(); + } + return false; + } + + @Override + public Map<PropertyDescriptor, String> getProperties() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + final List<PropertyDescriptor> supported = component.getPropertyDescriptors(); + if (supported == null || supported.isEmpty()) { + return Collections.unmodifiableMap(properties); + } else { + final Map<PropertyDescriptor, String> props = new LinkedHashMap<>(); + for (final PropertyDescriptor descriptor : supported) { + props.put(descriptor, null); + } + props.putAll(properties); + return props; + } + } + } + + @Override + public String getProperty(final PropertyDescriptor property) { + return properties.get(property); + } + + @Override + public int hashCode() { + return 273171 * id.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + + if (!(obj instanceof ConfiguredComponent)) { + return false; + } + + final ConfiguredComponent other = (ConfiguredComponent) obj; + return id.equals(other.getIdentifier()); + } + + @Override + public String toString() { + try (final 
NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return component.toString(); + } + } + + @Override + public Collection<ValidationResult> validate(final ValidationContext context) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return component.validate(context); + } + } + + @Override + public PropertyDescriptor getPropertyDescriptor(final String name) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return component.getPropertyDescriptor(name); + } + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + component.onPropertyModified(descriptor, oldValue, newValue); + } + } + + @Override + public List<PropertyDescriptor> getPropertyDescriptors() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return component.getPropertyDescriptors(); + } + } + + @Override + public boolean isValid() { + final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(getProperties(), getAnnotationData())); + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { + return false; + } + } + + return true; + } + + @Override + public Collection<ValidationResult> getValidationErrors() { + final List<ValidationResult> results = new ArrayList<>(); + lock.lock(); + try { + final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); + + final Collection<ValidationResult> validationResults; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + validationResults = component.validate(validationContext); + } + + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { + results.add(result); + } + } + } catch (final Throwable t) { + results.add(new 
ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build()); + } finally { + lock.unlock(); + } + return results; + } + + public abstract void verifyModifiable() throws IllegalStateException; + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
new file mode 100644
index 0000000..e1d2dd4
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.FormatUtils;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Base class for {@link Port} implementations. It holds the port's identity,
+ * its display state (name, position, comments), its scheduling settings and
+ * its incoming and outgoing connections.
+ * <p>
+ * The two connection collections are guarded by a read/write lock; all other
+ * mutable state is kept in atomic holders. A port exposes exactly one
+ * relationship, {@link #PORT_RELATIONSHIP}.
+ */
+public abstract class AbstractPort implements Port {
+
+    /** The single relationship that every outgoing connection of a port uses. */
+    public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()
+            .description("The relationship through which all Flow Files are transferred")
+            .name("")
+            .build();
+
+    public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
+    /** Time unit used by the period getters when the caller passes <code>null</code>. */
+    public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+    public static final long MINIMUM_YIELD_MILLIS = 0L;
+    public static final long DEFAULT_YIELD_PERIOD = 10000L;
+    public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+    private final List<Relationship> relationships;
+
+    private final String id;
+    private final ConnectableType type;
+    private final AtomicReference<String> name;
+    private final AtomicReference<Position> position;
+    private final AtomicReference<String> comments;
+    private final AtomicReference<ProcessGroup> processGroup;
+    private final AtomicBoolean lossTolerant;
+    private final AtomicReference<ScheduledState> scheduledState;
+    private final AtomicInteger concurrentTaskCount;
+    private final AtomicReference<String> penalizationPeriod;
+    private final AtomicReference<String> yieldPeriod;
+    private final AtomicReference<String> schedulingPeriod;
+    private final AtomicLong schedulingNanos;
+    private final AtomicLong yieldExpiration;
+    private final ProcessScheduler processScheduler;
+
+    // both collections may only be touched while holding readLock / writeLock
+    private final Set<Connection> outgoingConnections;
+    private final List<Connection> incomingConnections;
+
+    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    /**
+     * Creates a stopped port with no connections, one concurrent task, a
+     * penalization period of "30 sec", a yield period of "1 sec" and a
+     * scheduling period of "0 millis".
+     *
+     * @param id the identifier of the port; must not be null
+     * @param name the initial name of the port; must not be null
+     * @param processGroup the group that owns the port
+     * @param type the kind of connectable this port represents
+     * @param scheduler the scheduler that is asked for this port's active thread count
+     * @throws NullPointerException if <code>id</code> or <code>name</code> is null
+     */
+    public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
+        this.id = requireNonNull(id);
+        this.name = new AtomicReference<>(requireNonNull(name));
+        position = new AtomicReference<>(new Position(0D, 0D));
+        outgoingConnections = new HashSet<>();
+        incomingConnections = new ArrayList<>();
+        comments = new AtomicReference<>();
+        lossTolerant = new AtomicBoolean(false);
+        concurrentTaskCount = new AtomicInteger(1);
+        processScheduler = scheduler;
+
+        final List<Relationship> relationshipList = new ArrayList<>();
+        relationshipList.add(PORT_RELATIONSHIP);
+        relationships = Collections.unmodifiableList(relationshipList);
+        this.processGroup = new AtomicReference<>(processGroup);
+        this.type = type;
+        penalizationPeriod = new AtomicReference<>("30 sec");
+        yieldPeriod = new AtomicReference<>("1 sec");
+        yieldExpiration = new AtomicLong(0L);
+        schedulingPeriod = new AtomicReference<>("0 millis");
+        // NOTE(review): 30000 presumably mirrors MINIMUM_SCHEDULING_NANOS, the floor applied
+        // in setScheduldingPeriod - confirm and replace the literal with the constant
+        schedulingNanos = new AtomicLong(30000);
+        scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return id;
+    }
+
+    @Override
+    public String getName() {
+        return name.get();
+    }
+
+    /**
+     * Renames this port. Does nothing if the name is unchanged. For input and
+     * output ports the owning group, when there is one, is asked whether
+     * another port of the same kind already uses the name.
+     *
+     * @param name the new name; must not be null
+     * @throws NullPointerException if <code>name</code> is null
+     * @throws IllegalStateException if the owning group already has a port of
+     *             the same kind with the given name
+     */
+    @Override
+    public void setName(final String name) {
+        // a null name would be stored and then break the equals() call of the next rename
+        requireNonNull(name, "name");
+        if (this.name.get().equals(name)) {
+            return;
+        }
+
+        // the constructor and setProcessGroup both accept null, so there may be no group to consult
+        final ProcessGroup parentGroup = this.processGroup.get();
+        if (parentGroup != null) {
+            if (getConnectableType() == ConnectableType.INPUT_PORT) {
+                if (parentGroup.getInputPortByName(name) != null) {
+                    throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Input Port named " + name);
+                }
+            } else if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
+                if (parentGroup.getOutputPortByName(name) != null) {
+                    throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Output Port named " + name);
+                }
+            }
+        }
+
+        this.name.set(name);
+    }
+
+    @Override
+    public ProcessGroup getProcessGroup() {
+        return processGroup.get();
+    }
+
+    @Override
+    public void setProcessGroup(final ProcessGroup newGroup) {
+        this.processGroup.set(newGroup);
+    }
+
+    @Override
+    public String getComments() {
+        return comments.get();
+    }
+
+    @Override
+    public void setComments(final String comments) {
+        this.comments.set(comments);
+    }
+
+    /**
+     * @return an unmodifiable collection holding only {@link #PORT_RELATIONSHIP}
+     */
+    @Override
+    public Collection<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * @param relationshipName the name to look up
+     * @return {@link #PORT_RELATIONSHIP} if its name equals the given name,
+     *         otherwise <code>null</code>
+     */
+    @Override
+    public Relationship getRelationship(final String relationshipName) {
+        if (PORT_RELATIONSHIP.getName().equals(relationshipName)) {
+            return PORT_RELATIONSHIP;
+        }
+        return null;
+    }
+
+    /**
+     * Registers the given connection as outgoing when this port is its source,
+     * or as incoming when this port is only its destination. Adding a
+     * connection that is already registered is a no-op.
+     *
+     * @param connection the connection to add
+     * @throws NullPointerException if the connection is null
+     * @throws IllegalArgumentException if this port is neither source nor
+     *             destination of the connection, or if an outgoing connection
+     *             uses a relationship other than {@link #PORT_RELATIONSHIP}
+     */
+    @Override
+    public void addConnection(final Connection connection) throws IllegalArgumentException {
+        writeLock.lock();
+        try {
+            if (!requireNonNull(connection).getSource().equals(this)) {
+                if (connection.getDestination().equals(this)) {
+                    // don't add the connection twice. This may occur if we have a self-loop because we will be told
+                    // to add the connection once because we are the source and again because we are the destination.
+                    if (!incomingConnections.contains(connection)) {
+                        incomingConnections.add(connection);
+                    }
+
+                    return;
+                } else {
+                    throw new IllegalArgumentException("Cannot add a connection to a LocalPort for which the LocalPort is neither the Source nor the Destination");
+                }
+            }
+
+            for (final Relationship relationship : connection.getRelationships()) {
+                if (!relationship.equals(PORT_RELATIONSHIP)) {
+                    throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Local Ports");
+                }
+            }
+
+            // don't add the connection twice. This may occur if we have a self-loop because we will be told
+            // to add the connection once because we are the source and again because we are the destination.
+            if (!outgoingConnections.contains(connection)) {
+                outgoingConnections.add(connection);
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean hasIncomingConnection() {
+        readLock.lock();
+        try {
+            return !incomingConnections.isEmpty();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Creates a session, delegates to
+     * {@link #onTrigger(ProcessContext, ProcessSession)} and commits the
+     * session. On any failure the session is rolled back; a
+     * {@link ProcessException} is rethrown as is and anything else is wrapped
+     * in a {@link RuntimeException}.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        final ProcessSession session = sessionFactory.createSession();
+
+        try {
+            onTrigger(context, session);
+            session.commit();
+        } catch (final ProcessException e) {
+            session.rollback();
+            throw e;
+        } catch (final Throwable t) {
+            session.rollback();
+            throw new RuntimeException(t);
+        }
+    }
+
+    /**
+     * Performs the work of this port within the given session. The caller
+     * commits the session when this method returns normally and rolls it back
+     * when it throws.
+     *
+     * @param context the context handed to the session-factory variant of onTrigger
+     * @param session the session created for this invocation
+     * @throws ProcessException if the work fails
+     */
+    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
+
+    /**
+     * Replaces the registered instance of the given connection with the given
+     * one, in the outgoing set when this port is the source and otherwise in
+     * the incoming list when it is the destination.
+     *
+     * @param connection the updated connection
+     * @throws NullPointerException if the connection is null
+     * @throws IllegalStateException if no equal connection is registered, or
+     *             if this port is neither source nor destination
+     */
+    @Override
+    public void updateConnection(final Connection connection) throws IllegalStateException {
+        if (requireNonNull(connection).getSource().equals(this)) {
+            writeLock.lock();
+            try {
+                if (!outgoingConnections.remove(connection)) {
+                    throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
+                }
+                outgoingConnections.add(connection);
+            } finally {
+                writeLock.unlock();
+            }
+        } else if (connection.getDestination().equals(this)) {
+            writeLock.lock();
+            try {
+                if (!incomingConnections.remove(connection)) {
+                    throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
+                }
+                incomingConnections.add(connection);
+            } finally {
+                writeLock.unlock();
+            }
+        } else {
+            throw new IllegalStateException("The given connection is not currently registered for this Port");
+        }
+    }
+
+    /**
+     * Unregisters the given connection. A connection whose source is not this
+     * port is removed from the incoming list; otherwise it is removed from the
+     * outgoing set, provided {@link #canConnectionBeRemoved(Connection)} allows it.
+     *
+     * @param connection the connection to remove
+     * @throws NullPointerException if the connection is null
+     * @throws IllegalStateException if the connection is not registered or may
+     *             not be removed
+     */
+    @Override
+    public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
+        writeLock.lock();
+        try {
+            if (!requireNonNull(connection).getSource().equals(this)) {
+                final boolean existed = incomingConnections.remove(connection);
+                if (!existed) {
+                    throw new IllegalStateException("The given connection is not currently registered for this Port");
+                }
+                return;
+            }
+
+            if (!canConnectionBeRemoved(connection)) {
+                // TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean
+                throw new IllegalStateException(connection + " cannot be removed");
+            }
+
+            final boolean removed = outgoingConnections.remove(connection);
+            if (!removed) {
+                throw new IllegalStateException(connection + " is not registered with " + this);
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Verify that removing this connection will not prevent this Port from
+     * still being connected via each relationship
+     *
+     * @param connection the connection that is about to be removed
+     * @return <code>true</code> if the source of the connection is not running,
+     *         or if every relationship of the source that is not
+     *         auto-terminated currently has at least one connection;
+     *         <code>false</code> otherwise
+     */
+    private boolean canConnectionBeRemoved(final Connection connection) {
+        final Connectable source = connection.getSource();
+        if (!source.isRunning()) {
+            // we don't have to verify that this Connectable is still connected because it's okay to make
+            // the source invalid since it is not running.
+            return true;
+        }
+
+        for (final Relationship relationship : source.getRelationships()) {
+            if (source.isAutoTerminated(relationship)) {
+                continue;
+            }
+
+            final Set<Connection> connectionsForRelationship = source.getConnections(relationship);
+            if (connectionsForRelationship == null || connectionsForRelationship.isEmpty()) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @return an unmodifiable snapshot of the outgoing connections
+     */
+    @Override
+    public Set<Connection> getConnections() {
+        readLock.lock();
+        try {
+            // copy under the lock: a view over the live HashSet would be iterated by callers
+            // after the lock is released, racing with addConnection / removeConnection
+            return Collections.unmodifiableSet(new HashSet<>(outgoingConnections));
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @param relationship the relationship to look up; only
+     *            {@link #PORT_RELATIONSHIP} is accepted
+     * @return an unmodifiable snapshot of the outgoing connections
+     * @throws IllegalArgumentException if the relationship is not
+     *             {@link #PORT_RELATIONSHIP}
+     */
+    @Override
+    public Set<Connection> getConnections(final Relationship relationship) {
+        readLock.lock();
+        try {
+            if (relationship.equals(PORT_RELATIONSHIP)) {
+                // snapshot for the same reason as in getConnections()
+                return Collections.unmodifiableSet(new HashSet<>(outgoingConnections));
+            }
+
+            throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Local Ports");
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public Position getPosition() {
+        return position.get();
+    }
+
+    @Override
+    public void setPosition(final Position position) {
+        this.position.set(position);
+    }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("name", getName()).append("id", getIdentifier()).toString();
+    }
+
+    /**
+     * @return an unmodifiable snapshot of the incoming connections
+     */
+    @Override
+    public List<Connection> getIncomingConnections() {
+        readLock.lock();
+        try {
+            // copy under the lock so callers never iterate the live ArrayList unguarded
+            return Collections.unmodifiableList(new ArrayList<>(incomingConnections));
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Indicates whether or not this Port is valid.
+     *
+     * @return <code>true</code> if this Port is valid, <code>false</code> otherwise
+     */
+    @Override
+    public abstract boolean isValid();
+
+    /**
+     * @return always <code>false</code>
+     */
+    @Override
+    public boolean isAutoTerminated(final Relationship relationship) {
+        return false;
+    }
+
+    @Override
+    public boolean isLossTolerant() {
+        return lossTolerant.get();
+    }
+
+    @Override
+    public void setLossTolerant(boolean lossTolerant) {
+        this.lossTolerant.set(lossTolerant);
+    }
+
+    /**
+     * @param taskCount the number of tasks that may execute concurrently; at least 1
+     * @throws IllegalArgumentException if <code>taskCount</code> is less than 1
+     */
+    @Override
+    public void setMaxConcurrentTasks(final int taskCount) {
+        if (taskCount < 1) {
+            throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount + "; must be at least 1");
+        }
+        concurrentTaskCount.set(taskCount);
+    }
+
+    /**
+     * @return the number of tasks that may execute concurrently for this
+     * processor
+     */
+    @Override
+    public int getMaxConcurrentTasks() {
+        return concurrentTaskCount.get();
+    }
+
+    /**
+     * Marks this port as {@link ScheduledState#STOPPED}.
+     */
+    @Override
+    public void shutdown() {
+        scheduledState.set(ScheduledState.STOPPED);
+    }
+
+    /**
+     * Marks this port as {@link ScheduledState#RUNNING}.
+     */
+    @Override
+    public void onSchedulingStart() {
+        scheduledState.set(ScheduledState.RUNNING);
+    }
+
+    /**
+     * Moves this port from STOPPED to DISABLED.
+     *
+     * @throws IllegalStateException if the port is not currently stopped
+     */
+    public void disable() {
+        final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
+        if (!updated) {
+            throw new IllegalStateException("Port cannot be disabled because it is not stopped");
+        }
+    }
+
+    /**
+     * Moves this port from DISABLED to STOPPED.
+     *
+     * @throws IllegalStateException if the port is not currently disabled
+     */
+    public void enable() {
+        final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
+        if (!updated) {
+            throw new IllegalStateException("Port cannot be enabled because it is not disabled");
+        }
+    }
+
+    /**
+     * @return <code>true</code> if the scheduled state is RUNNING or the
+     *         scheduler reports at least one active thread for this port
+     */
+    @Override
+    public boolean isRunning() {
+        return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0;
+    }
+
+    @Override
+    public ScheduledState getScheduledState() {
+        return scheduledState.get();
+    }
+
+    @Override
+    public ConnectableType getConnectableType() {
+        return type;
+    }
+
+    /**
+     * Updates the amount of time that this processor should avoid being
+     * scheduled when the processor calls
+     * {@link ProcessContext#yield() ProcessContext.yield()}
+     *
+     * @param yieldPeriod the new yield period, in the textual form understood
+     *            by <code>FormatUtils.getTimeDuration</code>
+     * @throws NullPointerException if <code>yieldPeriod</code> is null
+     * @throws IllegalArgumentException if the parsed duration is negative
+     */
+    @Override
+    public void setYieldPeriod(final String yieldPeriod) {
+        final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
+        if (yieldMillis < 0) {
+            throw new IllegalArgumentException("Yield duration must be positive");
+        }
+        this.yieldPeriod.set(yieldPeriod);
+    }
+
+    /**
+     * Stores the given scheduling period. The value in nanoseconds is raised
+     * to <code>MINIMUM_SCHEDULING_NANOS</code> when it is smaller than that.
+     *
+     * @param schedulingPeriod the new scheduling period, in the textual form
+     *            understood by <code>FormatUtils.getTimeDuration</code>
+     * @throws NullPointerException if <code>schedulingPeriod</code> is null
+     * @throws IllegalArgumentException if the parsed duration is negative
+     */
+    @Override
+    public void setScheduldingPeriod(final String schedulingPeriod) {
+        final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
+        if (schedulingNanos < 0) {
+            throw new IllegalArgumentException("Scheduling Period must be positive");
+        }
+
+        this.schedulingPeriod.set(schedulingPeriod);
+        this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
+    }
+
+    /**
+     * @param timeUnit the unit of the result; {@link #DEFAULT_TIME_UNIT} is
+     *            used when null
+     * @return the penalization period converted to the requested unit
+     */
+    @Override
+    public long getPenalizationPeriod(final TimeUnit timeUnit) {
+        return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+    }
+
+    @Override
+    public String getPenalizationPeriod() {
+        return penalizationPeriod.get();
+    }
+
+    /**
+     * Causes the processor not to be scheduled for some period of time. This
+     * duration can be obtained and set via the
+     * {@link #getYieldPeriod(TimeUnit)} and
+     * {@link #setYieldPeriod(String)} methods.
+     * <p>
+     * The yield expiration only ever moves forward: it becomes the later of
+     * its current value and "now plus the yield period".
+     */
+    @Override
+    public void yield() {
+        final long candidate = System.currentTimeMillis() + getYieldPeriod(TimeUnit.MILLISECONDS);
+
+        // compare-and-set loop instead of set(max(get(), candidate)): the separate get and set
+        // could overwrite a later expiration stored by a concurrent yield in between
+        long current = yieldExpiration.get();
+        while (candidate > current && !yieldExpiration.compareAndSet(current, candidate)) {
+            current = yieldExpiration.get();
+        }
+    }
+
+    @Override
+    public long getYieldExpiration() {
+        return yieldExpiration.get();
+    }
+
+    /**
+     * @param timeUnit the unit of the result; must not be null
+     * @return the scheduling period converted to the requested unit
+     */
+    @Override
+    public long getSchedulingPeriod(final TimeUnit timeUnit) {
+        return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public String getSchedulingPeriod() {
+        return schedulingPeriod.get();
+    }
+
+    /**
+     * @param penalizationPeriod the new penalization period, in the textual
+     *            form understood by <code>FormatUtils.getTimeDuration</code>
+     * @throws NullPointerException if <code>penalizationPeriod</code> is null
+     * @throws IllegalArgumentException if the parsed duration is negative
+     */
+    @Override
+    public void setPenalizationPeriod(final String penalizationPeriod) {
+        // parse eagerly, as setYieldPeriod does: getPenalizationPeriod(TimeUnit) parses the stored
+        // string on every call, so an unparsable value must not be stored in the first place
+        final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS);
+        if (penalizationMillis < 0) {
+            throw new IllegalArgumentException("Penalization duration must be positive");
+        }
+        this.penalizationPeriod.set(penalizationPeriod);
+    }
+
+    @Override
+    public String getYieldPeriod() {
+        return yieldPeriod.get();
+    }
+
+    /**
+     * @param timeUnit the unit of the result; {@link #DEFAULT_TIME_UNIT} is
+     *            used when null
+     * @return the yield period converted to the requested unit
+     */
+    @Override
+    public long getYieldPeriod(final TimeUnit timeUnit) {
+        return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+    }
+
+    @Override
+    public void verifyCanDelete() throws IllegalStateException {
+        verifyCanDelete(false);
+    }
+
+    /**
+     * Verifies that this port may be deleted: it must not be running and,
+     * unless connections are ignored, every outgoing connection must be
+     * deletable and every incoming connection must be a self-loop that is
+     * itself deletable.
+     *
+     * @param ignoreConnections if <code>true</code>, connections are not checked
+     * @throws IllegalStateException if the port may not be deleted
+     */
+    @Override
+    public void verifyCanDelete(final boolean ignoreConnections) {
+        readLock.lock();
+        try {
+            if (isRunning()) {
+                throw new IllegalStateException(this + " is running");
+            }
+
+            if (!ignoreConnections) {
+                for (final Connection connection : outgoingConnections) {
+                    connection.verifyCanDelete();
+                }
+
+                for (final Connection connection : incomingConnections) {
+                    if (connection.getSource().equals(this)) {
+                        connection.verifyCanDelete();
+                    } else {
+                        throw new IllegalStateException(this + " is the destination of another component");
+                    }
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @throws IllegalStateException if the port is not stopped, still has
+     *             active threads, or reports validation errors
+     */
+    @Override
+    public void verifyCanStart() {
+        readLock.lock();
+        try {
+            if (scheduledState.get() != ScheduledState.STOPPED) {
+                throw new IllegalStateException(this + " is not stopped");
+            }
+            verifyNoActiveThreads();
+
+            final Collection<ValidationResult> validationResults = getValidationErrors();
+            if (!validationResults.isEmpty()) {
+                throw new IllegalStateException(this + " is not in a valid state: " + validationResults.iterator().next().getExplanation());
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @throws IllegalStateException if the scheduled state is not RUNNING
+     */
+    @Override
+    public void verifyCanStop() {
+        if (getScheduledState() != ScheduledState.RUNNING) {
+            throw new IllegalStateException(this + " is not scheduled to run");
+        }
+    }
+
+    /**
+     * @throws IllegalStateException if the port is running
+     */
+    @Override
+    public void verifyCanUpdate() {
+        readLock.lock();
+        try {
+            if (isRunning()) {
+                throw new IllegalStateException(this + " is not stopped");
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @throws IllegalStateException if the port is not disabled or still has
+     *             active threads
+     */
+    @Override
+    public void verifyCanEnable() {
+        readLock.lock();
+        try {
+            if (getScheduledState() != ScheduledState.DISABLED) {
+                throw new IllegalStateException(this + " is not disabled");
+            }
+
+            verifyNoActiveThreads();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @throws IllegalStateException if the port is not stopped or still has
+     *             active threads
+     */
+    @Override
+    public void verifyCanDisable() {
+        readLock.lock();
+        try {
+            if (getScheduledState() != ScheduledState.STOPPED) {
+                throw new IllegalStateException(this + " is not stopped");
+            }
+            verifyNoActiveThreads();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @throws IllegalStateException if the scheduler reports one or more
+     *             active threads for this port
+     */
+    private void verifyNoActiveThreads() throws IllegalStateException {
+        final int threadCount = processScheduler.getActiveThreadCount(this);
+        if (threadCount > 0) {
+            throw new IllegalStateException(this + " has " + threadCount + " threads still active");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java
new file mode 100644
index 0000000..38df6f7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+/**
+ * Three-valued availability setting.
+ * <p>
+ * NOTE(review): judging by the constant names only, this presumably says on
+ * which members of a cluster (the cluster manager, the nodes, or both)
+ * something is available - confirm against the code that uses this enum.
+ */
+public enum Availability {
+
+    CLUSTER_MANAGER_ONLY,
+    NODE_ONLY,
+    BOTH;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
new file mode 100644
index 0000000..5b95524
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+
+/**
+ * A component that has an identifier, a name, annotation data and a set of
+ * configurable properties, and that can report whether its configuration is
+ * valid.
+ */
+public interface ConfiguredComponent {
+
+    /**
+     * @return the identifier of this component
+     */
+    String getIdentifier();
+
+    /**
+     * @return the name of this component
+     */
+    String getName();
+
+    /**
+     * @param name the new name of this component
+     */
+    void setName(String name);
+
+    /**
+     * @return the annotation data of this component
+     */
+    String getAnnotationData();
+
+    /**
+     * @param data the new annotation data of this component
+     */
+    void setAnnotationData(String data);
+
+    /**
+     * Sets the property with the given name to the given value.
+     *
+     * @param name the name of the property to set
+     * @param value the value to assign
+     */
+    void setProperty(String name, String value);
+
+    /**
+     * Removes the property and its value for the given property name, if a
+     * descriptor and value exist for that name. An optional property may be
+     * reset to its default value; a dynamic property is removed entirely.
+     *
+     * @param name the property to remove
+     * @return true if removed; false otherwise
+     * @throws java.lang.IllegalArgumentException if the name is null
+     */
+    boolean removeProperty(String name);
+
+    /**
+     * @return the properties of this component, keyed by descriptor
+     */
+    Map<PropertyDescriptor, String> getProperties();
+
+    /**
+     * @param property the descriptor of the property to look up
+     * @return the value held for the given property
+     */
+    String getProperty(final PropertyDescriptor property);
+
+    /**
+     * @return whether this component is valid
+     */
+    boolean isValid();
+
+    /**
+     * Returns any validation errors for this component.
+     *
+     * @return the validation errors of this component
+     */
+    Collection<ValidationResult> getValidationErrors();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
new file mode 100644
index 0000000..eee878e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+/**
+ * Provides information about whether or not the data referenced in a Provenance
+ * Event can be replayed or downloaded
+ */
+public interface ContentAvailability {
+
+    /**
+     * Returns a boolean indicating whether or not the Input content is
+     * available
+     *
+     * @return <code>true</code> if the Input content is available,
+     *         <code>false</code> otherwise
+     */
+    boolean isInputAvailable();
+
+    /**
+     * Returns a boolean indicating whether or not the Output content is
+     * available
+     *
+     * @return <code>true</code> if the Output content is available,
+     *         <code>false</code> otherwise
+     */
+    boolean isOutputAvailable();
+
+    /**
+     * Returns <code>true</code> if the Input content is the same as the Output
+     * content
+     *
+     * @return <code>true</code> if the Input content is the same as the Output
+     *         content, <code>false</code> otherwise
+     */
+    boolean isContentSame();
+
+    /**
+     * Returns a boolean indicating whether or not the content is replayable. If
+     * this returns <code>false</code>, the reason that replay is not available
+     * can be determined by calling {@link #getReasonNotReplayable()}.
+     *
+     * @return <code>true</code> if the content can be replayed,
+     *         <code>false</code> otherwise
+     */
+    boolean isReplayable();
+
+    /**
+     * Returns the reason that the content cannot be replayed, or
+     * <code>null</code> if the content can be replayed.
+     *
+     * @return the reason that replay is not available, or <code>null</code>
+     *         if the content can be replayed
+     */
+    String getReasonNotReplayable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java
new file mode 100644
index 0000000..eaa0c48
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+/**
+ * A named counter with a <code>long</code> value that can be adjusted and
+ * reset. Each counter also exposes a context and an identifier.
+ */
+public interface Counter {
+
+    /**
+     * Adjusts this counter by the given amount.
+     *
+     * @param delta the adjustment - presumably added to the current value, so
+     *            a negative delta decrements; confirm against the implementation
+     */
+    void adjust(long delta);
+
+    /**
+     * @return the name of this counter
+     */
+    String getName();
+
+    /**
+     * @return the current value of this counter
+     */
+    long getValue();
+
+    /**
+     * @return the context of this counter. NOTE(review): what a context
+     *         denotes is not visible from this interface - confirm with callers
+     */
+    String getContext();
+
+    /**
+     * @return the identifier of this counter
+     */
+    String getIdentifier();
+
+    /**
+     * Resets this counter - presumably back to zero; confirm against the
+     * implementation.
+     */
+    void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java
new file mode 100644
index 0000000..280f69d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.connectable.Connectable;
+
+/**
+ * Wraps a Connectable object and maintains a count of how many unanswered
+ * events have been reported for the Connectable
+ */
+public interface EventBasedWorker {
+
+    /**
+     * @return the Connectable wrapped by this worker
+     */
+    Connectable getConnectable();
+
+    /**
+     * Increments the count of unanswered events.
+     *
+     * @return an event count - presumably the count after the increment;
+     *         confirm against the implementation
+     */
+    int incrementEventCount();
+
+    /**
+     * Decrements the count of unanswered events.
+     *
+     * @return an event count - presumably the count after the decrement;
+     *         confirm against the implementation
+     */
+    int decrementEventCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
new file mode 100644
index 0000000..1195bc9
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+/**
+ * Something that can be asked to emit a heartbeat.
+ * <p>
+ * NOTE(review): to whom the heartbeat is sent, and what it carries, is not
+ * visible from this interface - confirm against the implementations.
+ */
+public interface Heartbeater {
+
+    /**
+     * Triggers a heartbeat.
+     */
+    void heartbeat();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
new file mode 100644
index 0000000..303f540
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+/**
+ * Starts, stops, enables and disables the scheduling of processors, ports and
+ * funnels, and reports how many threads are active for a scheduled component.
+ */
+public interface ProcessScheduler {
+
+    /**
+     * Shuts down the scheduler, stopping all components
+     */
+    void shutdown();
+
+    /**
+     * Starts scheduling the given processor to run after invoking all methods
+     * on the underlying {@link org.apache.nifi.processor.Processor Processor}
+     * that are annotated with the {@link OnScheduled} annotation. If the
+     * Processor is already scheduled to run, does nothing.
+     *
+     * @param procNode the processor to start
+     * @throws IllegalStateException if the Processor is disabled
+     */
+    void startProcessor(ProcessorNode procNode);
+
+    /**
+     * Stops scheduling the given processor to run and invokes all methods on
+     * the underlying {@link org.apache.nifi.processor.Processor Processor}
+     * that are annotated with the {@link OnUnscheduled} annotation. This does
+     * not interrupt any threads that are currently running within the given
+     * Processor. If the Processor is not scheduled to run, does nothing.
+     *
+     * @param procNode the processor to stop
+     */
+    void stopProcessor(ProcessorNode procNode);
+
+    /**
+     * Starts scheduling the given Port to run. If the Port is already scheduled
+     * to run, does nothing.
+     *
+     * @param port the port to start
+     *
+     * @throws IllegalStateException if the Port is disabled
+     */
+    void startPort(Port port);
+
+    /**
+     * Stops scheduling the given Port to run. This does not interrupt any
+     * threads that are currently running within the given Port. If the Port is
+     * not scheduled to run, does nothing.
+     *
+     * @param port the port to stop
+     */
+    void stopPort(Port port);
+
+    /**
+     * Starts scheduling the given Funnel to run. If the funnel is already
+     * scheduled to run, does nothing.
+     *
+     * @param funnel the funnel to start
+     *
+     * @throws IllegalStateException if the Funnel is disabled
+     */
+    void startFunnel(Funnel funnel);
+
+    /**
+     * Stops scheduling the given Funnel to run. This does not interrupt any
+     * threads that are currently running within the given funnel. If the funnel
+     * is not scheduled to run, does nothing.
+     *
+     * @param funnel the funnel to stop
+     */
+    void stopFunnel(Funnel funnel);
+
+    // NOTE(review): the six enable/disable methods below carried no documentation in the
+    // original. Presumably "enable" moves a disabled component back to stopped and "disable"
+    // moves a stopped component to disabled - confirm against the implementation.
+
+    void enableFunnel(Funnel funnel);
+
+    void enablePort(Port port);
+
+    void enableProcessor(ProcessorNode procNode);
+
+    void disableFunnel(Funnel funnel);
+
+    void disablePort(Port port);
+
+    void disableProcessor(ProcessorNode procNode);
+
+    /**
+     * Returns the number of threads currently active for the given
+     * <code>Connectable</code>.
+     *
+     * @param scheduled the component whose active threads are counted
+     * @return the number of threads currently active for the given component
+     */
+    int getActiveThreadCount(Object scheduled);
+
+    /**
+     * Returns a boolean indicating whether or not the given object is scheduled
+     * to run
+     *
+     * @param scheduled the component to check
+     * @return <code>true</code> if the given object is scheduled to run,
+     *         <code>false</code> otherwise
+     */
+    boolean isScheduled(Object scheduled);
+
+    /**
+     * Registers a relevant event for an Event-Driven worker
+     *
+     * @param worker the component for which the event is registered
+     */
+    void registerEvent(Connectable worker);
+
+    /**
+     * Notifies the ProcessScheduler of how many threads are available to use
+     * for the given {@link SchedulingStrategy}
+     *
+     * @param strategy the scheduling strategy the limit applies to
+     * @param maxThreadCount the number of threads available to that strategy
+     */
+    void setMaxThreadCount(SchedulingStrategy strategy, int maxThreadCount);
+
+    /**
+     * Notifies the Scheduler that it should stop scheduling the given component
+     * until its yield duration has expired
+     *
+     * @param procNode the processor that has yielded
+     */
+    void yield(ProcessorNode procNode);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java new file mode 100644 index 0000000..f6786fa --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.scheduling.SchedulingStrategy; + +/** + * Abstract base class that pairs a {@link Processor} with the + * {@link Connectable} contract. It extends AbstractConfiguredComponent and + * leaves every accessor and mutator declared here to concrete subclasses; + * the members marked <code>@Override</code> re-declare supertype methods as + * abstract. NOTE(review): none of the abstract members are documented in + * this file - their exact semantics must be confirmed against the + * implementing class. + */ +public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { + + /** + * Forwards all four arguments unchanged to the AbstractConfiguredComponent + * constructor. + * + * @param processor the processor handed to the superclass + * @param id the identifier handed to the superclass + * @param validationContextFactory the factory handed to the superclass + * @param serviceProvider the service provider handed to the superclass + */ + public ProcessorNode(final Processor processor, final String id, + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { + super(processor, id, validationContextFactory, serviceProvider); + } + + public abstract boolean isIsolated(); + + public abstract boolean isTriggerWhenAnyDestinationAvailable(); + + @Override + public abstract boolean isSideEffectFree(); + + public abstract boolean isTriggeredSerially(); + + public abstract boolean isEventDrivenSupported(); + + public abstract boolean isHighThroughputSupported(); + + @Override + public abstract boolean isValid(); + + public abstract void setScheduledState(ScheduledState scheduledState); + + public abstract void setBulletinLevel(LogLevel bulletinLevel); + + public abstract LogLevel getBulletinLevel(); + + public abstract Processor getProcessor(); + + public abstract void yield(long period, TimeUnit timeUnit); + + public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships); + + public abstract Set<Relationship> getAutoTerminatedRelationships(); + + public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); + + @Override + public abstract SchedulingStrategy getSchedulingStrategy(); + + public abstract void setRunDuration(long duration, TimeUnit timeUnit); + + public abstract long getRunDuration(TimeUnit timeUnit); + + public abstract Map<String, String> getStyle(); + + public 
abstract void setStyle(Map<String, String> style); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java new file mode 100644 index 0000000..6b8ede0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingTask; +import org.apache.nifi.scheduling.SchedulingStrategy; + +/** + * A {@link ConfiguredComponent} view of a {@link ReportingTask}, exposing + * its availability, scheduling strategy, scheduling period, reporting and + * configuration contexts, and whether it is running. + */ +public interface ReportingTaskNode extends ConfiguredComponent { + + Availability getAvailability(); + + void setAvailability(Availability availability); + + void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); + + SchedulingStrategy getSchedulingStrategy(); + + /** + * @return a string representation of the time between each scheduling + * period + */ + String getSchedulingPeriod(); + + /** + * NOTE(review): undocumented overload; presumably returns the same + * scheduling period as {@link #getSchedulingPeriod()} expressed in the + * given unit - confirm against the implementation. + * + * @param timeUnit the unit in which the result is requested + * @return the scheduling period as a <code>long</code> + */ + long getSchedulingPeriod(TimeUnit timeUnit); + + /** + * Updates how often the ReportingTask should be triggered to run. + * NOTE(review): the method name is misspelled ("Schedulding"); it is part + * of the interface, so renaming it would break existing implementors. + * + * @param schedulingPeriod a string representation of the new scheduling + * period (presumably the same format that + * {@link #getSchedulingPeriod()} returns - confirm) + */ + void setScheduldingPeriod(String schedulingPeriod); + + ReportingTask getReportingTask(); + + ReportingContext getReportingContext(); + + ConfigurationContext getConfigurationContext(); + + boolean isRunning(); +}