Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 2d4aebf33 -> 883c4ac9d


NIFI-250: Creating ReportingTaskProvider


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/83eff8d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/83eff8d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/83eff8d6

Branch: refs/heads/NIFI-250
Commit: 83eff8d6df7b485a4d0eb03f176e4a28ff81df3b
Parents: a227fe4
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Mar 4 15:50:30 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Mar 4 15:50:30 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 27 +++++++--
 .../reporting/ReportingTaskProvider.java        | 61 ++++++++++++++++++++
 .../apache/nifi/controller/FlowController.java  |  4 +-
 3 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/83eff8d6/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index f4c86f6..f3dd0a0 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -41,7 +41,9 @@ import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,6 +68,7 @@ import javax.xml.transform.stream.StreamResult;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.ClusterNodeInformation;
@@ -128,8 +131,10 @@ import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.StandardFlowSerializer;
 import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
 import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import 
org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
 import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
@@ -233,7 +238,7 @@ import com.sun.jersey.api.client.ClientResponse;
  *
  * @author unattributed
  */
-public class WebClusterManager implements HttpClusterManager, ProtocolHandler, 
ControllerServiceProvider {
+public class WebClusterManager implements HttpClusterManager, ProtocolHandler, 
ControllerServiceProvider, ReportingTaskProvider {
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String BULLETIN_CATEGORY = "Clustering";
@@ -315,7 +320,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     private final ClusterManagerLock writeLock = new 
ClusterManagerLock(resourceRWLock.writeLock(), "Write");
 
     private final Set<Node> nodes = new HashSet<>();
-    private final Map<String, ReportingTaskNode> reportingTasks = new 
HashMap<>();
+    private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = 
new ConcurrentHashMap<>();
 
     // null means the dataflow should be read from disk
     private StandardDataFlow cachedDataFlow = null;
@@ -471,7 +476,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 // Load and start running Reporting Tasks
                 final byte[] serializedReportingTasks = 
clusterDataFlow.getReportingTasks();
                 if ( serializedReportingTasks != null && 
serializedReportingTasks.length > 0 ) {
-                       
reportingTasks.putAll(loadReportingTasks(serializedReportingTasks));
+                       loadReportingTasks(serializedReportingTasks);
                 }
             } catch (final IOException ioe) {
                 logger.warn("Failed to initialize cluster services due to: " + 
ioe, ioe);
@@ -931,7 +936,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 //set the class to be used for the configured reporting task
                 final ReportingTaskNode reportingTaskNode;
                 try {
-                    reportingTaskNode = createReportingTask(taskClass, taskId);
+                    reportingTaskNode = createReportingTask(taskClass, taskId, 
false);
                 } catch (final ReportingTaskInstantiationException e) {
                     logger.error("Unable to load reporting task {} due to {}", 
new Object[]{taskId, e});
                     if (logger.isDebugEnabled()) {
@@ -973,7 +978,9 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return tasks;
     }
 
-    private ReportingTaskNode createReportingTask(final String type, final 
String id) throws ReportingTaskInstantiationException {
+    
+    @Override
+    public ReportingTaskNode createReportingTask(final String type, final 
String id, final boolean firstTimeAdded) throws 
ReportingTaskInstantiationException {
         if (type == null) {
             throw new NullPointerException();
         }
@@ -1003,6 +1010,16 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(this);
         final ReportingTaskNode taskNode = new 
ClusteredReportingTaskNode(task, id, processScheduler,
                 new ClusteredEventAccess(this), bulletinRepository, 
controllerServiceProvider, validationContextFactory);
+        
+        reportingTasks.put(id, taskNode);
+        if ( firstTimeAdded ) {
+            try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
task);
+            } catch (final Exception e) {
+                throw new ProcessorLifeCycleException("Failed to invoke 
On-Added Lifecycle methods of " + task, e);
+            }
+        }
+        
         return taskNode;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/83eff8d6/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
new file mode 100644
index 0000000..c58115f
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.reporting;
+
+import org.apache.nifi.controller.ReportingTaskNode;
+
+/**
+ * A ReportingTaskProvider is responsible for providing management of, and 
access to, Reporting Tasks
+ */
+public interface ReportingTaskProvider {
+
+    /**
+     * Creates a new instance of a reporting task
+     * 
+     * @param type the type (fully qualified class name) of the reporting task 
to instantiate
+     * @param id the identifier for the Reporting Task
+     * @param firstTimeAdded whether or not this is the first time that the 
reporting task is being added
+     * to the flow. I.e., this will be true only when the user adds the 
reporting task to the flow, not when
+     * the flow is being restored after a restart of the software
+     * 
+     * @return the ReportingTaskNode that is used to manage the reporting task
+     * 
+     * @throws ReportingTaskInstantiationException if unable to create the 
Reporting Task
+     */
+    ReportingTaskNode createReportingTask(String type, String id, boolean 
firstTimeAdded) throws ReportingTaskInstantiationException;
+    
+    /**
+     * Returns the reporting task that has the given identifier, or 
<code>null</code> if no reporting task
+     * exists with that ID.
+     * 
+     * @param identifier the identifier of the reporting task to look up
+     * @return the reporting task with the given identifier, or <code>null</code> if no reporting task exists with that ID
+     */
+    ReportingTaskNode getReportingTaskNode(String identifier);
+    
+    
+    /**
+     * Removes the given reporting task from the flow
+     * 
+     * @param reportingTask the reporting task to remove from the flow
+     * 
+     * @throws IllegalStateException if the reporting task cannot be removed 
because it is not stopped, or
+     * if the reporting task is not known in the flow
+     */
+    void removeReportingTask(ReportingTaskNode reportingTask);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/83eff8d6/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 0a86145..f6e9aae 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -78,6 +78,7 @@ import 
org.apache.nifi.controller.exception.ProcessorLifeCycleException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
 import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.CounterRepository;
@@ -190,7 +191,7 @@ import org.slf4j.LoggerFactory;
 
 import com.sun.jersey.api.client.ClientHandlerException;
 
-public class FlowController implements EventAccess, ControllerServiceProvider, 
Heartbeater, QueueProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider, Heartbeater, QueueProvider {
 
     // default repository implementations
     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = 
"org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -2472,6 +2473,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, H
        return createReportingTask(type, UUID.randomUUID().toString(), 
firstTimeAdded);
     }
     
+    @Override
     public ReportingTaskNode createReportingTask(final String type, final 
String id, final boolean firstTimeAdded) throws 
ReportingTaskInstantiationException {
         if (type == null || id == null) {
             throw new NullPointerException();

Reply via email to