Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 2d4aebf33 -> 883c4ac9d
NIFI-250: Creating ReportingTaskProvider

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/83eff8d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/83eff8d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/83eff8d6

Branch: refs/heads/NIFI-250
Commit: 83eff8d6df7b485a4d0eb03f176e4a28ff81df3b
Parents: a227fe4
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Mar 4 15:50:30 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Mar 4 15:50:30 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 27 +++++++--
 .../reporting/ReportingTaskProvider.java | 61 ++++++++++++++++++++
 .../apache/nifi/controller/FlowController.java | 4 +-
 3 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/83eff8d6/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index f4c86f6..f3dd0a0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -41,7 +41,9 @@ import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.UUID;
import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,6 +68,7 @@ import javax.xml.transform.stream.StreamResult; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.ClusterNodeInformation; @@ -128,8 +131,10 @@ import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.exception.ProcessorLifeCycleException; import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; +import org.apache.nifi.controller.reporting.ReportingTaskProvider; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; @@ -233,7 +238,7 @@ import com.sun.jersey.api.client.ClientResponse; * * @author unattributed */ -public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider { +public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider { public static final String ROOT_GROUP_ID_ALIAS = "root"; public static final String BULLETIN_CATEGORY = "Clustering"; @@ -315,7 +320,7 @@ public class WebClusterManager implements HttpClusterManager, 
ProtocolHandler, C private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write"); private final Set<Node> nodes = new HashSet<>(); - private final Map<String, ReportingTaskNode> reportingTasks = new HashMap<>(); + private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); // null means the dataflow should be read from disk private StandardDataFlow cachedDataFlow = null; @@ -471,7 +476,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // Load and start running Reporting Tasks final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks(); if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) { - reportingTasks.putAll(loadReportingTasks(serializedReportingTasks)); + loadReportingTasks(serializedReportingTasks); } } catch (final IOException ioe) { logger.warn("Failed to initialize cluster services due to: " + ioe, ioe); @@ -931,7 +936,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C //set the class to be used for the configured reporting task final ReportingTaskNode reportingTaskNode; try { - reportingTaskNode = createReportingTask(taskClass, taskId); + reportingTaskNode = createReportingTask(taskClass, taskId, false); } catch (final ReportingTaskInstantiationException e) { logger.error("Unable to load reporting task {} due to {}", new Object[]{taskId, e}); if (logger.isDebugEnabled()) { @@ -973,7 +978,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return tasks; } - private ReportingTaskNode createReportingTask(final String type, final String id) throws ReportingTaskInstantiationException { + + @Override + public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { if (type == null) { throw new NullPointerException(); } @@ -1003,6 +1010,16 @@ 
public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this); final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler, new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory); + + reportingTasks.put(id, taskNode); + if ( firstTimeAdded ) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task); + } catch (final Exception e) { + throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e); + } + } + return taskNode; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/83eff8d6/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java new file mode 100644 index 0000000..c58115f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.reporting; + +import org.apache.nifi.controller.ReportingTaskNode; + +/** + * A ReportingTaskProvider is responsible for providing management of, and access to, Reporting Tasks + */ +public interface ReportingTaskProvider { + + /** + * Creates a new instance of a reporting task + * + * @param type the type (fully qualified class name) of the reporting task to instantiate + * @param id the identifier for the Reporting Task + * @param firstTimeAdded whether or not this is the first time that the reporting task is being added + * to the flow. I.e., this will be true only when the user adds the reporting task to the flow, not when + * the flow is being restored after a restart of the software + * + * @return the ReportingTaskNode that is used to manage the reporting task + * + * @throws ReportingTaskInstantiationException if unable to create the Reporting Task + */ + ReportingTaskNode createReportingTask(String type, String id, boolean firstTimeAdded) throws ReportingTaskInstantiationException; + + /** + * Returns the reporting task that has the given identifier, or <code>null</code> if no reporting task + * exists with that ID. 
+ * + * @param identifier + * @return + */ + ReportingTaskNode getReportingTaskNode(String identifier); + + + /** + * Removes the given reporting task from the flow + * + * @param reportingTask + * + * @throws IllegalStateException if the reporting task cannot be removed because it is not stopped, or + * if the reporting task is not known in the flow + */ + void removeReportingTask(ReportingTaskNode reportingTask); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/83eff8d6/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 0a86145..f6e9aae 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -78,6 +78,7 @@ import org.apache.nifi.controller.exception.ProcessorLifeCycleException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; +import org.apache.nifi.controller.reporting.ReportingTaskProvider; import org.apache.nifi.controller.reporting.StandardReportingTaskNode; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.CounterRepository; @@ -190,7 +191,7 @@ import org.slf4j.LoggerFactory; import com.sun.jersey.api.client.ClientHandlerException; -public class FlowController 
implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider { +public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider { // default repository implementations public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"; @@ -2472,6 +2473,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded); } + @Override public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { if (type == null || id == null) { throw new NullPointerException();