Repository: incubator-nifi Updated Branches: refs/heads/NIFI-250 85e38dc05 -> fe09680de
NIFI-250: Added missing methods to ReportingTaskProvider Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/40035429 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/40035429 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/40035429 Branch: refs/heads/NIFI-250 Commit: 40035429f886a501312b25b14f94cab38ed0c97f Parents: 74d45ae Author: Mark Payne <marka...@hotmail.com> Authored: Wed Mar 11 15:47:58 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Mar 11 15:47:58 2015 -0400 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 17 +++++++++ .../reporting/ReportingTaskProvider.java | 36 ++++++++++++++++++++ .../apache/nifi/controller/FlowController.java | 6 ++++ .../scheduling/StandardProcessScheduler.java | 4 ++- 4 files changed, 62 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/40035429/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index b5f3647..e26f388 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -1493,6 +1493,7 @@ public 
class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } + @Override public ReportingTaskNode getReportingTaskNode(final String taskId) { readLock.lock(); try { @@ -1502,17 +1503,20 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } + @Override public void startReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanStart(); processScheduler.schedule(reportingTaskNode); } + @Override public void stopReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanStop(); processScheduler.unschedule(reportingTaskNode); } + @Override public void removeReportingTask(final ReportingTaskNode reportingTaskNode) { writeLock.lock(); try { @@ -1547,6 +1551,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } + @Override + public void disableReportingTask(final ReportingTaskNode reportingTask) { + reportingTask.verifyCanDisable(); + processScheduler.disableReportingTask(reportingTask); + } + + @Override + public void enableReportingTask(final ReportingTaskNode reportingTask) { + reportingTask.verifyCanEnable(); + processScheduler.enableReportingTask(reportingTask); + } + + /** * Handle a bulletins message. 
* http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/40035429/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java index ddd9d1a..bb6f3f7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java @@ -64,4 +64,40 @@ public interface ReportingTaskProvider { */ void removeReportingTask(ReportingTaskNode reportingTask); + /** + * Begins scheduling the reporting task to run and invokes appropriate lifecycle methods + * @param reportingTask + * + * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active + * threads, or if the ReportingTask is not valid + */ + void startReportingTask(ReportingTaskNode reportingTask); + + /** + * Stops scheduling the reporting task to run and invokes appropriate lifecycle methods + * @param reportingTask + * + * @throws IllegalStateException if the ReportingTask's state is not RUNNING + */ + void stopReportingTask(ReportingTaskNode reportingTask); + + + /** + * Enables the reporting task to be scheduled to run + * @param reportingTask + * + * @throws IllegalStateException if the ReportingTask's state is not DISABLED + */ + void enableReportingTask(ReportingTaskNode reportingTask); + + + /** + * 
Disables the ability to schedule the reporting task to run + * + * @param reportingTask + * + * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active + * threads + */ + void disableReportingTask(ReportingTaskNode reportingTask); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/40035429/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 6173014..0cee4bd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2517,10 +2517,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return taskNode; } + @Override public ReportingTaskNode getReportingTaskNode(final String taskId) { return reportingTasks.get(taskId); } + @Override public void startReportingTask(final ReportingTaskNode reportingTaskNode) { if (isTerminated()) { throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated"); @@ -2531,6 +2533,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } + @Override public void stopReportingTask(final ReportingTaskNode reportingTaskNode) { if (isTerminated()) { return; @@ -2540,6 +2543,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R 
processScheduler.unschedule(reportingTaskNode); } + @Override public void removeReportingTask(final ReportingTaskNode reportingTaskNode) { final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier()); if ( existing == null || existing != reportingTaskNode ) { @@ -2578,11 +2582,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); } + @Override public void enableReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanEnable(); processScheduler.enableReportingTask(reportingTaskNode); } + @Override public void disableReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanDisable(); processScheduler.disableReportingTask(reportingTaskNode); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/40035429/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 6a7b419..087ec68 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -201,6 +201,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { }; 
componentLifeCycleThreadPool.execute(startReportingTaskRunnable); + taskNode.setScheduledState(ScheduledState.RUNNING); } @@ -215,7 +216,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); final ReportingTask reportingTask = taskNode.getReportingTask(); scheduleState.setScheduled(false); - + taskNode.setScheduledState(ScheduledState.STOPPED); + final Runnable unscheduleReportingTaskRunnable = new Runnable() { @SuppressWarnings("deprecation") @Override