NIFI-724: Enable bulletins for reporting tasks and controller services

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f1adb8bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f1adb8bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f1adb8bf

Branch: refs/heads/master
Commit: f1adb8bf034a8407dfdd655e441c74c10a61b18f
Parents: e767f5c
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Jun 24 14:03:34 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Jun 24 14:03:34 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/reporting/Bulletin.java     | 11 ++-
 .../apache/nifi/reporting/ComponentType.java    | 58 ++++++++++++++
 .../manager/impl/ClusteredReportingContext.java | 46 ++++++++++-
 .../cluster/manager/impl/WebClusterManager.java | 80 ++++++++++++--------
 .../org/apache/nifi/events/BulletinFactory.java | 30 ++++++--
 .../org/apache/nifi/events/SystemBulletin.java  |  2 +
 .../nifi/logging/LogRepositoryFactory.java      |  6 +-
 .../apache/nifi/controller/FlowController.java  | 35 ++++++---
 .../service/ControllerServiceLoader.java        | 11 +--
 .../nifi/events/VolatileBulletinRepository.java |  7 +-
 .../org/apache/nifi/jaxb/AdaptedBulletin.java   | 10 +++
 .../org/apache/nifi/jaxb/BulletinAdapter.java   |  3 +-
 .../logging/ControllerServiceLogObserver.java   | 45 +++++++++++
 .../nifi/logging/ReportingTaskLogObserver.java  | 45 +++++++++++
 .../nifi/remote/StandardRemoteProcessGroup.java | 16 ++--
 .../nifi/remote/StandardRootGroupPort.java      |  4 +-
 16 files changed, 341 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java 
