http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java new file mode 100644 index 0000000..e434905 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.util.Map; + +public interface RepositoryStatusReport { + + void addReportEntry(FlowFileEvent entry); + + Map<String, FlowFileEvent> getReportEntries(); + + FlowFileEvent getReportEntry(String componentId); +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java new file mode 100644 index 0000000..6f9c237 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.service; + +import org.apache.nifi.controller.Availability; +import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ControllerService; + +public interface ControllerServiceNode extends ConfiguredComponent { + + ControllerService getControllerService(); + + Availability getAvailability(); + + void setAvailability(Availability availability); + + boolean isDisabled(); + + void setDisabled(boolean disabled); + + ControllerServiceReference getReferences(); + + void addReference(ConfiguredComponent referringComponent); + + void removeReference(ConfiguredComponent referringComponent); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java new file mode 100644 index 0000000..35a255d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service; + +import java.util.Map; + +import org.apache.nifi.controller.ControllerServiceLookup; + +/** + * + */ +public interface ControllerServiceProvider extends ControllerServiceLookup { + + /** + * Creates a controller service of the specified type, using the specified + * identifier and properties. + * + * @param type + * @param id + * @param properties + * @return + */ + ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties); + + /** + * Gets the controller service node for the specified identifier.
Returns + * <code>null</code> if the identifier does not match a known service + * + * @param id + * @return + */ + ControllerServiceNode getControllerServiceNode(String id); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java new file mode 100644 index 0000000..5cb676f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.service; + +import java.util.Set; + +import org.apache.nifi.controller.ConfiguredComponent; + +/** + * Provides a collection of components that are referencing a Controller Service + */ +public interface ControllerServiceReference { + + /** + * Returns the component that is being referenced + * + * @return + */ + ControllerServiceNode getReferencedComponent(); + + /** + * Returns a {@link Set} of all components that are referencing this + * Controller Service + * + * @return + */ + Set<ConfiguredComponent> getReferencingComponents(); + + /** + * Returns a {@link Set} of all Processors and Reporting Tasks that are + * referencing the Controller Service and are running, in addition to all + * + * @return + */ + Set<ConfiguredComponent> getRunningReferences(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java new file mode 100644 index 0000000..d1d5e5b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.events; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.reporting.Bulletin; + +/** + * + */ +public final class BulletinFactory { + + private static final AtomicLong currentId = new AtomicLong(0); + + public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) { + return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message); + } + + public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) { + final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement()); + bulletin.setGroupId(groupId); + bulletin.setSourceId(sourceId); + bulletin.setSourceName(sourceName); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + return bulletin; + } + + public static Bulletin createBulletin(final String category, final String severity, final String message) { + final Bulletin bulletin = new SystemBulletin(currentId.getAndIncrement()); + bulletin.setCategory(category); + bulletin.setLevel(severity); + bulletin.setMessage(message); + return bulletin; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java new file mode 100644 index 0000000..9846cf2 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/BulletinProcessingStrategy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.events; + +import org.apache.nifi.reporting.Bulletin; + +/** + * + */ +public interface BulletinProcessingStrategy { + + void update(Bulletin bulletin); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java new file mode 100644 index 0000000..23c4cdb --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/ComponentBulletin.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.events; + +import org.apache.nifi.reporting.Bulletin; + +/** + * + */ +public class ComponentBulletin extends Bulletin { + + ComponentBulletin(final long id) { + super(id); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java new file mode 100644 index 0000000..f97dc46 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.events; + +import org.apache.nifi.reporting.Bulletin; + +/** + * + */ +public class SystemBulletin extends Bulletin { + + SystemBulletin(final long id) { + super(id); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java new file mode 100644 index 0000000..61be59c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.groups; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.Snippet; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Processor; + +/** + * <p> + * ProcessGroup objects are containers for processing entities, such as + * {@link Processor}s, {@link Port}s, and other {@link ProcessGroup}s. + * </p> + * + * <p> + * MUST BE THREAD-SAFE</p> + */ +public interface ProcessGroup { + + /** + * @return a reference to this ProcessGroup's parent. This will be + * <tt>null</tt> if and only if this is the root group. + */ + ProcessGroup getParent(); + + /** + * Updates the ProcessGroup to point to a new parent + * + * @param group + */ + void setParent(ProcessGroup group); + + /** + * @return the ID of the ProcessGroup + */ + String getIdentifier(); + + /** + * @return the name of the ProcessGroup + */ + String getName(); + + /** + * Updates the name of this ProcessGroup. 
+ * + * @param name + */ + void setName(String name); + + /** + * Updates the position of where this ProcessGroup is located in the graph + */ + void setPosition(Position position); + + /** + * Returns the position of where this ProcessGroup is located in the graph + * + * @return + */ + Position getPosition(); + + /** + * @return the user-set comments about this ProcessGroup, or + * <code>null</code> if no comments have been set + */ + String getComments(); + + /** + * Updates the comments for this ProcessGroup + * + * @param comments + */ + void setComments(String comments); + + /** + * Returns the counts for this ProcessGroup + * + * @return + */ + ProcessGroupCounts getCounts(); + + /** + * Starts all Processors, Local Ports, and Funnels that are directly within + * this group and any child ProcessGroups, except for those that are + * disabled. + */ + void startProcessing(); + + /** + * Stops all Processors, Local Ports, and Funnels that are directly within + * this group and child ProcessGroups, except for those that are disabled. 
+ */ + void stopProcessing(); + + /** + * Enables the given Processor + * + * @param processor the processor to enable + * @throws IllegalStateException if the processor is not valid, or is + * already running + */ + void enableProcessor(ProcessorNode processor); + + /** + * Enables the given Input Port + * + * @param port + */ + void enableInputPort(Port port); + + /** + * Enables the given Output Port + * + * @param port + */ + void enableOutputPort(Port port); + + /** + * Enables the given Funnel + * + * @param funnel + */ + void enableFunnel(Funnel funnel); + + /** + * Starts the given Processor + * + * @param processor the processor to start + * @throws IllegalStateException if the processor is not valid, or is + * already running + */ + void startProcessor(ProcessorNode processor); + + /** + * Starts the given Input Port + * + * @param port + */ + void startInputPort(Port port); + + /** + * Starts the given Output Port + * + * @param port + */ + void startOutputPort(Port port); + + /** + * Starts the given Funnel + * + * @param funnel + */ + void startFunnel(Funnel funnel); + + /** + * Stops the given Processor + * + * @param processor + */ + void stopProcessor(ProcessorNode processor); + + /** + * Stops the given Input Port + * + * @param port + */ + void stopInputPort(Port port); + + /** + * Stops the given Output Port + * + * @param port + */ + void stopOutputPort(Port port); + + /** + * Stops the given Funnel + * + * @param funnel + */ + void stopFunnel(Funnel funnel); + + /** + * Disables the given Processor + * + * @param processor the processor to disable + * @throws IllegalStateException if the processor is not valid, or is + * already running + */ + void disableProcessor(ProcessorNode processor); + + /** + * Disables the given Input Port + * + * @param port + */ + void disableInputPort(Port port); + + /** + * Disables the given Output Port + * + * @param port + */ + void disableOutputPort(Port port); + + /** + * Disables the given Funnel + * + * @param funnel + */
+ void disableFunnel(Funnel funnel); + + /** + * Indicates that the Flow is being shut down; allows cleanup of resources + * associated with processors, etc. + */ + void shutdown(); + + /** + * Returns a boolean indicating whether or not this ProcessGroup is the root + * group + * + * @return + */ + boolean isRootGroup(); + + /** + * Adds a {@link Port} to be used for transferring {@link FlowFile}s from + * external sources to {@link Processor}s and other {@link Port}s within + * this ProcessGroup. + * + * @param port + */ + void addInputPort(Port port); + + /** + * Removes a {@link Port} from this ProcessGroup's list of Input Ports. + * + * @param port the Port to remove + * @throws NullPointerException if <code>port</code> is null + * @throws IllegalStateException if port is not an Input Port for this + * ProcessGroup + */ + void removeInputPort(Port port); + + /** + * @return the {@link Set} of all {@link Port}s that are used by this + * ProcessGroup as Input Ports. + */ + Set<Port> getInputPorts(); + + /** + * @param id the ID of the input port + * @return the input port with the given ID, or <code>null</code> if it does + * not exist. + */ + Port getInputPort(String id); + + /** + * Adds a {@link Port} to be used for transferring {@link FlowFile}s to + * external sources. + * + * @param port the Port to add + */ + void addOutputPort(Port port); + + /** + * Removes a {@link Port} from this ProcessGroup's list of Output Ports. + * + * @param port the Port to remove + * @throws NullPointerException if <code>port</code> is null + * @throws IllegalStateException if port is not an Output Port for this + * ProcessGroup + */ + void removeOutputPort(Port port); + + /** + * @param id the ID of the output port + * @return the output port with the given ID, or <code>null</code> if it + * does not exist. + */ + Port getOutputPort(String id); + + /** + * @return the {@link Set} of all {@link Port}s that are used by this + * ProcessGroup as Output Ports.
+ */ + Set<Port> getOutputPorts(); + + /** + * Adds a reference to a ProcessGroup as a child of this. + * + * @param group the ProcessGroup to add as a child + */ + void addProcessGroup(ProcessGroup group); + + /** + * Returns the ProcessGroup whose parent is <code>this</code> and whose id + * is given + * + * @param id + * @return + */ + ProcessGroup getProcessGroup(String id); + + /** + * @return a {@link Set} of all Process Group References that are contained + * within this. + */ + Set<ProcessGroup> getProcessGroups(); + + /** + * @param group the group to remove + * @throws NullPointerException if <code>group</code> is null + * @throws IllegalStateException if group is not a member of this + * ProcessGroup, or the given ProcessGroup is not empty (i.e., it contains + * at least one Processor, ProcessGroup, Input Port, Output Port, or Label). + */ + void removeProcessGroup(ProcessGroup group); + + /** + * Adds the already constructed processor instance to this group + * + * @param processor the processor to add + */ + void addProcessor(ProcessorNode processor); + + /** + * Removes the given processor from this group, destroying the Processor. + * The Processor is removed from the ProcessorRegistry, and any method in + * the Processor that is annotated with the + * {@link nifi.processor.annotation.OnRemoved OnRemoved} annotation will be + * invoked. All outgoing connections will also be destroyed + * + * @param processor the Processor to remove + * @throws NullPointerException if <code>processor</code> is null + * @throws IllegalStateException if <code>processor</code> is not a member + * of this ProcessGroup, is currently running, or has any incoming + * connections. + */ + void removeProcessor(ProcessorNode processor); + + /** + * @return a {@link Set} of all FlowFileProcessors that are contained + * within this. + */ + Set<ProcessorNode> getProcessors(); + + /** + * Returns the FlowFileProcessor with the given ID.
+ * + * @param id the ID of the processor to retrieve + * @return the processor with the given ID + * @throws NullPointerException if <code>id</code> is null. + */ + ProcessorNode getProcessor(String id); + + /** + * Returns the <code>Connectable</code> with the given ID, or + * <code>null</code> if the <code>Connectable</code> is not a member of the + * group + * + * @param id the ID of the Connectable + * @return + */ + Connectable getConnectable(String id); + + /** + * Adds the given connection to this ProcessGroup. This method also notifies + * the Source and Destination of the Connection that the Connection has been + * established. + * + * @param connection + * @throws NullPointerException if the connection is null + * @throws IllegalStateException if the source or destination of the + * connection is not a member of this ProcessGroup or if a connection + * already exists in this ProcessGroup with the same ID + */ + void addConnection(Connection connection); + + /** + * Removes the connection from this ProcessGroup. + * + * @param connection + * @throws IllegalStateException if <code>connection</code> is not contained + * within this. + */ + void removeConnection(Connection connection); + + /** + * Inherits a Connection from another ProcessGroup; this does not perform + * any validation but simply notifies the ProcessGroup that it is now the + * owner of the given Connection. This is used in place of the + * {@link #addConnection(Connection)} method when moving Connections from + * one group to another because addConnection notifies both the Source and + * Destination of the Connection that the Connection has been established; + * this method does not notify either, as both the Source and Destination + * should already be aware of the Connection. + * + * @param connection + */ + void inheritConnection(Connection connection); + + /** + * @return the Connection with the given ID, or <code>null</code> if the + * connection does not exist. 
+ */ + Connection getConnection(String id); + + /** + * Returns the {@link Set} of all {@link Connection}s contained within this. + * + * @return + */ + Set<Connection> getConnections(); + + /** + * Returns a List of all Connections contained within this ProcessGroup and + * any child ProcessGroups. + * + * @return + */ + List<Connection> findAllConnections(); + + /** + * Adds the given RemoteProcessGroup to this ProcessGroup + * + * @param remoteGroup + * + * @throws NullPointerException if the given argument is null + */ + void addRemoteProcessGroup(RemoteProcessGroup remoteGroup); + + /** + * Removes the given RemoteProcessGroup from this ProcessGroup + * + * @param remoteGroup + * @throws NullPointerException if the argument is null + * @throws IllegalStateException if the given argument does not belong to + * this ProcessGroup + */ + void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup); + + /** + * Returns the RemoteProcessGroup that is the child of this ProcessGroup and + * has the given ID. If no RemoteProcessGroup can be found with the given + * ID, returns <code>null</code>. + * + * @param id + * @return + */ + RemoteProcessGroup getRemoteProcessGroup(String id); + + /** + * Returns a set of all RemoteProcessGroups that belong to this + * ProcessGroup. If no RemoteProcessGroups have been added to this + * ProcessGroup, will return an empty Set.
+ * + * @return + */ + Set<RemoteProcessGroup> getRemoteProcessGroups(); + + /** + * Adds the given Label to this ProcessGroup + * + * @param label the label to add + * @return + * + * @throws NullPointerException if the argument is null + */ + void addLabel(Label label); + + /** + * Removes the given Label from this ProcessGroup + * + * @param label the label to remove + * @throws NullPointerException if the argument is null + * @throws IllegalStateException if the given argument does not belong to + * this ProcessGroup + */ + void removeLabel(Label label); + + /** + * Returns a set of all Labels that belong to this ProcessGroup. If no + * Labels belong to this ProcessGroup, returns an empty Set. + * + * @return + */ + Set<Label> getLabels(); + + /** + * Returns the Label that belongs to this ProcessGroup and has the given id. + * If no Label can be found with this ID, returns <code>null</code>. + * + * @param id + * @return + */ + Label getLabel(String id); + + /** + * Returns the Process Group with the given ID, if it exists as a child of + * this ProcessGroup, or is this ProcessGroup. This performs a recursive + * search of all ProcessGroups and descendant ProcessGroups + * + * @param id + * @return + */ + ProcessGroup findProcessGroup(String id); + + /** + * Returns the RemoteProcessGroup with the given ID, if it exists as a child + * or descendant of this ProcessGroup. This performs a recursive search of + * all ProcessGroups and descendant ProcessGroups + * + * @param id + * @return + */ + RemoteProcessGroup findRemoteProcessGroup(String id); + + /** + * Returns a List of all Remote Process Groups that are children or + * descendants of this ProcessGroup. This performs a recursive search of all + * descendant ProcessGroups + * + * @return + */ + List<RemoteProcessGroup> findAllRemoteProcessGroups(); + + /** + * Returns the Processor with the given ID, if it exists as a child or + * descendant of this ProcessGroup. 
This performs a recursive search of all + * descendant ProcessGroups + * + * @param id + * @return + */ + ProcessorNode findProcessor(String id); + + /** + * Returns a List of all Processors that are children or descendants of this + * ProcessGroup. This performs a recursive search of all descendant + * ProcessGroups + * + * @return + */ + List<ProcessorNode> findAllProcessors(); + + /** + * Returns a List of all Labels that are children or descendants of this + * ProcessGroup. This performs a recursive search of all descendant + * ProcessGroups + * + * @return + */ + List<Label> findAllLabels(); + + /** + * Returns the input port with the given ID, if it exists; otherwise returns + * null. This performs a recursive search of all Input Ports and descendant + * ProcessGroups + * + * @param id + * @return + */ + Port findInputPort(String id); + + /** + * Returns the input port with the given name, if it exists; otherwise + * returns null. + * + * @param name + * @return + */ + Port getInputPortByName(String name); + + /** + * Returns the output port with the given ID, if it exists; otherwise + * returns null. This performs a recursive search of all Output Ports and + * descendant ProcessGroups + * + * @param id + * @return + */ + Port findOutputPort(String id); + + /** + * Returns the output port with the given name, if it exists; otherwise + * returns null.
+ * + * @param name + * @return + */ + Port getOutputPortByName(String name); + + /** + * Adds the given funnel to this ProcessGroup + * + * @param funnel + */ + void addFunnel(Funnel funnel); + + /** + * Returns a Set of all Funnels that belong to this ProcessGroup + * + * @return + */ + Set<Funnel> getFunnels(); + + /** + * Returns the funnel with the given identifier + * + * @param id + * @return + */ + Funnel getFunnel(String id); + + /** + * Removes the given funnel from this ProcessGroup + * + * @param funnel + * + * @throws IllegalStateException if the funnel is not a member of this + * ProcessGroup or has incoming or outgoing connections + */ + void removeFunnel(Funnel funnel); + + /** + * @return <code>true</code> if this ProcessGroup has no Processors, Labels, + * Connections, ProcessGroups, RemoteProcessGroupReferences, or Ports. + * Otherwise, returns <code>false</code>. + */ + boolean isEmpty(); + + /** + * Removes all of the components whose ID's are specified within the given + * {@link Snippet} from this ProcessGroup. + * + * @param snippet + * + * @throws NullPointerException if argument is null + * @throws IllegalStateException if any ID in the snippet refers to a + * component that is not within this ProcessGroup + */ + void remove(final Snippet snippet); + + /** + * Returns the Connectable with the given ID, if it exists; otherwise + * returns null. 
This performs a recursive search of all ProcessGroups' + * input ports, output ports, funnels, processors, and remote process groups + * + * @param identifier + * @return + */ + Connectable findConnectable(String identifier); + + /** + * Moves all of the components whose ID's are specified within the given + * {@link Snippet} from this ProcessGroup into the given destination + * ProcessGroup + * + * @param snippet + * @param destination + * + * @throws NullPointerExcepiton if either argument is null + * @throws IllegalStateException if any ID in the snippet refers to a + * component that is not within this ProcessGroup + */ + void move(final Snippet snippet, final ProcessGroup destination); + + void verifyCanDelete(); + + void verifyCanStart(); + + void verifyCanStop(); + + /** + * Ensures that deleting the given snippet is a valid operation at this + * point in time, depending on the state of this ProcessGroup + * + * @param snippet + * + * @throws IllegalStateException if deleting the Snippet is not valid at + * this time + */ + void verifyCanDelete(Snippet snippet); + + /** + * Ensure that moving the given snippet to the given new group is a valid + * operation at this point in time, depending on the state of both + * ProcessGroups + * + * @param snippet + * @param newProcessGroup + * + * @throws IllegalStateException if the move is not valid at this time + */ + void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java new file mode 100644 index 0000000..3eb594b --- /dev/null 
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/ProcessGroupCounts.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.groups; + +public class ProcessGroupCounts { + + private final int inputPortCount, outputPortCount, runningCount, stoppedCount, invalidCount, disabledCount, activeRemotePortCount, inactiveRemotePortCount; + + public ProcessGroupCounts(final int inputPortCount, final int outputPortCount, final int runningCount, + final int stoppedCount, final int invalidCount, final int disabledCount, final int activeRemotePortCount, final int inactiveRemotePortCount) { + this.inputPortCount = inputPortCount; + this.outputPortCount = outputPortCount; + this.runningCount = runningCount; + this.stoppedCount = stoppedCount; + this.invalidCount = invalidCount; + this.disabledCount = disabledCount; + this.activeRemotePortCount = activeRemotePortCount; + this.inactiveRemotePortCount = inactiveRemotePortCount; + } + + public int getInputPortCount() { + return inputPortCount; + } + + public int getOutputPortCount() { + return outputPortCount; + } + + public int getRunningCount() { + return runningCount; + } + + public int getStoppedCount() 
{ + return stoppedCount; + } + + public int getInvalidCount() { + return invalidCount; + } + + public int getDisabledCount() { + return disabledCount; + } + + public int getActiveRemotePortCount() { + return activeRemotePortCount; + } + + public int getInactiveRemotePortCount() { + return inactiveRemotePortCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java new file mode 100644 index 0000000..3acd1d3 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.groups; + +import java.io.IOException; +import java.net.URI; +import java.util.Date; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.exception.CommunicationsException; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.protocol.CommunicationsSession; + +public interface RemoteProcessGroup { + + String getIdentifier(); + + URI getTargetUri(); + + ProcessGroup getProcessGroup(); + + void setProcessGroup(ProcessGroup group); + + void setPosition(Position position); + + Position getPosition(); + + String getComments(); + + void setComments(String comments); + + /** + * Returns the name of this RemoteProcessGroup. The value returned will + * never be null. If unable to communicate with the remote instance, the URI + * of that instance may be returned instead + * + * @return the name of this RemoteProcessGroup; never null + */ + String getName(); + + void setName(String name); + + void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports); + + void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports); + + Set<RemoteGroupPort> getInputPorts(); + + Set<RemoteGroupPort> getOutputPorts(); + + RemoteGroupPort getInputPort(String id); + + RemoteGroupPort getOutputPort(String id); + + ProcessGroupCounts getCounts(); + + void refreshFlowContents() throws CommunicationsException; + + Date getLastRefreshTime(); + + void setYieldDuration(final String yieldDuration); + + String getYieldDuration(); + + /** + * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min") + * + * @param timePeriod the timeout, written in the TimePeriod format + * @throws IllegalArgumentException presumably when the value is not a + * valid TimePeriod (confirm against the implementation) + */ + void setCommunicationsTimeout(String timePeriod) throws IllegalArgumentException; + + /** + * Returns the communications timeout in terms of the given TimeUnit + * + * @param timeUnit the unit in which to express the timeout + * @return the communications timeout, expressed in the given unit + */ + int
getCommunicationsTimeout(TimeUnit timeUnit); + + /** + * Returns the user-configured String representation of the communications + * timeout + * + * @return the communications timeout as configured by the user + */ + String getCommunicationsTimeout(); + + /** + * @return the port that the remote instance is listening on for + * site-to-site communication, or <code>null</code> if the remote instance + * is not configured to allow site-to-site communications. + * + * @throws IOException if unable to communicate with the remote instance + */ + Integer getListeningPort() throws IOException; + + /** + * Indicates whether or not the RemoteProcessGroup is currently scheduled to + * transmit data + * + * @return true if currently scheduled to transmit data, false otherwise + */ + boolean isTransmitting(); + + /** + * Initiates communications between this instance and the remote instance. + */ + void startTransmitting(); + + /** + * Immediately terminates communications between this instance and the + * remote instance. + */ + void stopTransmitting(); + + /** + * Initiates communications between this instance and the remote instance + * only for the port specified. + * + * @param port the port for which communications are initiated + */ + void startTransmitting(RemoteGroupPort port); + + /** + * Immediately terminates communications between this instance and the + * remote instance only for the port specified. + * + * @param port the port for which communications are terminated + */ + void stopTransmitting(RemoteGroupPort port); + + /** + * Indicates whether or not communications with this RemoteProcessGroup will + * be secure (2-way authentication) + * + * @return true if communications will be secure, false otherwise + */ + boolean isSecure() throws CommunicationsException; + + /** + * Indicates whether or not communications with this RemoteProcessGroup will + * be secure (2-way authentication). Returns null if unknown. + * + * @return whether communications will be secure, or null if unknown + */ + Boolean getSecureFlag(); + + /** + * Returns true if the target system has site to site enabled. Returns false + * otherwise (they don't or they have not yet responded).
+ * + * @return true if the target system has site to site enabled + */ + boolean isSiteToSiteEnabled(); + + /** + * Returns a String indicating why we are not authorized to communicate with + * the remote instance, or <code>null</code> if we are authorized + * + * @return the reason we are not authorized, or <code>null</code> if we are + * authorized + */ + String getAuthorizationIssue(); + + /** + * Returns the {@link EventReporter} that can be used to report any notable + * events + * + * @return the EventReporter to report notable events with + */ + EventReporter getEventReporter(); + + /** + * Initiates a task in the remote process group to re-initialize, as a + * result of clustering changes + * + * @param isClustered whether or not this instance is now clustered + */ + void reinitialize(boolean isClustered); + + /** + * Removes all non existent ports from this RemoteProcessGroup. + */ + void removeAllNonExistentPorts(); + + /** + * Removes a port that no longer exists on the remote instance from this + * RemoteProcessGroup + * + * @param port the port to remove + */ + void removeNonExistentPort(final RemoteGroupPort port); + + /** + * NOTE(review): judging by the name, this establishes a site-to-site + * connection with the remote instance - confirm against the implementation. + * + * @return the CommunicationsSession for the connection + * @throws IOException declared by this method; the failure conditions are + * not visible from this interface + */ + CommunicationsSession establishSiteToSiteConnection() throws IOException; + + /** + * Called whenever RemoteProcessGroup is removed from the flow, so that any + * resources can be cleaned up appropriately. + */ + void onRemove(); + + void verifyCanDelete(); + + void verifyCanDelete(boolean ignoreConnections); + + void verifyCanStartTransmitting(); + + void verifyCanStopTransmitting(); + + void verifyCanUpdate(); + + /** + * Returns a set of PeerStatus objects that describe the different peers + * that we can communicate with for this RemoteProcessGroup. + * + * If the destination is a cluster, this set will contain PeerStatuses for + * each of the nodes in the cluster. + * + * If the destination is a standalone instance, this set will contain just a + * PeerStatus for the destination. + * + * Once the PeerStatuses have been obtained, they may be cached by this + * RemoteProcessGroup for some amount of time.
+ * + * If unable to obtain the PeerStatuses or no peer status has yet been + * obtained, will return null. + * + * @return the PeerStatuses for this RemoteProcessGroup, or null if they + * could not be obtained or have not yet been obtained + */ + Set<PeerStatus> getPeerStatuses(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java new file mode 100644 index 0000000..fb4f6e0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.groups; + +public interface RemoteProcessGroupPortDescriptor { + + /** + * The comments as configured in the target port. + * + * @return the comments configured in the target port + */ + String getComments(); + + /** + * The number of tasks that may transmit flow files to the target port + * concurrently.
+ * + * @return the number of tasks that may transmit concurrently + */ + Integer getConcurrentlySchedulableTaskCount(); + + /** + * The id of the target port. + * + * @return the id of the target port + */ + String getId(); + + /** + * The id of the remote process group that this port resides in. + * + * @return the id of the remote process group that this port resides in + */ + String getGroupId(); + + /** + * The name of the target port. + * + * @return the name of the target port + */ + String getName(); + + /** + * Whether or not this remote group port is configured for transmission. + * + * @return whether this remote group port is configured for transmission + */ + Boolean isTransmitting(); + + /** + * Whether or not flow files are compressed when sent to this target port. + * + * @return whether flow files are compressed when sent to this target port + */ + Boolean getUseCompression(); + + /** + * Whether or not the target port exists. + * + * @return whether the target port exists + */ + Boolean getExists(); + + /** + * Whether or not the target port is running. + * + * @return whether the target port is running + */ + Boolean isTargetRunning(); + + /** + * Whether or not this port has either an incoming or outgoing connection. + * + * @return whether this port has an incoming or outgoing connection + */ + Boolean isConnected(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java new file mode 100644 index 0000000..27cc6c5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogMessage.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.logging; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.sql.Date; +import java.text.DateFormat; +import java.text.SimpleDateFormat; + +public class LogMessage { + + private final String message; + private final LogLevel level; + private final Throwable throwable; + private final long time; + + public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; + public static final String TO_STRING_FORMAT = "%1$s %2$s - %3$s"; + + public LogMessage(final long millisSinceEpoch, final LogLevel level, final String message, final Throwable throwable) { + this.level = level; + this.throwable = throwable; + this.message = message; + this.time = millisSinceEpoch; + } + + public long getMillisSinceEpoch() { + return time; + } + + public String getMessage() { + return message; + } + + public LogLevel getLevel() { + return level; + } + + public Throwable getThrowable() { + return throwable; + } + + @Override + public String toString() { + final DateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT); + final String formattedTime = dateFormat.format(new Date(time)); + + String formattedMsg = String.format(TO_STRING_FORMAT, formattedTime, level.toString(), message); + if (throwable != null) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw); + throwable.printStackTrace(pw); + formattedMsg += "\n" + sw.toString(); + } + + return formattedMsg; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java new file mode 100644 index 0000000..a75f8ea --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogObserver.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.logging; + +/** + * Callback for receiving {@link LogMessage}s. Observers are registered with a + * {@link LogRepository} through + * {@link LogRepository#addObserver(String, LogLevel, LogObserver)}. + */ +public interface LogObserver { + + /** + * Receives a single log message. + * + * @param message the message being delivered; presumably only messages at + * or above the level the observer was registered with are passed here + * (confirm against the LogRepository implementation) + */ + void onLogMessage(LogMessage message); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java new file mode 100644 index 0000000..4a017ce --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepository.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.logging; + +/** + * Accepts log messages and holds the {@link LogObserver}s, keyed by an + * identifier, that are notified of them. + */ +public interface LogRepository { + + void addLogMessage(LogLevel level, String message); + + void addLogMessage(LogLevel level, String message, Throwable t); + + void addLogMessage(LogLevel level, String messageFormat, Object[] params); + + void addLogMessage(LogLevel level, String messageFormat, Object[] params, Throwable t); + + /** + * Registers an observer so that it will be notified of all Log Messages + * whose levels are at least equal to the given level. + * + * @param observerIdentifier the identifier under which the observer is + * registered + * @param level the minimum level of the messages the observer is notified of + * @param observer the observer to register + */ + void addObserver(String observerIdentifier, LogLevel level, LogObserver observer); + + /** + * Sets the observation level of the specified observer. + * + * @param observerIdentifier the identifier of the observer to update + * @param level the new observation level + */ + void setObservationLevel(String observerIdentifier, LogLevel level); + + /** + * Gets the observation level for the specified observer. + * + * @param observerIdentifier the identifier of the observer to look up + * @return the observation level of the specified observer + */ + LogLevel getObservationLevel(String observerIdentifier); + + /** + * Removes the given LogObserver from this Repository.
+ * + * @param observerIdentifier the identifier of the observer to remove + * @return presumably the observer that was removed - confirm against the + * implementation + */ + LogObserver removeObserver(String observerIdentifier); + + /** + * Removes all LogObservers from this Repository + */ + void removeAllObservers(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java new file mode 100644 index 0000000..76ca661 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.logging; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.LoggerFactory; + +@SuppressWarnings("unchecked") +public class LogRepositoryFactory { + + public static final String LOG_REPOSITORY_CLASS_NAME = "org.apache.nifi.logging.repository.StandardLogRepository"; + + private static final ConcurrentMap<String, LogRepository> repositoryMap = new ConcurrentHashMap<>(); + private static final Class<LogRepository> logRepositoryClass; + + static { + Class<LogRepository> clazz = null; + try { + clazz = (Class<LogRepository>) Class.forName(LOG_REPOSITORY_CLASS_NAME, true, LogRepositoryFactory.class.getClassLoader()); + } catch (ClassNotFoundException e) { + LoggerFactory.getLogger(LogRepositoryFactory.class).error("Unable to find class {}; logging may not work properly", LOG_REPOSITORY_CLASS_NAME); + } + logRepositoryClass = clazz; + } + + public static LogRepository getRepository(final String processorId) { + LogRepository repository = repositoryMap.get(requireNonNull(processorId)); + if (repository == null) { + try { + repository = logRepositoryClass.newInstance(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + + final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository); + if (oldRepository != null) { + repository = oldRepository; + } + } + + return repository; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java new file mode 100644 index 0000000..b25c90b --- /dev/null +++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarCloseable.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import java.io.Closeable; + +/** + * + */ +public class NarCloseable implements Closeable { + + public static NarCloseable withNarLoader() { + final ClassLoader current = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance()); + return new NarCloseable(current); + } + + private final ClassLoader toSet; + + private NarCloseable(final ClassLoader toSet) { + this.toSet = toSet; + } + + @Override + public void close() { + if (toSet != null) { + Thread.currentThread().setContextClassLoader(toSet); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java new file mode 100644 index 0000000..aa905a8 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.nar; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import org.apache.nifi.authorization.AuthorityProvider; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.reporting.ReportingTask; + +/** + * + * @author none THREAD SAFE + */ +public class NarThreadContextClassLoader extends URLClassLoader { + + static final ContextSecurityManager contextSecurityManager = new ContextSecurityManager(); + private final ClassLoader forward = ClassLoader.getSystemClassLoader(); + private static final List<Class<?>> narSpecificClasses = new ArrayList<>(); + + static { + narSpecificClasses.add(Processor.class); + narSpecificClasses.add(FlowFilePrioritizer.class); + narSpecificClasses.add(ReportingTask.class); + narSpecificClasses.add(Validator.class); + narSpecificClasses.add(InputStreamCallback.class); + narSpecificClasses.add(OutputStreamCallback.class); + narSpecificClasses.add(StreamCallback.class); + narSpecificClasses.add(ControllerService.class); + narSpecificClasses.add(AuthorityProvider.class); + narSpecificClasses.add(ProvenanceEventRepository.class); + narSpecificClasses.add(ComponentStatusRepository.class); + 
narSpecificClasses.add(FlowFileRepository.class); + narSpecificClasses.add(FlowFileSwapManager.class); + narSpecificClasses.add(ContentRepository.class); + } + + private NarThreadContextClassLoader() { + super(new URL[0]); + } + + @Override + public void clearAssertionStatus() { + lookupClassLoader().clearAssertionStatus(); + } + + @Override + public URL getResource(String name) { + return lookupClassLoader().getResource(name); + } + + @Override + public InputStream getResourceAsStream(String name) { + return lookupClassLoader().getResourceAsStream(name); + } + + @Override + public Enumeration<URL> getResources(String name) throws IOException { + return lookupClassLoader().getResources(name); + } + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException { + return lookupClassLoader().loadClass(name); + } + + @Override + public void setClassAssertionStatus(String className, boolean enabled) { + lookupClassLoader().setClassAssertionStatus(className, enabled); + } + + @Override + public void setDefaultAssertionStatus(boolean enabled) { + lookupClassLoader().setDefaultAssertionStatus(enabled); + } + + @Override + public void setPackageAssertionStatus(String packageName, boolean enabled) { + lookupClassLoader().setPackageAssertionStatus(packageName, enabled); + } + + private ClassLoader lookupClassLoader() { + final Class<?>[] classStack = contextSecurityManager.getExecutionStack(); + + for (Class<?> currentClass : classStack) { + final Class<?> narClass = findNarClass(currentClass); + if (narClass != null) { + final ClassLoader desiredClassLoader = narClass.getClassLoader(); + + // When new Threads are created, the new Thread inherits the ClassLoaderContext of + // the caller. However, the call stack of that new Thread may not trace back to any NiFi-specific + // code. Therefore, the NarThreadContextClassLoader will be unable to find the appropriate NAR + // ClassLoader. 
As a result, we want to set the ContextClassLoader to the NAR ClassLoader that + // contains the class or resource that we are looking for. + // This locks the current Thread into the appropriate NAR ClassLoader Context. The framework will change + // the ContextClassLoader back to the NarThreadContextClassLoader as appropriate via the + // {@link FlowEngine.beforeExecute(Thread, Runnable)} and + // {@link FlowEngine.afterExecute(Thread, Runnable)} methods. + if (desiredClassLoader instanceof NarClassLoader) { + Thread.currentThread().setContextClassLoader(desiredClassLoader); + } + return desiredClassLoader; + } + } + return forward; + } + + private Class<?> findNarClass(final Class<?> cls) { + for (final Class<?> narClass : narSpecificClasses) { + if (narClass.isAssignableFrom(cls)) { + return cls; + } else if (cls.getEnclosingClass() != null) { + return findNarClass(cls.getEnclosingClass()); + } + } + + return null; + } + + private static class SingletonHolder { + + public static final NarThreadContextClassLoader instance = new NarThreadContextClassLoader(); + } + + public static NarThreadContextClassLoader getInstance() { + return SingletonHolder.instance; + } + + static class ContextSecurityManager extends SecurityManager { + + Class<?>[] getExecutionStack() { + return getClassContext(); + } + } + + public static <T> T createInstance(final String implementationClassName, final Class<T> typeDefinition) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance()); + try { + final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(implementationClassName); + final Class<?> rawClass; + if (detectedClassLoaderForType == null) { + // try to find from the current class loader + rawClass = Class.forName(implementationClassName); + } else { + // try to 
find from the registered classloader for that type + rawClass = Class.forName(implementationClassName, true, ExtensionManager.getClassLoader(implementationClassName)); + } + + Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); + final Class<?> desiredClass = rawClass.asSubclass(typeDefinition); + return typeDefinition.cast(desiredClass.newInstance()); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java new file mode 100644 index 0000000..2422fe1 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote; + +import java.io.IOException; +import java.net.URI; + +import org.apache.nifi.remote.protocol.CommunicationsSession; + +public class Peer { + + private final CommunicationsSession commsSession; + private final String url; + private final String host; + private long penalizationExpiration = 0L; + private boolean closed = false; + + public Peer(final CommunicationsSession commsSession, final String url) { + this.commsSession = commsSession; + this.url = url; + + try { + this.host = new URI(url).getHost(); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid URL: " + url); + } + } + + public String getUrl() { + return url; + } + + public CommunicationsSession getCommunicationsSession() { + return commsSession; + } + + public void close() throws IOException { + this.closed = true; + + // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer + commsSession.close(); + } + + public void penalize(final long millis) { + penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis); + } + + public boolean isPenalized() { + return penalizationExpiration > System.currentTimeMillis(); + } + + public boolean isClosed() { + return closed; + } + + public String getHost() { + return host; + } + + @Override + public int hashCode() { + return 8320 + url.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof Peer)) { + return false; + } + + final Peer other = (Peer) obj; + return this.url.equals(other.url); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("Peer[url=").append(url); + if (closed) { + sb.append(",CLOSED"); + } else if (isPenalized()) { + sb.append(",PENALIZED"); + } + sb.append("]"); + return sb.toString(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java new file mode 100644 index 0000000..d1cb076 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote; + +public class PeerStatus { + + private final String hostname; + private final int port; + private final boolean secure; + private final int numFlowFiles; + + public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) { + this.hostname = hostname; + this.port = port; + this.secure = secure; + this.numFlowFiles = numFlowFiles; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + public boolean isSecure() { + return secure; + } + + public int getFlowFileCount() { + return numFlowFiles; + } + + @Override + public String toString() { + return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]"; + } + + @Override + public int hashCode() { + return 9824372 + hostname.hashCode() + port; + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + + if (!(obj instanceof PeerStatus)) { + return false; + } + + final PeerStatus other = (PeerStatus) obj; + return port == other.port && hostname.equals(other.hostname); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java new file mode 100644 index 0000000..8f2603a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Outcome of an authorization check against a port: a yes/no verdict together with a textual
 * explanation.
 */
public interface PortAuthorizationResult {

    /**
     * @return the explanation accompanying the verdict (presumably the reason access was
     * denied when {@link #isAuthorized()} is {@code false} - confirm with implementations)
     */
    String getExplanation();

    /**
     * @return {@code true} if the check succeeded
     */
    boolean isAuthorized();

}
/**
 * The three authorization states: not yet known, unauthorized, and authorized.
 */
public enum RemoteAuthorizationState {

    /** Authorization has not been determined. */
    UNKNOWN,

    /** Authorization was denied. */
    UNAUTHORIZED,

    /** Authorization was granted. */
    AUTHORIZED
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.connectable.Port; +import org.apache.nifi.groups.RemoteProcessGroup; + +public interface RemoteGroupPort extends Port { + + RemoteProcessGroup getRemoteProcessGroup(); + + TransferDirection getTransferDirection(); + + boolean isUseCompression(); + + void setUseCompression(boolean useCompression); + + boolean getTargetExists(); + + boolean isTargetRunning(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java new file mode 100644 index 0000000..4afdfb7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote; + +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.connectable.Port; +import org.apache.nifi.remote.exception.BadRequestException; +import org.apache.nifi.remote.exception.NotAuthorizedException; +import org.apache.nifi.remote.exception.RequestExpiredException; +import org.apache.nifi.remote.protocol.ServerProtocol; + +public interface RootGroupPort extends Port { + + boolean isTransmitting(); + + void setGroupAccessControl(Set<String> groups); + + Set<String> getGroupAccessControl(); + + void setUserAccessControl(Set<String> users); + + Set<String> getUserAccessControl(); + + /** + * Verifies that the specified user is authorized to interact with this port + * and returns a {@link PortAuthorizationResult} indicating why the user is + * unauthorized if this assumption fails + * + * @param dn + * @return + */ + PortAuthorizationResult checkUserAuthorization(String dn); + + /** + * Receives data from the given stream + * + * @param peer + * @param serverProtocol + * @param requestHeaders + * + * @return the number of FlowFiles received + * @throws org.apache.nifi.remote.exception.NotAuthorizedException + * @throws org.apache.nifi.remote.exception.BadRequestException + * @throws org.apache.nifi.remote.exception.RequestExpiredException + */ + int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException; + + /** + * Transfers data to the given stream + * + * @param peer + * @param requestHeaders + * @param serverProtocol + * + * @return the number of FlowFiles transferred + * @throws org.apache.nifi.remote.exception.NotAuthorizedException + * @throws org.apache.nifi.remote.exception.BadRequestException + * @throws org.apache.nifi.remote.exception.RequestExpiredException + */ + int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws 
NotAuthorizedException, BadRequestException, RequestExpiredException; + +}