http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java deleted file mode 100644 index 630631f..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster; - -import javax.xml.bind.annotation.adapters.XmlAdapter; - -/** JAXB {@link XmlAdapter} that converts between {@link NodeInformation} and its {@link AdaptedNodeInformation} form. */ public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, NodeInformation> { - - /** Builds a NodeInformation from the adapted object's hostname, site-to-site port, API port, site-to-site-secure flag and total FlowFile count. */ @Override - public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception { - return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles()); - } - - /** Copies hostname, site-to-site port, API port, site-to-site-secure flag and total FlowFile count from the given NodeInformation into a newly created AdaptedNodeInformation. */ @Override - public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception { - final AdaptedNodeInformation adapted = new AdaptedNodeInformation(); - adapted.setHostname(nodeInformation.getHostname()); - adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort()); - adapted.setApiPort(nodeInformation.getAPIPort()); - adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure()); - adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles()); - return adapted; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java deleted file mode 100644 index 57c1c30..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol; - -/** Gives access to a flow, its templates and its snippets as raw byte arrays, together with the processor auto-start flag. */ public interface DataFlow { - - /** - * @return the raw byte array of the flow - */ - public byte[] getFlow(); - - /** - * @return the raw byte array of the templates - */ - public byte[] getTemplates(); - - /** - * @return the raw byte array of the snippets - */ - public byte[] getSnippets(); - - /** - * @return true if processors should be automatically started at application - * startup; false otherwise - */ - public boolean isAutoStartProcessors(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java deleted file mode 100644 index 3d5c75d..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.connectable; - -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.Triggerable; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.scheduling.SchedulingStrategy; - -/** - * Represents a connectable component to which or from which data can flow. - */ -public interface Connectable extends Triggerable { - - /** - * @return the unique identifier for this <code>Connectable</code> - */ - String getIdentifier(); - - /** - * @return a Collection of all relationships for this Connectable - */ - Collection<Relationship> getRelationships(); - - /** - * Returns the Relationship whose name is given - * - * @param relationshipName the name of the relationship to look up - * @return the Relationship whose name is given, or <code>null</code> - * if none exists - */ - Relationship getRelationship(String relationshipName); - - /** - * Adds the given connection to this Connectable. - * - * @param connection the connection to add - * @throws NullPointerException if the argument is null - * @throws IllegalArgumentException if the given Connection is not valid - */ - void addConnection(Connection connection) throws IllegalArgumentException; - - /** - * @return true if the Connectable is the destination of any other - * Connectable, false otherwise. - */ - boolean hasIncomingConnection(); - - /** - * Removes the given connection from this Connectable. - * @param connection the connection to remove - * @throws IllegalStateException if the given Connection is not registered - * to <code>this</code>. - */ - void removeConnection(Connection connection) throws IllegalStateException; - - /** - * Updates any internal state that depends on the given connection. The - * given connection will share the same ID as the old connection. 
- * - * @param newConnection the connection that replaces the old one - * @throws IllegalStateException - */ - void updateConnection(Connection newConnection) throws IllegalStateException; - - /** - * @return a <code>List</code> of all <code>Connection</code>s for which this - * <code>Connectable</code> is the destination - */ - List<Connection> getIncomingConnections(); - - /** - * @return a <code>Set</code> of all <code>Connection</code>s for which this - * <code>Connectable</code> is the source; if no connections exist, will - * return an empty Collection. Guaranteed not null. - */ - Set<Connection> getConnections(); - - /** - * @param relationship the relationship the returned connections must contain - * @return a <code>Set</code> of all <code>Connection</code>s that contain - * the given relationship for which this <code>Connectable</code> is the - * source - */ - Set<Connection> getConnections(Relationship relationship); - - /** - * Returns the position on the graph where this Connectable is located - * - * @return the position of this Connectable on the graph - */ - Position getPosition(); - - /** - * Updates this component's position on the graph - * - * @param position the new position - */ - void setPosition(Position position); - - /** - * @return the name of this Connectable - */ - String getName(); - - /** - * Sets the name of this Connectable so that its name will be visible on the - * UI - * @param name the new name - */ - void setName(String name); - - /** - * @return the comments of this Connectable - */ - String getComments(); - - /** - * Sets the comments of this Connectable. 
- * @param comments the new comments - */ - void setComments(String comments); - - /** - * If true, - * {@link #onTrigger(nifi.processor.ProcessContext, nifi.processor.ProcessSessionFactory)} - * should be called even when this Connectable has no FlowFiles queued for - * processing - * - * @return true if onTrigger should be called even when no FlowFiles are queued - */ - boolean isTriggerWhenEmpty(); - - /** - * Returns the ProcessGroup to which this <code>Connectable</code> belongs - * - * @return the ProcessGroup to which this Connectable belongs - */ - ProcessGroup getProcessGroup(); - - /** - * Sets the new ProcessGroup to which this <code>Connectable</code> belongs - * - * @param group the new ProcessGroup - */ - void setProcessGroup(ProcessGroup group); - - /** - * - * @param relationship the relationship - * @return true indicates flow files transferred to the given relationship - * should be terminated if the relationship is not connected to another - * FlowFileConsumer; false indicates they will not be terminated and the - * processor will not be valid until specified - */ - boolean isAutoTerminated(Relationship relationship); - - /** - * Indicates whether flow file content made by this connectable must be - * persisted - * - * @return - */ - boolean isLossTolerant(); - - /** - * @param lossTolerant - */ - void setLossTolerant(boolean lossTolerant); - - /** - * @return the type of the Connectable - */ - ConnectableType getConnectableType(); - - /** - * Returns any validation errors for this connectable. 
- * - * @return the validation errors for this connectable - */ - Collection<ValidationResult> getValidationErrors(); - - /** - * Returns the amount of time for which a FlowFile should be penalized when - * {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called - * - * @param timeUnit the unit of time in which to express the period - * @return the penalization period in the given unit - */ - long getPenalizationPeriod(final TimeUnit timeUnit); - - /** - * Returns a string representation of the time for which a FlowFile should be penalized - * when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called - * - * @return the penalization period as a string - */ - String getPenalizationPeriod(); - - /** - * @param timeUnit determines the unit of time to represent the yield - * period. - * @return the yield period in the given unit - */ - long getYieldPeriod(TimeUnit timeUnit); - - /** - * Returns the string representation for this Connectable's configured yield - * period - * - * @return the yield period as a string - */ - String getYieldPeriod(); - - /** - * Updates the amount of time that this Connectable should avoid being - * scheduled when the processor calls - * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} - * - * @param yieldPeriod the new yield period - */ - void setYieldPeriod(String yieldPeriod); - - /** - * Updates the amount of time that this Connectable will penalize FlowFiles - * when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called - * @param penalizationPeriod the new penalization period - */ - void setPenalizationPeriod(String penalizationPeriod); - - /** - * Causes the processor not to be scheduled for some period of time. This - * duration can be obtained and set via the - * {@link #getYieldPeriod(TimeUnit)} and - * {@link #setYieldPeriod(String)} methods. 
- */ - void yield(); - - /** - * Returns the time in milliseconds since Epoch at which this Connectable - * should no longer yield its threads - * - * @return the time, in milliseconds since Epoch, at which yielding should end - */ - long getYieldExpiration(); - - /** - * Specifies whether or not this component is considered side-effect free, - * with respect to external systems. 
- * - * @return true if this component is considered side-effect free - */ - boolean isSideEffectFree(); - - void verifyCanDelete() throws IllegalStateException; - - void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException; - - void verifyCanStart() throws IllegalStateException; - - void verifyCanStop() throws IllegalStateException; - - void verifyCanUpdate() throws IllegalStateException; - - void verifyCanEnable() throws IllegalStateException; - - void verifyCanDisable() throws IllegalStateException; - - SchedulingStrategy getSchedulingStrategy(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java deleted file mode 100644 index 0334bfb..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.connectable; - -import javax.xml.bind.annotation.XmlEnum; - -/** Enumerates the kinds of connectable component: processors, remote and local input/output ports, and funnels. */ @XmlEnum -public enum ConnectableType { - - PROCESSOR, - /** - * Port that lives within a RemoteProcessGroup and is used to send data to - * remote NiFi instances - */ - REMOTE_INPUT_PORT, - /** - * Port that lives within a RemoteProcessGroup and is used to receive data - * from remote NiFi instances - */ - REMOTE_OUTPUT_PORT, - /** - * Root Group Input Ports and Local Input Ports - */ - INPUT_PORT, - /** - * Root Group Output Ports and Local Output Ports - */ - OUTPUT_PORT, - FUNNEL -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java deleted file mode 100644 index 0a0089d..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.connectable; - -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import org.apache.nifi.controller.FlowFileQueue; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.FlowFileFilter; -import org.apache.nifi.processor.Relationship; - -/** Links a source {@link Connectable} to a destination Connectable and exposes the relationships, FlowFile queue, bend points, label index and z-index attached to that link. */ public interface Connection { - - void enqueue(FlowFileRecord flowFile); - - void enqueue(Collection<FlowFileRecord> flowFiles); - - Connectable getDestination(); - - Collection<Relationship> getRelationships(); - - FlowFileQueue getFlowFileQueue(); - - String getIdentifier(); - - String getName(); - - void setName(String name); - - void setBendPoints(List<Position> position); - - List<Position> getBendPoints(); - - int getLabelIndex(); - - void setLabelIndex(int labelIndex); - - long getZIndex(); - - void setZIndex(long zIndex); - - Connectable getSource(); - - void setRelationships(Collection<Relationship> newRelationships); - - void setDestination(final Connectable newDestination); - - void setProcessGroup(ProcessGroup processGroup); - - ProcessGroup getProcessGroup(); - - void lock(); - - void unlock(); - - List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords); - - void verifyCanUpdate() throws IllegalStateException; - - void verifyCanDelete() throws IllegalStateException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java 
---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java deleted file mode 100644 index cceca8f..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.connectable; - -import org.apache.nifi.controller.ScheduledState; - -/** A {@link Connectable} whose {@link ScheduledState} can be set. */ public interface Funnel extends Connectable { - - void setScheduledState(ScheduledState scheduledState); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java deleted file mode 100644 index 907dd92..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.connectable; - -/** A {@link Connectable} that can additionally be shut down, queried for validity and notified just before it is scheduled to run. */ public interface Port extends Connectable { - - void shutdown(); - - boolean isValid(); - - /** - * <p> - * This method is called just before a Port is scheduled to run, giving the - * Port a chance to initialize any resources needed.</p> - */ - void onSchedulingStart(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java deleted file mode 100644 index 75d04f5..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.connectable; - -/** Immutable holder for an x/y coordinate pair; both values are fixed at construction. */ public class Position { - - private final double x; - private final double y; - - public Position(final double x, final double y) { - this.x = x; - this.y = y; - } - - public double getX() { - return x; - } - - public double getY() { - return y; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java deleted file mode 100644 index cea13d2..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.connectable; - -/** Immutable holder for a width/height pair; both values are fixed at construction. */ public class Size { - - private final double width; - private final double height; - - public Size(final double width, final double height) { - this.width = width; - this.height = height; - } - - public double getWidth() { - return width; - } - - public double getHeight() { - return height; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java deleted file mode 100644 index ef4b72a..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.nar.NarCloseable;

/**
 * Base class that holds the configuration (name, annotation data and property
 * values) of a wrapped {@link ConfigurableComponent} and forwards the
 * {@link ConfigurableComponent} callbacks to it.
 *
 * <p>
 * Every call into the wrapped component is made while a {@link NarCloseable}
 * obtained from {@link NarCloseable#withNarLoader()} is open.
 * NOTE(review): presumably this switches the class loader to the component's
 * NAR for the duration of the call - confirm against NarCloseable.
 * </p>
 *
 * <p>
 * Property mutations ({@link #setProperty(String, String)} and
 * {@link #removeProperty(String)}) and {@link #getValidationErrors()} are
 * serialized by a single lock; simple reads are served from a concurrent map.
 * </p>
 */
public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent {

    private final String id;
    private final ConfigurableComponent component;
    private final ValidationContextFactory validationContextFactory;
    private final ControllerServiceProvider serviceProvider;

    private final AtomicReference<String> name = new AtomicReference<>();
    private final AtomicReference<String> annotationData = new AtomicReference<>();

    private final Lock lock = new ReentrantLock();
    private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>();

    /**
     * @param component the component whose configuration is managed and to
     * which callbacks are delegated
     * @param id the identifier reported by {@link #getIdentifier()} and used
     * by {@link #equals(Object)} / {@link #hashCode()}
     * @param validationContextFactory used to build the context handed to the
     * component when validating
     * @param serviceProvider used to look up controller services referenced by
     * property values
     */
    public AbstractConfiguredComponent(final ConfigurableComponent component, final String id,
            final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
        this.id = id;
        this.component = component;
        this.validationContextFactory = validationContextFactory;
        this.serviceProvider = serviceProvider;
    }

    @Override
    public String getIdentifier() {
        return id;
    }

    @Override
    public String getName() {
        return name.get();
    }

    /**
     * Sets the name of this component. The name is interned before being
     * stored.
     *
     * @param name the new name
     * @throws NullPointerException if the name is null
     */
    @Override
    public void setName(final String name) {
        this.name.set(Objects.requireNonNull(name).intern());
    }

    @Override
    public String getAnnotationData() {
        return annotationData.get();
    }

    @Override
    public void setAnnotationData(final String data) {
        annotationData.set(data);
    }

    /**
     * Stores the given value for the named property. If the value differs from
     * the previous one, controller service references are moved from the old
     * service to the new one (when the property identifies a controller
     * service) and the component is notified of the change.
     *
     * @param name the name of the property to set
     * @param value the new value
     * @throws IllegalArgumentException if the name or the value is null
     * @throws IllegalStateException if {@link #verifyModifiable()} rejects the
     * modification
     */
    @Override
    public void setProperty(final String name, final String value) {
        if (null == name || null == value) {
            throw new IllegalArgumentException("Property name and value must not be null");
        }

        lock.lock();
        try {
            verifyModifiable();

            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);

                final String oldValue = properties.put(descriptor, value);
                if (!value.equals(oldValue)) {

                    if (descriptor.getControllerServiceDefinition() != null) {
                        if (oldValue != null) {
                            releaseServiceReference(oldValue);
                        }

                        final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value);
                        if (newNode != null) {
                            newNode.addReference(this);
                        }
                    }

                    try {
                        component.onPropertyModified(descriptor, oldValue, value);
                    } catch (final Throwable t) {
                        // Best-effort notification: the new value is already stored and
                        // this class has no logger, so there is nothing really to do here.
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the property and value for the given property name if a
     * descriptor and value exists for the given name. If the property is
     * optional its value might be reset to default or will be removed entirely
     * if was a dynamic property.
     *
     * <p>
     * When the removed property identifies a controller service, the reference
     * that {@link #setProperty(String, String)} registered on that service is
     * released so the service no longer reports this component as a referrer.
     * </p>
     *
     * @param name the property to remove
     * @return true if removed; false otherwise
     * @throws java.lang.IllegalArgumentException if the name is null
     * @throws IllegalStateException if {@link #verifyModifiable()} rejects the
     * modification
     */
    @Override
    public boolean removeProperty(final String name) {
        if (null == name) {
            throw new IllegalArgumentException("Property name must not be null");
        }

        lock.lock();
        try {
            verifyModifiable();

            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
                if (descriptor.isRequired()) {
                    // required properties are never removed
                    return false;
                }

                final String value = properties.remove(descriptor);
                if (value == null) {
                    // nothing was set for this property
                    return false;
                }

                // mirror setProperty: drop the reference before notifying the component
                if (descriptor.getControllerServiceDefinition() != null) {
                    releaseServiceReference(value);
                }

                component.onPropertyModified(descriptor, value, null);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes this component from the referrers of the controller service with
     * the given identifier, if the provider knows such a service.
     */
    private void releaseServiceReference(final String serviceId) {
        final ControllerServiceNode node = serviceProvider.getControllerServiceNode(serviceId);
        if (node != null) {
            node.removeReference(this);
        }
    }

    /**
     * Returns the configured properties.
     *
     * @return if the component reports no supported descriptors, an
     * unmodifiable view of the stored values; otherwise a new map that lists
     * every supported descriptor first (with a null value unless one is
     * stored) followed by any other stored entries
     */
    @Override
    public Map<PropertyDescriptor, String> getProperties() {
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
            if (supported == null || supported.isEmpty()) {
                return Collections.unmodifiableMap(properties);
            } else {
                final Map<PropertyDescriptor, String> props = new LinkedHashMap<>();
                for (final PropertyDescriptor descriptor : supported) {
                    props.put(descriptor, null);
                }
                props.putAll(properties);
                return props;
            }
        }
    }

    @Override
    public String getProperty(final PropertyDescriptor property) {
        return properties.get(property);
    }

    @Override
    public int hashCode() {
        return 273171 * id.hashCode();
    }

    /**
     * Two configured components are equal when their identifiers are equal.
     */
    @Override
    public boolean equals(final Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null) {
            return false;
        }

        if (!(obj instanceof ConfiguredComponent)) {
            return false;
        }

        final ConfiguredComponent other = (ConfiguredComponent) obj;
        return id.equals(other.getIdentifier());
    }

    @Override
    public String toString() {
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            return component.toString();
        }
    }

    @Override
    public Collection<ValidationResult> validate(final ValidationContext context) {
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            return component.validate(context);
        }
    }

    @Override
    public PropertyDescriptor getPropertyDescriptor(final String name) {
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            return component.getPropertyDescriptor(name);
        }
    }

    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            component.onPropertyModified(descriptor, oldValue, newValue);
        }
    }

    @Override
    public List<PropertyDescriptor> getPropertyDescriptors() {
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            return component.getPropertyDescriptors();
        }
    }

    /**
     * @return true if validating the current properties and annotation data
     * yields no invalid result; false otherwise
     */
    @Override
    public boolean isValid() {
        final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(getProperties(), getAnnotationData()));
        for (final ValidationResult result : validationResults) {
            if (!result.isValid()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Validates the current configuration and returns only the results that
     * are not valid. If validation itself fails, a single invalid result
     * describing the failure is returned instead of propagating the error.
     *
     * @return the invalid validation results; empty if the configuration is
     * valid
     */
    @Override
    public Collection<ValidationResult> getValidationErrors() {
        final List<ValidationResult> results = new ArrayList<>();
        lock.lock();
        try {
            final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());

            final Collection<ValidationResult> validationResults;
            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                validationResults = component.validate(validationContext);
            }

            for (final ValidationResult result : validationResults) {
                if (!result.isValid()) {
                    results.add(result);
                }
            }
        } catch (final Throwable t) {
            results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build());
        } finally {
            lock.unlock();
        }
        return results;
    }

    /**
     * Called, while the internal lock is held, before a property is set or
     * removed.
     *
     * @throws IllegalStateException if the component must not be modified at
     * this time
     */
    public abstract void verifyModifiable() throws IllegalStateException;

}
- */ -package org.apache.nifi.controller; - -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.connectable.Position; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.FormatUtils; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; - -public abstract class AbstractPort implements Port { - - public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder() - .description("The relationship through which all Flow Files are transferred") - .name("") - .build(); - - public static final long MINIMUM_PENALIZATION_MILLIS = 0L; - public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; - - public static final long MINIMUM_YIELD_MILLIS = 0L; - public static final long DEFAULT_YIELD_PERIOD = 10000L; - public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS; - - private final List<Relationship> relationships; - - 
private final String id; - private final ConnectableType type; - private final AtomicReference<String> name; - private final AtomicReference<Position> position; - private final AtomicReference<String> comments; - private final AtomicReference<ProcessGroup> processGroup; - private final AtomicBoolean lossTolerant; - private final AtomicReference<ScheduledState> scheduledState; - private final AtomicInteger concurrentTaskCount; - private final AtomicReference<String> penalizationPeriod; - private final AtomicReference<String> yieldPeriod; - private final AtomicReference<String> schedulingPeriod; - private final AtomicLong schedulingNanos; - private final AtomicLong yieldExpiration; - private final ProcessScheduler processScheduler; - - private final Set<Connection> outgoingConnections; - private final List<Connection> incomingConnections; - - private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { - this.id = requireNonNull(id); - this.name = new AtomicReference<>(requireNonNull(name)); - position = new AtomicReference<>(new Position(0D, 0D)); - outgoingConnections = new HashSet<>(); - incomingConnections = new ArrayList<>(); - comments = new AtomicReference<>(); - lossTolerant = new AtomicBoolean(false); - concurrentTaskCount = new AtomicInteger(1); - processScheduler = scheduler; - - final List<Relationship> relationshipList = new ArrayList<>(); - relationshipList.add(PORT_RELATIONSHIP); - relationships = Collections.unmodifiableList(relationshipList); - this.processGroup = new AtomicReference<>(processGroup); - this.type = type; - penalizationPeriod = new AtomicReference<>("30 sec"); - yieldPeriod = new AtomicReference<>("1 sec"); - yieldExpiration = new AtomicLong(0L); - schedulingPeriod 
= new AtomicReference<>("0 millis"); - schedulingNanos = new AtomicLong(30000); - scheduledState = new AtomicReference<>(ScheduledState.STOPPED); - } - - @Override - public String getIdentifier() { - return id; - } - - @Override - public String getName() { - return name.get(); - } - - @Override - public void setName(final String name) { - if (this.name.get().equals(name)) { - return; - } - - final ProcessGroup parentGroup = this.processGroup.get(); - if (getConnectableType() == ConnectableType.INPUT_PORT) { - if (parentGroup.getInputPortByName(name) != null) { - throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Input Port named " + name); - } - } else if (getConnectableType() == ConnectableType.OUTPUT_PORT) { - if (parentGroup.getOutputPortByName(name) != null) { - throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Output Port named " + name); - } - } - - this.name.set(name); - } - - @Override - public ProcessGroup getProcessGroup() { - return processGroup.get(); - } - - @Override - public void setProcessGroup(final ProcessGroup newGroup) { - this.processGroup.set(newGroup); - } - - @Override - public String getComments() { - return comments.get(); - } - - @Override - public void setComments(final String comments) { - this.comments.set(comments); - } - - @Override - public Collection<Relationship> getRelationships() { - return relationships; - } - - @Override - public Relationship getRelationship(final String relationshipName) { - if (PORT_RELATIONSHIP.getName().equals(relationshipName)) { - return PORT_RELATIONSHIP; - } - return null; - } - - @Override - public void addConnection(final Connection connection) throws IllegalArgumentException { - writeLock.lock(); - try { - if (!requireNonNull(connection).getSource().equals(this)) { - if (connection.getDestination().equals(this)) { - // don't add 
the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. - if (!incomingConnections.contains(connection)) { - incomingConnections.add(connection); - } - - return; - } else { - throw new IllegalArgumentException("Cannot add a connection to a LocalPort for which the LocalPort is neither the Source nor the Destination"); - } - } - - for (final Relationship relationship : connection.getRelationships()) { - if (!relationship.equals(PORT_RELATIONSHIP)) { - throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Local Ports"); - } - } - - // don't add the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. - if (!outgoingConnections.contains(connection)) { - outgoingConnections.add(connection); - } - } finally { - writeLock.unlock(); - } - } - - @Override - public boolean hasIncomingConnection() { - readLock.lock(); - try { - return !incomingConnections.isEmpty(); - } finally { - readLock.unlock(); - } - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - final ProcessSession session = sessionFactory.createSession(); - - try { - onTrigger(context, session); - session.commit(); - } catch (final ProcessException e) { - session.rollback(); - throw e; - } catch (final Throwable t) { - session.rollback(); - throw new RuntimeException(t); - } - } - - public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; - - @Override - public void updateConnection(final Connection connection) throws IllegalStateException { - if (requireNonNull(connection).getSource().equals(this)) { - writeLock.lock(); - try { - if (!outgoingConnections.remove(connection)) { 
- throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); - } - outgoingConnections.add(connection); - } finally { - writeLock.unlock(); - } - } else if (connection.getDestination().equals(this)) { - writeLock.lock(); - try { - if (!incomingConnections.remove(connection)) { - throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); - } - incomingConnections.add(connection); - } finally { - writeLock.unlock(); - } - } else { - throw new IllegalStateException("The given connection is not currently registered for this Port"); - } - } - - @Override - public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { - writeLock.lock(); - try { - if (!requireNonNull(connection).getSource().equals(this)) { - final boolean existed = incomingConnections.remove(connection); - if (!existed) { - throw new IllegalStateException("The given connection is not currently registered for this Port"); - } - return; - } - - if (!canConnectionBeRemoved(connection)) { - // TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean - throw new IllegalStateException(connection + " cannot be removed"); - } - - final boolean removed = outgoingConnections.remove(connection); - if (!removed) { - throw new IllegalStateException(connection + " is not registered with " + this); - } - } finally { - writeLock.unlock(); - } - } - - /** - * Verify that removing this connection will not prevent this Port from - * still being connected via each relationship - * - * @param connection - * @return - */ - private boolean canConnectionBeRemoved(final Connection connection) { - final Connectable source = connection.getSource(); - if (!source.isRunning()) { - // we don't have to verify that this Connectable is still connected because it's okay to make - // the 
source invalid since it is not running. - return true; - } - - for (final Relationship relationship : source.getRelationships()) { - if (source.isAutoTerminated(relationship)) { - continue; - } - - final Set<Connection> connectionsForRelationship = source.getConnections(relationship); - if (connectionsForRelationship == null || connectionsForRelationship.isEmpty()) { - return false; - } - } - - return true; - } - - @Override - public Set<Connection> getConnections() { - readLock.lock(); - try { - return Collections.unmodifiableSet(outgoingConnections); - } finally { - readLock.unlock(); - } - } - - @Override - public Set<Connection> getConnections(final Relationship relationship) { - readLock.lock(); - try { - if (relationship.equals(PORT_RELATIONSHIP)) { - return Collections.unmodifiableSet(outgoingConnections); - } - - throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Local Ports"); - } finally { - readLock.unlock(); - } - } - - @Override - public Position getPosition() { - return position.get(); - } - - @Override - public void setPosition(final Position position) { - this.position.set(position); - } - - @Override - public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("name", getName()).append("id", getIdentifier()).toString(); - } - - @Override - public List<Connection> getIncomingConnections() { - readLock.lock(); - try { - return Collections.unmodifiableList(incomingConnections); - } finally { - readLock.unlock(); - } - } - - /** - * Indicates whether or not this Port is valid. 
- * - * @return - */ - @Override - public abstract boolean isValid(); - - @Override - public boolean isAutoTerminated(final Relationship relationship) { - return false; - } - - @Override - public boolean isLossTolerant() { - return lossTolerant.get(); - } - - @Override - public void setLossTolerant(boolean lossTolerant) { - this.lossTolerant.set(lossTolerant); - } - - @Override - public void setMaxConcurrentTasks(final int taskCount) { - if (taskCount < 1) { - throw new IllegalArgumentException(); - } - concurrentTaskCount.set(taskCount); - } - - /** - * @return the number of tasks that may execute concurrently for this - * processor - */ - @Override - public int getMaxConcurrentTasks() { - return concurrentTaskCount.get(); - } - - /** - * - */ - @Override - public void shutdown() { - scheduledState.set(ScheduledState.STOPPED); - } - - @Override - public void onSchedulingStart() { - scheduledState.set(ScheduledState.RUNNING); - } - - public void disable() { - final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); - if (!updated) { - throw new IllegalStateException("Port cannot be disabled because it is not stopped"); - } - } - - public void enable() { - final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED); - if (!updated) { - throw new IllegalStateException("Port cannot be enabled because it is not disabled"); - } - } - - @Override - public boolean isRunning() { - return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; - } - - @Override - public ScheduledState getScheduledState() { - return scheduledState.get(); - } - - @Override - public ConnectableType getConnectableType() { - return type; - } - - /** - * Updates the amount of time that this processor should avoid being - * scheduled when the processor calls - * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} - * - * @param yieldPeriod - */ - 
@Override - public void setYieldPeriod(final String yieldPeriod) { - final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); - if (yieldMillis < 0) { - throw new IllegalArgumentException("Yield duration must be positive"); - } - this.yieldPeriod.set(yieldPeriod); - } - - /** - * @param schedulingPeriod - */ - @Override - public void setScheduldingPeriod(final String schedulingPeriod) { - final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); - if (schedulingNanos < 0) { - throw new IllegalArgumentException("Scheduling Period must be positive"); - } - - this.schedulingPeriod.set(schedulingPeriod); - this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); - } - - @Override - public long getPenalizationPeriod(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); - } - - @Override - public String getPenalizationPeriod() { - return penalizationPeriod.get(); - } - - /** - * Causes the processor not to be scheduled for some period of time. This - * duration can be obtained and set via the - * {@link #getYieldPeriod(TimeUnit)} and - * {@link #setYieldPeriod(long, TimeUnit)} methods. 
- */ - @Override - public void yield() { - final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); - yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); - } - - @Override - public long getYieldExpiration() { - return yieldExpiration.get(); - } - - @Override - public long getSchedulingPeriod(final TimeUnit timeUnit) { - return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); - } - - @Override - public String getSchedulingPeriod() { - return schedulingPeriod.get(); - } - - @Override - public void setPenalizationPeriod(final String penalizationPeriod) { - this.penalizationPeriod.set(penalizationPeriod); - } - - @Override - public String getYieldPeriod() { - return yieldPeriod.get(); - } - - @Override - public long getYieldPeriod(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); - } - - @Override - public void verifyCanDelete() throws IllegalStateException { - verifyCanDelete(false); - } - - @Override - public void verifyCanDelete(final boolean ignoreConnections) { - readLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException(this + " is running"); - } - - if (!ignoreConnections) { - for (final Connection connection : outgoingConnections) { - connection.verifyCanDelete(); - } - - for (final Connection connection : incomingConnections) { - if (connection.getSource().equals(this)) { - connection.verifyCanDelete(); - } else { - throw new IllegalStateException(this + " is the destination of another component"); - } - } - } - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanStart() { - readLock.lock(); - try { - if (scheduledState.get() != ScheduledState.STOPPED) { - throw new IllegalStateException(this + " is not stopped"); - } - verifyNoActiveThreads(); - - final Collection<ValidationResult> validationResults = getValidationErrors(); - if (!validationResults.isEmpty()) { - throw 
new IllegalStateException(this + " is not in a valid state: " + validationResults.iterator().next().getExplanation()); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanStop() { - if (getScheduledState() != ScheduledState.RUNNING) { - throw new IllegalStateException(this + " is not scheduled to run"); - } - } - - @Override - public void verifyCanUpdate() { - readLock.lock(); - try { - if (isRunning()) { - throw new IllegalStateException(this + " is not stopped"); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanEnable() { - readLock.lock(); - try { - if (getScheduledState() != ScheduledState.DISABLED) { - throw new IllegalStateException(this + " is not disabled"); - } - - verifyNoActiveThreads(); - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanDisable() { - readLock.lock(); - try { - if (getScheduledState() != ScheduledState.STOPPED) { - throw new IllegalStateException(this + " is not stopped"); - } - verifyNoActiveThreads(); - } finally { - readLock.unlock(); - } - } - - private void verifyNoActiveThreads() throws IllegalStateException { - final int threadCount = processScheduler.getActiveThreadCount(this); - if (threadCount > 0) { - throw new IllegalStateException(this + " has " + threadCount + " threads still active"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java deleted file mode 100644 index 38df6f7..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java +++ 
/dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -/** Enumerates three availability scopes: cluster manager only, node only, or both. NOTE(review): what the availability applies to is not visible in this file; confirm against usages. */ public enum Availability { - - /** Presumably: available on the cluster manager only; confirm against usages. */ CLUSTER_MANAGER_ONLY, - /** Presumably: available on nodes only; confirm against usages. */ NODE_ONLY, - /** Presumably: available on both the cluster manager and nodes; confirm against usages. */ BOTH; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java deleted file mode 100644 index 5b95524..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.util.Collection; -import java.util.Map; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationResult; - -/** A component that has an identifier, a name, annotation data and a set of property values, and that can report whether its configuration is valid. */ public interface ConfiguredComponent { - - /** @return the identifier of this component */ public String getIdentifier(); - - /** @return the name of this component */ public String getName(); - - /** @param name the new name of this component */ public void setName(String name); - - /** @return the annotation data of this component */ public String getAnnotationData(); - - /** @param data the new annotation data of this component */ public void setAnnotationData(String data); - - /** Sets the value of the property with the given name. @param name the name of the property @param value the value to set */ public void setProperty(String name, String value); - - /** - * Removes the property and value for the given property name if a - * descriptor and value exists for the given name. If the property is - * optional its value might be reset to default or will be removed entirely - * if was a dynamic property. - * - * @param name the property to remove - * @return true if removed; false otherwise - * @throws java.lang.IllegalArgumentException if the name is null - */ - public boolean removeProperty(String name); - - /** @return the properties of this component, keyed by descriptor */ public Map<PropertyDescriptor, String> getProperties(); - - /** @param property the descriptor of the property to look up @return the value held for the given property */ public String getProperty(final PropertyDescriptor property); - - /** @return true if this component is valid; false otherwise */ boolean isValid(); - - /** - * Returns any validation errors for this component.
- * - * @return the validation errors for this component - */ - Collection<ValidationResult> getValidationErrors(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java deleted file mode 100644 index eee878e..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.nifi.controller; - -/** - * Provides information about whether or not the data referenced in a Provenance - * Event can be replayed or downloaded - */ -public interface ContentAvailability { - - /** - * Returns a boolean indicating whether or not the Input content is - * available - * - * @return <code>true</code> if the Input content is available; <code>false</code> otherwise - */ - boolean isInputAvailable(); - - /** - * Returns a boolean indicating whether or not the Output content is - * available - * - * @return <code>true</code> if the Output content is available; <code>false</code> otherwise - */ - boolean isOutputAvailable(); - - /** - * Returns <code>true</code> if the Input content is the same as the Output - * content - * - * @return <code>true</code> if the Input content is the same as the Output content; <code>false</code> otherwise - */ - boolean isContentSame(); - - /** - * Returns a boolean indicating whether or not the content is replayable. If - * this returns <code>false</code>, the reason that replay is not available - * can be determined by calling {@link #getReasonNotReplayable()}. - * - * @return <code>true</code> if the content is replayable; <code>false</code> otherwise - */ - boolean isReplayable(); - - /** - * Returns the reason that the content cannot be replayed, or - * <code>null</code> if the content can be replayed. - * - * @return the reason that the content cannot be replayed, or <code>null</code> if it can be replayed - */ - String getReasonNotReplayable(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java deleted file mode 100644 index eaa0c48..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -/** - * Exposes a name, a context, an identifier and a <code>long</code> value, and - * declares operations to adjust by a <code>long</code> delta and to reset. - * NOTE(review): how context differs from identifier, and what value results - * from reset(), is not documented here - confirm against implementations. - */ -public interface Counter { - - void adjust(long delta); - - String getName(); - - long getValue(); - - String getContext(); - - String getIdentifier(); - - void reset(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java deleted file mode 100644 index 280f69d..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import org.apache.nifi.connectable.Connectable; - -/** - * Wraps a Connectable object and maintains a count of how many unanswered - * events have been reported for the Connectable - */ -public interface EventBasedWorker { - - /** - * @return the Connectable wrapped by this worker - */ - Connectable getConnectable(); - - /** - * Increments the count of unanswered events reported for the Connectable. - * - * @return NOTE(review): presumably the count after the increment - confirm against implementations - */ - int incrementEventCount(); - - /** - * Decrements the count of unanswered events reported for the Connectable. - * - * @return NOTE(review): presumably the count after the decrement - confirm against implementations - */ - int decrementEventCount(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java deleted file mode 100644 index 1195bc9..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -/** - * Single-method interface declaring {@link #heartbeat()}, which takes no - * arguments and returns nothing. NOTE(review): what a heartbeat signals, and - * to whom, is not documented here - confirm against implementations. - */ -public interface Heartbeater { - - void heartbeat(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java deleted file mode 100644 index 303f540..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.Funnel; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnUnscheduled; -import org.apache.nifi.scheduling.SchedulingStrategy; - -public interface ProcessScheduler { - - /** - * Shuts down the scheduler, stopping all components - */ - void shutdown(); - - /** - * Starts scheduling the given processor to run after invoking all methods - * on the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that - * are annotated with the {@link OnScheduled} annotation. If the Processor - * is already scheduled to run, does nothing. - * - * @param procNode the node for the processor to start scheduling - * @throws IllegalStateException if the Processor is disabled - */ - void startProcessor(ProcessorNode procNode); - - /** - * Stops scheduling the given processor to run and invokes all methods on - * the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that - * are annotated with the {@link OnUnscheduled} annotation. This does not - * interrupt any threads that are currently running within the given - * Processor. If the Processor is not scheduled to run, does nothing. - * @param procNode the node for the processor to stop scheduling - */ - void stopProcessor(ProcessorNode procNode); - - /** - * Starts scheduling the given Port to run. If the Port is already scheduled - * to run, does nothing. - * - * @param port the Port to start scheduling - * - * @throws IllegalStateException if the Port is disabled - */ - void startPort(Port port); - - /** - * Stops scheduling the given Port to run. - * This does not interrupt any threads that are currently running - * within the given Port. - * If the Port is not scheduled to run, does nothing. 
- * - * @param port the Port to stop scheduling - */ - void stopPort(Port port); - - /** - * Starts scheduling the given Funnel to run. If the funnel is already - * scheduled to run, does nothing. - * - * @param funnel the Funnel to start scheduling - * - * @throws IllegalStateException if the Funnel is disabled - */ - void startFunnel(Funnel funnel); - - /** - * Stops scheduling the given Funnel to run. This does not interrupt any - * threads that are currently running within the given funnel. If the funnel - * is not scheduled to run, does nothing. - * - * @param funnel the Funnel to stop scheduling - */ - void stopFunnel(Funnel funnel); - - void enableFunnel(Funnel funnel); - - void enablePort(Port port); - - void enableProcessor(ProcessorNode procNode); - - void disableFunnel(Funnel funnel); - - void disablePort(Port port); - - void disableProcessor(ProcessorNode procNode); - - /** - * Returns the number of threads currently active for the given - * <code>Connectable</code>. - * - * @param scheduled the object whose active threads are counted (declared as Object although the description refers to a <code>Connectable</code>) - * @return the number of threads currently active for the given object - */ - int getActiveThreadCount(Object scheduled); - - /** - * Returns a boolean indicating whether or not the given object is scheduled - * to run - * - * @param scheduled the object to check - * @return <code>true</code> if the given object is scheduled to run, <code>false</code> otherwise - */ - boolean isScheduled(Object scheduled); - - /** - * Registers a relevant event for an Event-Driven worker - * - * @param worker the Event-Driven worker for which the event is registered - */ - void registerEvent(Connectable worker); - - /** - * Notifies the ProcessScheduler of how many threads are available to use - * for the given {@link SchedulingStrategy} - * - * @param strategy the scheduling strategy the thread count applies to - * @param maxThreadCount the number of threads available to use for that strategy - */ - void setMaxThreadCount(SchedulingStrategy strategy, int maxThreadCount); - - /** - * Notifies the Scheduler that it should stop scheduling the given component - * until its yield duration has expired - * - * @param procNode the component that should not be scheduled until its yield duration has expired - */ - void yield(ProcessorNode procNode); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java 
---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java deleted file mode 100644 index f6786fa..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.logging.LogLevel; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.scheduling.SchedulingStrategy; - -/** - * Abstract {@link Connectable} that extends AbstractConfiguredComponent. The - * constructor only forwards its arguments to the superclass constructor; - * every other member declared here is abstract. - */ -public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { - - /** - * Passes all four arguments, unchanged and in the same order, to the - * AbstractConfiguredComponent constructor. - * - * @param processor the Processor handed to the superclass - * @param id the identifier handed to the superclass - * @param validationContextFactory the ValidationContextFactory handed to the superclass - * @param serviceProvider the ControllerServiceProvider handed to the superclass - */ - public ProcessorNode(final Processor processor, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { - super(processor, id, validationContextFactory, serviceProvider); - } - - public abstract boolean isIsolated(); - - public abstract boolean isTriggerWhenAnyDestinationAvailable(); - - @Override - public abstract boolean isSideEffectFree(); - - public abstract boolean isTriggeredSerially(); - - public abstract boolean isEventDrivenSupported(); - - public abstract boolean isHighThroughputSupported(); - - @Override - public abstract boolean isValid(); - - public abstract void setScheduledState(ScheduledState scheduledState); - - public abstract void setBulletinLevel(LogLevel bulletinLevel); - - public abstract LogLevel getBulletinLevel(); - - public abstract Processor getProcessor(); - - public abstract void yield(long period, TimeUnit timeUnit); - - public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships); - - public abstract Set<Relationship> getAutoTerminatedRelationships(); - - public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); - - @Override - public abstract SchedulingStrategy getSchedulingStrategy(); - - public abstract void setRunDuration(long duration, TimeUnit timeUnit); - - public abstract long getRunDuration(TimeUnit timeUnit); - - public abstract Map<String, String> getStyle(); - - public 
abstract void setStyle(Map<String, String> style); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java deleted file mode 100644 index 6b8ede0..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller; - -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.ReportingTask; -import org.apache.nifi.scheduling.SchedulingStrategy; - -public interface ReportingTaskNode extends ConfiguredComponent { - - Availability getAvailability(); - - void setAvailability(Availability availability); - - void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); - - SchedulingStrategy getSchedulingStrategy(); - - /** - * @return a string representation of the time between each scheduling - * period - */ - String getSchedulingPeriod(); - - long getSchedulingPeriod(TimeUnit timeUnit); - - /** - * Updates how often the ReportingTask should be triggered to run. - * NOTE(review): the method name is misspelled (Schedulding); it is kept - * as declared because renaming it would break implementers and callers. - * - * @param schedulingPeriod the new scheduling period; NOTE(review): presumably the same string form returned by {@link #getSchedulingPeriod()} - confirm - */ - void setScheduldingPeriod(String schedulingPeriod); - - ReportingTask getReportingTask(); - - ReportingContext getReportingContext(); - - ConfigurationContext getConfigurationContext(); - - boolean isRunning(); -}