b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
index 87443a3..fe370ae 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
@@ -34,6 +34,7 @@ public abstract class Bulletin implements 
Comparable<Bulletin> {
     private String groupId;
     private String sourceId;
     private String sourceName;
+    private ComponentType sourceType;
 
     protected Bulletin(final long id) {
         this.timestamp = new Date();
@@ -104,9 +105,17 @@ public abstract class Bulletin implements 
Comparable<Bulletin> {
         this.sourceName = sourceName;
     }
 
+    public ComponentType getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(ComponentType sourceType) {
+        this.sourceType = sourceType;
+    }
+
     @Override
     public String toString() {
-        return "Bulletin{" + "id=" + id + ", message=" + message + ", 
sourceName=" + sourceName + '}';
+        return "Bulletin{" + "id=" + id + ", message=" + message + ", 
sourceName=" + sourceName + ", sourceType=" + sourceType + '}';
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java 
b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
new file mode 100644
index 0000000..97f3538
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+/**
+ * An Enumeration for indicating which type of component a Bulletin is associated with
+ */
+public enum ComponentType {
+
+    /**
+     * Bulletin is associated with a Processor
+     */
+    PROCESSOR,
+
+    /**
+     * Bulletin is associated with a Remote Process Group
+     */
+    REMOTE_PROCESS_GROUP,
+
+    /**
+     * Bulletin is associated with an Input Port
+     */
+    INPUT_PORT,
+
+    /**
+     * Bulletin is associated with an Output Port
+     */
+    OUTPUT_PORT,
+
+    /**
+     * Bulletin is associated with a Reporting Task
+     */
+    REPORTING_TASK,
+
+    /**
+     * Bulletin is associated with a Controller Service
+     */
+    CONTROLLER_SERVICE,
+
+    /**
+     * Bulletin is a system-level bulletin, associated with the Flow Controller
+     */
+    FLOW_CONTROLLER;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
index e546f87..c6624cc 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
@@ -24,15 +24,18 @@ import 
org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.processor.StandardPropertyValue;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.Severity;
@@ -85,8 +88,9 @@ public class ClusteredReportingContext implements 
ReportingContext {
         final ProcessGroupStatus rootGroupStatus = 
eventAccess.getControllerStatus();
         final String groupId = findGroupId(rootGroupStatus, componentId);
         final String componentName = findComponentName(rootGroupStatus, 
componentId);
+        final ComponentType componentType = findComponentType(rootGroupStatus, 
componentId);
 
-        return BulletinFactory.createBulletin(groupId, componentId, 
componentName, category, severity.name(), message);
+        return BulletinFactory.createBulletin(groupId, componentId, 
componentType, componentName, category, severity.name(), message);
     }
 
     @Override
@@ -134,6 +138,46 @@ public class ClusteredReportingContext implements 
ReportingContext {
         return null;
     }
 
+    private ComponentType findComponentType(final ProcessGroupStatus 
groupStatus, final String componentId) {
+        for (final ProcessorStatus procStatus : 
groupStatus.getProcessorStatus()) {
+            if (procStatus.getId().equals(componentId)) {
+                return ComponentType.PROCESSOR;
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return ComponentType.INPUT_PORT;
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return ComponentType.OUTPUT_PORT;
+            }
+        }
+
+        for (final RemoteProcessGroupStatus remoteStatus : 
groupStatus.getRemoteProcessGroupStatus()) {
+            if (remoteStatus.getId().equals(componentId)) {
+                return ComponentType.REMOTE_PROCESS_GROUP;
+            }
+        }
+
+        for (final ProcessGroupStatus childGroup : 
groupStatus.getProcessGroupStatus()) {
+            final ComponentType type = findComponentType(childGroup, 
componentId);
+            if (type != null) {
+                return type;
+            }
+        }
+
+        final ControllerService service = 
serviceProvider.getControllerService(componentId);
+        if (service != null) {
+            return ComponentType.CONTROLLER_SERVICE;
+        }
+
+        return null;
+    }
+
     private String findComponentName(final ProcessGroupStatus groupStatus, 
final String componentId) {
         for (final ProcessorStatus procStatus : 
groupStatus.getProcessorStatus()) {
             if (procStatus.getId().equals(componentId)) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index d6ba6db..9edc83f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -129,6 +129,7 @@ import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.StandardFlowSerializer;
+import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
@@ -159,7 +160,12 @@ import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -929,7 +935,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             //optional properties for all ReportingTasks
             for (final Element taskElement : 
DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
                 //add global properties common to all tasks
-                Map<String, String> properties = new HashMap<>();
+                final Map<String, String> properties = new HashMap<>();
 
                 //get properties for the specific reporting task - id, name, 
class,
                 //and schedulingPeriod must be set
@@ -1080,6 +1086,11 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             }
         }
 
+        // Register log observer to provide bulletins when reporting task logs 
anything at WARN level or above
+        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN,
+            new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
         return taskNode;
     }
 
@@ -1368,7 +1379,14 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
     @Override
     public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, 
firstTimeAdded);
+        final ControllerServiceNode serviceNode = 
controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+        // Register log observer to provide bulletins when controller service logs anything at WARN level or above
+        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN,
+            new ControllerServiceLogObserver(getBulletinRepository(), 
serviceNode));
+
+        return serviceNode;
     }
 
     @Override
@@ -1630,7 +1648,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
 
         // unmarshal the message
-        BulletinsPayload payload = 
BulletinsPayload.unmarshal(bulletins.getPayload());
+        final BulletinsPayload payload = 
BulletinsPayload.unmarshal(bulletins.getPayload());
         for (final Bulletin bulletin : payload.getBulletins()) {
             bulletin.setNodeAddress(nodeAddress);
             bulletinRepository.addBulletin(bulletin);
@@ -1688,7 +1706,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
             final int numPendingHeartbeats = mostRecentHeartbeats.size();
             if (heartbeatLogger.isDebugEnabled()) {
-                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", 
numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", 
numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
             }
 
             for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
@@ -2130,7 +2148,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         readLock.lock();
         try {
             final ClusterServicesBroadcaster broadcaster = 
this.servicesBroadcaster;
-            return (broadcaster != null && broadcaster.isRunning());
+            return broadcaster != null && broadcaster.isRunning();
         } finally {
             readLock.unlock("isBroadcasting");
         }
@@ -2323,7 +2341,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                                 if (auditService != null) {
                                     try {
                                         
auditService.addActions(clusterContext.getActions());
-                                    } catch (Throwable t) {
+                                    } catch (final Throwable t) {
                                         logger.warn("Unable to record actions: 
" + t.getMessage());
                                         if (logger.isDebugEnabled()) {
                                             logger.warn(StringUtils.EMPTY, t);
@@ -2834,7 +2852,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProcessorEntity nodeResponseEntity = (nodeResponse == 
clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
+                final ProcessorEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
                 final ProcessorDTO nodeProcessor = 
nodeResponseEntity.getProcessor();
                 processorMap.put(nodeResponse.getNodeId(), nodeProcessor);
             }
@@ -2851,7 +2869,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProcessorsEntity nodeResponseEntity = (nodeResponse == 
clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
+                final ProcessorsEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
                 final Set<ProcessorDTO> nodeProcessors = 
nodeResponseEntity.getProcessors();
 
                 for (final ProcessorDTO nodeProcessor : nodeProcessors) {
@@ -2892,7 +2910,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                         continue;
                     }
 
-                    final ProcessGroupEntity nodeResponseEntity = 
(nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
+                    final ProcessGroupEntity nodeResponseEntity = nodeResponse 
== clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
                     final ProcessGroupDTO nodeProcessGroup = 
nodeResponseEntity.getProcessGroup();
 
                     for (final ProcessorDTO nodeProcessor : 
nodeProcessGroup.getContents().getProcessors()) {
@@ -2952,7 +2970,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                         continue;
                     }
 
-                    final FlowSnippetEntity nodeResponseEntity = (nodeResponse 
== clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
+                    final FlowSnippetEntity nodeResponseEntity = nodeResponse 
== clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
                     final FlowSnippetDTO nodeContents = 
nodeResponseEntity.getContents();
 
                     for (final ProcessorDTO nodeProcessor : 
nodeContents.getProcessors()) {
@@ -2995,7 +3013,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
             // create a new client response
             clientResponse = new NodeResponse(clientResponse, responseEntity);
-        } else if (hasSuccessfulClientResponse && 
(isRemoteProcessGroupEndpoint(uri, method))) {
+        } else if (hasSuccessfulClientResponse && 
isRemoteProcessGroupEndpoint(uri, method)) {
             final RemoteProcessGroupEntity responseEntity = 
clientResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
             final RemoteProcessGroupDTO remoteProcessGroup = 
responseEntity.getRemoteProcessGroup();
 
@@ -3005,7 +3023,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final RemoteProcessGroupEntity nodeResponseEntity = 
(nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
+                final RemoteProcessGroupEntity nodeResponseEntity = 
nodeResponse == clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
                 final RemoteProcessGroupDTO nodeRemoteProcessGroup = 
nodeResponseEntity.getRemoteProcessGroup();
 
                 remoteProcessGroupMap.put(nodeResponse.getNodeId(), 
nodeRemoteProcessGroup);
@@ -3013,7 +3031,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             mergeRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
-        } else if (hasSuccessfulClientResponse && 
(isRemoteProcessGroupsEndpoint(uri, method))) {
+        } else if (hasSuccessfulClientResponse && 
isRemoteProcessGroupsEndpoint(uri, method)) {
             final RemoteProcessGroupsEntity responseEntity = 
clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
             final Set<RemoteProcessGroupDTO> remoteProcessGroups = 
responseEntity.getRemoteProcessGroups();
 
@@ -3023,7 +3041,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final RemoteProcessGroupsEntity nodeResponseEntity = 
(nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
+                final RemoteProcessGroupsEntity nodeResponseEntity = 
nodeResponse == clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
                 final Set<RemoteProcessGroupDTO> nodeRemoteProcessGroups = 
nodeResponseEntity.getRemoteProcessGroups();
 
                 for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : 
nodeRemoteProcessGroups) {
@@ -3056,7 +3074,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProvenanceEntity nodeResponseEntity = (nodeResponse == 
clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
+                final ProvenanceEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
                 final ProvenanceDTO nodeQuery = 
nodeResponseEntity.getProvenance();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeQuery);
@@ -3084,7 +3102,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ControllerServiceEntity nodeResponseEntity = 
(nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+                final ControllerServiceEntity nodeResponseEntity = 
nodeResponse == clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
                 final ControllerServiceDTO nodeControllerService = 
nodeResponseEntity.getControllerService();
 
                 resultsMap.put(nodeResponse.getNodeId(), 
nodeControllerService);
@@ -3102,7 +3120,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ControllerServicesEntity nodeResponseEntity = 
(nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+                final ControllerServicesEntity nodeResponseEntity = 
nodeResponse == clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
                 final Set<ControllerServiceDTO> nodeControllerServices = 
nodeResponseEntity.getControllerServices();
 
                 for (final ControllerServiceDTO nodeControllerService : 
nodeControllerServices) {
@@ -3136,7 +3154,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 }
 
                 final ControllerServiceReferencingComponentsEntity 
nodeResponseEntity =
-                        (nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+                        nodeResponse == clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
                 final Set<ControllerServiceReferencingComponentDTO> 
nodeReferencingComponents = 
nodeResponseEntity.getControllerServiceReferencingComponents();
 
                 resultsMap.put(nodeResponse.getNodeId(), 
nodeReferencingComponents);
@@ -3154,7 +3172,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ReportingTaskEntity nodeResponseEntity = (nodeResponse 
== clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+                final ReportingTaskEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
                 final ReportingTaskDTO nodeReportingTask = 
nodeResponseEntity.getReportingTask();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
@@ -3172,7 +3190,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ReportingTasksEntity nodeResponseEntity = (nodeResponse 
== clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+                final ReportingTasksEntity nodeResponseEntity = nodeResponse 
== clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
                 final Set<ReportingTaskDTO> nodeReportingTasks = 
nodeResponseEntity.getReportingTasks();
 
                 for (final ReportingTaskDTO nodeReportingTask : 
nodeReportingTasks) {
@@ -3428,7 +3446,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
 
     private boolean canChangeNodeState(final String method, final URI uri) {
-        return (HttpMethod.DELETE.equalsIgnoreCase(method) || 
HttpMethod.POST.equalsIgnoreCase(method) || 
HttpMethod.PUT.equalsIgnoreCase(method));
+        return HttpMethod.DELETE.equalsIgnoreCase(method) || 
HttpMethod.POST.equalsIgnoreCase(method) || 
HttpMethod.PUT.equalsIgnoreCase(method);
     }
 
     private void notifyDataFlowManagementServiceOfNodeStatusChange() {
@@ -3477,7 +3495,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             public void run() {
                 logger.info("Entering safe mode...");
                 final int safeModeSeconds = (int) 
FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), 
TimeUnit.SECONDS);
-                final long timeToElect = (safeModeSeconds <= 0) ? 
Long.MAX_VALUE : threadStartTime + 
TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
+                final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE 
: threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, 
TimeUnit.SECONDS);
                 boolean exitSafeMode = false;
                 while (isRunning()) {
 
@@ -3819,7 +3837,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
     public static Date normalizeStatusSnapshotDate(final Date toNormalize, 
final long numMillis) {
         final long time = toNormalize.getTime();
-        return new Date(time - (time % numMillis));
+        return new Date(time - time % numMillis);
     }
 
     private NodeDTO createNodeDTO(final Node node) {
@@ -3861,8 +3879,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> processorDescriptors = new 
LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
@@ -3942,8 +3960,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> connectionDescriptors = new 
LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4006,8 +4024,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> processGroupDescriptors = new 
LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> processGroupDescriptors = new 
LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4070,8 +4088,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new 
LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new 
LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
index d1d5e5b..4795827 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
@@ -17,24 +17,43 @@
 package org.apache.nifi.events;
 
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
 
-/**
- *
- */
 public final class BulletinFactory {
 
     private static final AtomicLong currentId = new AtomicLong(0);
 
     public static Bulletin createBulletin(final Connectable connectable, final 
String category, final String severity, final String message) {
-        return 
BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), 
connectable.getIdentifier(), connectable.getName(), category, severity, 
message);
+        final ComponentType type;
+        switch (connectable.getConnectableType()) {
+            case REMOTE_INPUT_PORT:
+            case REMOTE_OUTPUT_PORT:
+                type = ComponentType.REMOTE_PROCESS_GROUP;
+                break;
+            case INPUT_PORT:
+                type = ComponentType.INPUT_PORT;
+                break;
+            case OUTPUT_PORT:
+                type = ComponentType.OUTPUT_PORT;
+                break;
+            case PROCESSOR:
+            default:
+                type = ComponentType.PROCESSOR;
+                break;
+        }
+
+        return 
BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), 
connectable.getIdentifier(), type, connectable.getName(), category, severity, 
message);
     }
 
-    public static Bulletin createBulletin(final String groupId, final String 
sourceId, final String sourceName, final String category, final String 
severity, final String message) {
+    public static Bulletin createBulletin(final String groupId, final String 
sourceId, final ComponentType sourceType, final String sourceName,
+        final String category, final String severity, final String message) {
         final Bulletin bulletin = new 
ComponentBulletin(currentId.getAndIncrement());
         bulletin.setGroupId(groupId);
         bulletin.setSourceId(sourceId);
+        bulletin.setSourceType(sourceType);
         bulletin.setSourceName(sourceName);
         bulletin.setCategory(category);
         bulletin.setLevel(severity);
@@ -47,6 +66,7 @@ public final class BulletinFactory {
         bulletin.setCategory(category);
         bulletin.setLevel(severity);
         bulletin.setMessage(message);
+        bulletin.setSourceType(ComponentType.FLOW_CONTROLLER);
         return bulletin;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
index f97dc46..3359e7e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.events;
 
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
 
 /**
  *
@@ -25,6 +26,7 @@ public class SystemBulletin extends Bulletin {
 
     SystemBulletin(final long id) {
         super(id);
+        setSourceType(ComponentType.FLOW_CONTROLLER);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
index 76ca661..d7fa3fc 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -41,8 +41,8 @@ public class LogRepositoryFactory {
         logRepositoryClass = clazz;
     }
 
-    public static LogRepository getRepository(final String processorId) {
-        LogRepository repository = 
repositoryMap.get(requireNonNull(processorId));
+    public static LogRepository getRepository(final String componentId) {
+        LogRepository repository = 
repositoryMap.get(requireNonNull(componentId));
         if (repository == null) {
             try {
                 repository = logRepositoryClass.newInstance();
@@ -50,7 +50,7 @@ public class LogRepositoryFactory {
                 throw new RuntimeException(e);
             }
 
-            final LogRepository oldRepository = 
repositoryMap.putIfAbsent(processorId, repository);
+            final LogRepository oldRepository = 
repositoryMap.putIfAbsent(componentId, repository);
             if (oldRepository != null) {
                 repository = oldRepository;
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ffdd4e..b6edbbb 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -137,11 +137,13 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.groups.StandardProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.logging.ProcessorLogObserver;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -593,7 +595,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         writeLock.lock();
         try {
             if (startDelayedComponents) {
-                LOG.info("Starting {} processors/ports/funnels", 
(startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size()));
+                LOG.info("Starting {} processors/ports/funnels", 
startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size());
                 for (final Connectable connectable : 
startConnectablesAfterInitialization) {
                     if (connectable.getScheduledState() == 
ScheduledState.DISABLED) {
                         continue;
@@ -1012,7 +1014,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public boolean isTerminated() {
         this.readLock.lock();
         try {
-            return (null == this.timerDrivenEngineRef.get() || 
this.timerDrivenEngineRef.get().isTerminated());
+            return null == this.timerDrivenEngineRef.get() || 
this.timerDrivenEngineRef.get().isTerminated();
         } finally {
             this.readLock.unlock();
         }
@@ -1828,9 +1830,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         } else if (id1.equals(id2)) {
             return true;
         } else {
-            final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id1);
-            final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id2);
-            return (comparable1.equals(comparable2));
+            final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id1;
+            final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id2;
+            return comparable1.equals(comparable2);
         }
     }
 
@@ -1964,7 +1966,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id;
-        return (root == null) ? null : root.findProcessGroup(searchId);
+        return root == null ? null : root.findProcessGroup(searchId);
     }
 
     @Override
@@ -2079,8 +2081,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 
connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
                 
connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
 
-                flowFilesTransferred += 
(connectionStatusReport.getFlowFilesIn() + 
connectionStatusReport.getFlowFilesOut());
-                bytesTransferred += (connectionStatusReport.getContentSizeIn() 
+ connectionStatusReport.getContentSizeOut());
+                flowFilesTransferred += 
connectionStatusReport.getFlowFilesIn() + 
connectionStatusReport.getFlowFilesOut();
+                bytesTransferred += connectionStatusReport.getContentSizeIn() 
+ connectionStatusReport.getContentSizeOut();
             }
 
             if (StringUtils.isNotBlank(conn.getName())) {
@@ -2552,6 +2554,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         reportingTasks.put(id, taskNode);
+
+        // Register log observer to provide bulletins when reporting task logs 
anything at WARN level or above
+        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN,
+            new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
         return taskNode;
     }
 
@@ -2616,7 +2624,14 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
     @Override
     public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, 
firstTimeAdded);
+        final ControllerServiceNode serviceNode = 
controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+        // Register log observer to provide bulletins when controller service logs 
anything at WARN level or above
+        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN,
+            new ControllerServiceLogObserver(getBulletinRepository(), 
serviceNode));
+
+        return serviceNode;
     }
 
     @Override
@@ -3480,7 +3495,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                     if (bulletin.getGroupId() == null) {
                         escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), 
escapedBulletinMessage);
                     } else {
-                        escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(),
+                        escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), 
bulletin.getSourceType(),
                                 bulletin.getSourceName(), 
bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 92fa3b2..b5c3855 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -48,11 +48,12 @@ public class ControllerServiceLoader {
     private static final Logger logger = 
LoggerFactory.getLogger(ControllerServiceLoader.class);
 
     public static List<ControllerServiceNode> loadControllerServices(
-            final ControllerServiceProvider provider,
-            final InputStream serializedStream,
-            final StringEncryptor encryptor,
-            final BulletinRepository bulletinRepo,
-            final boolean autoResumeState) throws IOException {
+        final ControllerServiceProvider provider,
+        final InputStream serializedStream,
+        final StringEncryptor encryptor,
+        final BulletinRepository bulletinRepo,
+        final boolean autoResumeState) throws IOException {
+
         final DocumentBuilderFactory documentBuilderFactory = 
DocumentBuilderFactory.newInstance();
         documentBuilderFactory.setNamespaceAware(true);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index a20e974..a58bf8e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.Filter;
 
@@ -167,7 +168,7 @@ public class VolatileBulletinRepository implements 
BulletinRepository {
         }
 
         final RingBuffer<Bulletin> buffer = 
componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
-        return (buffer == null) ? Collections.<Bulletin>emptyList() : 
buffer.getSelectedElements(new Filter<Bulletin>() {
+        return buffer == null ? Collections.<Bulletin>emptyList() : 
buffer.getSelectedElements(new Filter<Bulletin>() {
             @Override
             public boolean select(final Bulletin bulletin) {
                 return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
@@ -199,7 +200,7 @@ public class VolatileBulletinRepository implements 
BulletinRepository {
         ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = 
bulletinStoreMap.get(groupId);
         if (componentMap == null) {
             componentMap = new ConcurrentHashMap<>();
-            ConcurrentMap<String, RingBuffer<Bulletin>> existing = 
bulletinStoreMap.putIfAbsent(groupId, componentMap);
+            final ConcurrentMap<String, RingBuffer<Bulletin>> existing = 
bulletinStoreMap.putIfAbsent(groupId, componentMap);
             if (existing != null) {
                 componentMap = existing;
             }
@@ -225,7 +226,7 @@ public class VolatileBulletinRepository implements 
BulletinRepository {
     }
 
     private boolean isControllerBulletin(final Bulletin bulletin) {
-        return bulletin.getGroupId() == null;
+        return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
     }
 
     private class DefaultBulletinProcessingStrategy implements 
BulletinProcessingStrategy {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
index 6f1dc45..17c5991 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
@@ -18,6 +18,8 @@ package org.apache.nifi.jaxb;
 
 import java.util.Date;
 
+import org.apache.nifi.reporting.ComponentType;
+
 /**
  *
  */
@@ -32,6 +34,7 @@ public class AdaptedBulletin {
     private String groupId;
     private String sourceId;
     private String sourceName;
+    private ComponentType sourceType;
 
     public String getCategory() {
         return category;
@@ -97,4 +100,11 @@ public class AdaptedBulletin {
         this.timestamp = timestamp;
     }
 
+    public ComponentType getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(ComponentType sourceType) {
+        this.sourceType = sourceType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
index b699348..acbe0dd 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
@@ -34,7 +34,7 @@ public class BulletinAdapter extends 
XmlAdapter<AdaptedBulletin, Bulletin> {
         if (b.getSourceId() == null) {
             return BulletinFactory.createBulletin(b.getCategory(), 
b.getLevel(), b.getMessage());
         } else {
-            return BulletinFactory.createBulletin(b.getGroupId(), 
b.getSourceId(), b.getSourceName(), b.getCategory(), b.getLevel(), 
b.getMessage());
+            return BulletinFactory.createBulletin(b.getGroupId(), 
b.getSourceId(), b.getSourceType(), b.getSourceName(), b.getCategory(), 
b.getLevel(), b.getMessage());
         }
     }
 
@@ -48,6 +48,7 @@ public class BulletinAdapter extends 
XmlAdapter<AdaptedBulletin, Bulletin> {
         aBulletin.setTimestamp(b.getTimestamp());
         aBulletin.setGroupId(b.getGroupId());
         aBulletin.setSourceId(b.getSourceId());
+        aBulletin.setSourceType(b.getSourceType());
         aBulletin.setSourceName(b.getSourceName());
         aBulletin.setCategory(b.getCategory());
         aBulletin.setLevel(b.getLevel());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
new file mode 100644
index 0000000..837e1c4
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+/**
+ * A {@link LogObserver} that converts every log message it is notified of into a
+ * {@link Bulletin} attributed to a Controller Service and adds that bulletin to the
+ * supplied {@link BulletinRepository}.
+ */
+public class ControllerServiceLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ControllerServiceNode serviceNode;
+
+    /**
+     * @param bulletinRepository the repository to which generated bulletins are added
+     * @param serviceNode the controller service whose identifier and name are recorded as the bulletin source
+     */
+    public ControllerServiceLogObserver(final BulletinRepository bulletinRepository, final ControllerServiceNode serviceNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.serviceNode = serviceNode;
+    }
+
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
+        // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
+        final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+
+        // The bulletin is created with a null group id. The source type must be CONTROLLER_SERVICE
+        // (not REPORTING_TASK) so that the bulletin is attributed to the correct kind of component.
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.CONTROLLER_SERVICE,
+            serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+        bulletinRepository.addBulletin(bulletin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
new file mode 100644
index 0000000..e5638d6
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+public class ReportingTaskLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ReportingTaskNode taskNode;
+
+    public ReportingTaskLogObserver(final BulletinRepository 
bulletinRepository, final ReportingTaskNode taskNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.taskNode = taskNode;
+    }
+
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        // Map LogLevel.WARN to Severity.WARNING so that we are consistent 
with the Severity enumeration. Else, just use whatever
+        // the LogLevel is (INFO and ERROR map directly and all others we will 
just accept as they are).
+        final String bulletinLevel = message.getLevel() == LogLevel.WARN ? 
Severity.WARNING.name() : message.getLevel().toString();
+
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, 
taskNode.getIdentifier(), ComponentType.REPORTING_TASK,
+            taskNode.getName(), "Log Message", bulletinLevel, 
message.getMessage());
+        bulletinRepository.addBulletin(bulletin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index d19b5c1..61516d0 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -57,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -164,7 +165,8 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                 final String groupId = 
StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier();
                 final String sourceId = 
StandardRemoteProcessGroup.this.getIdentifier();
                 final String sourceName = 
StandardRemoteProcessGroup.this.getName();
-                
bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, 
sourceId, sourceName, category, severity.name(), message));
+                
bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, 
sourceId, ComponentType.REMOTE_PROCESS_GROUP,
+                    sourceName, category, severity.name(), message));
             }
         };
 
@@ -227,7 +229,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     @Override
     public String getName() {
         final String name = this.name.get();
-        return (name == null) ? targetUri.toString() : name;
+        return name == null ? targetUri.toString() : name;
     }
 
     @Override
@@ -671,7 +673,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
     private ProcessGroup getRootGroup(final ProcessGroup context) {
         final ProcessGroup parent = context.getParent();
-        return (parent == null) ? context : getRootGroup(parent);
+        return parent == null ? context : getRootGroup(parent);
     }
 
     private boolean isWebApiSecure() {
@@ -714,7 +716,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     public Date getLastRefreshTime() {
         readLock.lock();
         try {
-            return (refreshContentsTimestamp == null) ? null : new 
Date(refreshContentsTimestamp);
+            return refreshContentsTimestamp == null ? null : new 
Date(refreshContentsTimestamp);
         } finally {
             readLock.unlock();
         }
@@ -855,7 +857,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
         Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
         if (ports != null) {
             remotePorts = new LinkedHashSet<>(ports.size());
-            for (PortDTO port : ports) {
+            for (final PortDTO port : ports) {
                 final StandardRemoteProcessGroupPortDescriptor descriptor = 
new StandardRemoteProcessGroupPortDescriptor();
                 final ScheduledState scheduledState = 
ScheduledState.valueOf(port.getState());
                 descriptor.setId(port.getId());
@@ -1093,7 +1095,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                     }
 
                     final String remoteInstanceId = dto.getInstanceId();
-                    boolean isPointingToCluster = 
flowController.getInstanceId().equals(remoteInstanceId);
+                    final boolean isPointingToCluster = 
flowController.getInstanceId().equals(remoteInstanceId);
                     pointsToCluster.set(isPointingToCluster);
                 } else if (statusCode == UNAUTHORIZED_STATUS_CODE) {
                     try {
@@ -1120,7 +1122,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                             new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
                     authorizationIssue = "Unable to determine Site-to-Site availability.";
                 }
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
                 getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s",
                         StandardRemoteProcessGroup.this.getTargetUri().toString(), e));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 4bb1683..9eadec0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -61,6 +61,7 @@ import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.ServerProtocol;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.user.NiFiUser;
@@ -108,7 +109,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
                 final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();
                 final String sourceId = StandardRootGroupPort.this.getIdentifier();
                 final String sourceName = StandardRootGroupPort.this.getName();
-                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+                final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT;
+                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, componentType, sourceName, category, severity.name(), message));
             }
         };
 

Reply via email to