NIFI-724: Enable bulletins for reporting tasks and controller services
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e240e07a Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e240e07a Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e240e07a Branch: refs/heads/develop Commit: e240e07aaebea1fd66b22fce8aec3f0005fd3f60 Parents: e767f5c Author: Mark Payne <marka...@hotmail.com> Authored: Wed Jun 24 14:03:34 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Thu Jun 25 10:56:49 2015 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/reporting/Bulletin.java | 11 ++- .../apache/nifi/reporting/ComponentType.java | 58 ++++++++++++++ .../manager/impl/ClusteredReportingContext.java | 46 ++++++++++- .../cluster/manager/impl/WebClusterManager.java | 80 ++++++++++++-------- .../org/apache/nifi/events/BulletinFactory.java | 30 ++++++-- .../org/apache/nifi/events/SystemBulletin.java | 2 + .../nifi/logging/LogRepositoryFactory.java | 6 +- .../apache/nifi/controller/FlowController.java | 35 ++++++--- .../service/ControllerServiceLoader.java | 11 +-- .../nifi/events/VolatileBulletinRepository.java | 18 +++-- .../org/apache/nifi/jaxb/AdaptedBulletin.java | 10 +++ .../org/apache/nifi/jaxb/BulletinAdapter.java | 3 +- .../logging/ControllerServiceLogObserver.java | 45 +++++++++++ .../nifi/logging/ReportingTaskLogObserver.java | 45 +++++++++++ .../nifi/remote/StandardRemoteProcessGroup.java | 16 ++-- .../nifi/remote/StandardRootGroupPort.java | 4 +- 16 files changed, 349 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java index 87443a3..fe370ae 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java @@ -34,6 +34,7 @@ public abstract class Bulletin implements Comparable<Bulletin> { private String groupId; private String sourceId; private String sourceName; + private ComponentType sourceType; protected Bulletin(final long id) { this.timestamp = new Date(); @@ -104,9 +105,17 @@ public abstract class Bulletin implements Comparable<Bulletin> { this.sourceName = sourceName; } + public ComponentType getSourceType() { + return sourceType; + } + + public void setSourceType(ComponentType sourceType) { + this.sourceType = sourceType; + } + @Override public String toString() { - return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + '}'; + return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}'; } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java new file mode 100644 index 0000000..97f3538 --- /dev/null +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +/** + * An Enumeration for indicating which type of component a Bulletin is associated with + */ +public enum ComponentType { + + /** + * Bulletin is associated with a Processor + */ + PROCESSOR, + + /** + * Bulletin is associated with a Remote Process Group + */ + REMOTE_PROCESS_GROUP, + + /** + * Bulletin is associated with an Input Port + */ + INPUT_PORT, + + /** + * Bulletin is associated with an Output Port + */ + OUTPUT_PORT, + + /** + * Bulletin is associated with a Reporting Task + */ + REPORTING_TASK, + + /** + * Bulletin is associated with a Controller Service + */ + CONTROLLER_SERVICE, + + /** + * Bulletin is a system-level bulletin, associated with the Flow Controller + */ + FLOW_CONTROLLER; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java index e546f87..c6624cc 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java @@ -24,15 +24,18 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.processor.StandardPropertyValue; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.Severity; @@ -85,8 +88,9 @@ public class ClusteredReportingContext implements ReportingContext { final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus(); final String groupId = findGroupId(rootGroupStatus, componentId); final String componentName = findComponentName(rootGroupStatus, componentId); + final ComponentType componentType = findComponentType(rootGroupStatus, componentId); - return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message); + return BulletinFactory.createBulletin(groupId, componentId, componentType, componentName, category, severity.name(), message); } @Override @@ -134,6 +138,46 @@ public class ClusteredReportingContext implements ReportingContext { return null; } + private ComponentType findComponentType(final ProcessGroupStatus groupStatus, final String componentId) { + for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) { + if (procStatus.getId().equals(componentId)) { + return ComponentType.PROCESSOR; + } + } + + for (final PortStatus portStatus : groupStatus.getInputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return ComponentType.INPUT_PORT; + } + } + + for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return ComponentType.OUTPUT_PORT; + } + } + + for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) { + if (remoteStatus.getId().equals(componentId)) { + return ComponentType.REMOTE_PROCESS_GROUP; + } + } + + for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) { + final ComponentType type = findComponentType(childGroup, componentId); + if (type != null) { + return type; + } + } + + final ControllerService service = serviceProvider.getControllerService(componentId); + if (service != null) { + return ComponentType.CONTROLLER_SERVICE; + } + + return null; + } + private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) { for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) { if (procStatus.getId().equals(componentId)) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index d6ba6db..9edc83f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -129,6 +129,7 @@ import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.StandardFlowSerializer; +import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; @@ -159,7 +160,12 @@ import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.framework.security.util.SslContextFactory; import org.apache.nifi.io.socket.multicast.DiscoverableService; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.ControllerServiceLogObserver; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.logging.LogRepository; +import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.logging.ReportingTaskLogObserver; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarThreadContextClassLoader; @@ -929,7 +935,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C //optional properties for all ReportingTasks for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) { //add global properties common to all tasks - Map<String, String> properties = new HashMap<>(); + final Map<String, String> properties = new HashMap<>(); //get properties for the specific reporting task - id, name, class, //and schedulingPeriod must be set @@ -1080,6 +1086,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } + // Register log observer to provide bulletins when reporting task logs anything at WARN level or above + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, + new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); + return taskNode; } @@ -1368,7 +1379,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C @Override public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { - return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); + final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded); + + // Register log observer to provide bulletins when reporting task logs anything at WARN level or above + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, + new ControllerServiceLogObserver(getBulletinRepository(), serviceNode)); + + return serviceNode; } @Override @@ -1630,7 +1648,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); // unmarshal the message - BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload()); + final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload()); for (final Bulletin bulletin : payload.getBulletins()) { bulletin.setNodeAddress(nodeAddress); bulletinRepository.addBulletin(bulletin); @@ -1688,7 +1706,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final int numPendingHeartbeats = mostRecentHeartbeats.size(); if (heartbeatLogger.isDebugEnabled()) { - heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : "")); + heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : "")); } for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) { @@ -2130,7 +2148,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C readLock.lock(); try { final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster; - return (broadcaster != null && broadcaster.isRunning()); + return broadcaster != null && broadcaster.isRunning(); } finally { readLock.unlock("isBroadcasting"); } @@ -2323,7 +2341,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C if (auditService != null) { try { auditService.addActions(clusterContext.getActions()); - } catch (Throwable t) { + } catch (final Throwable t) { logger.warn("Unable to record actions: " + t.getMessage()); if (logger.isDebugEnabled()) { logger.warn(StringUtils.EMPTY, t); @@ -2834,7 +2852,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ProcessorEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class); + final ProcessorEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class); final ProcessorDTO nodeProcessor = nodeResponseEntity.getProcessor(); processorMap.put(nodeResponse.getNodeId(), nodeProcessor); } @@ -2851,7 +2869,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ProcessorsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class); + final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class); final Set<ProcessorDTO> nodeProcessors = nodeResponseEntity.getProcessors(); for (final ProcessorDTO nodeProcessor : nodeProcessors) { @@ -2892,7 +2910,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class); + final ProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class); final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getProcessGroup(); for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) { @@ -2952,7 +2970,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final FlowSnippetEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class); + final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class); final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents(); for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) { @@ -2995,7 +3013,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // create a new client response clientResponse = new NodeResponse(clientResponse, responseEntity); - } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupEndpoint(uri, method))) { + } else if (hasSuccessfulClientResponse && isRemoteProcessGroupEndpoint(uri, method)) { final RemoteProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class); final RemoteProcessGroupDTO remoteProcessGroup = responseEntity.getRemoteProcessGroup(); @@ -3005,7 +3023,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final RemoteProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class); + final RemoteProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class); final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeResponseEntity.getRemoteProcessGroup(); remoteProcessGroupMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup); @@ -3013,7 +3031,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C mergeRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupMap); clientResponse = new NodeResponse(clientResponse, responseEntity); - } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupsEndpoint(uri, method))) { + } else if (hasSuccessfulClientResponse && isRemoteProcessGroupsEndpoint(uri, method)) { final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class); final Set<RemoteProcessGroupDTO> remoteProcessGroups = responseEntity.getRemoteProcessGroups(); @@ -3023,7 +3041,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final RemoteProcessGroupsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class); + final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class); final Set<RemoteProcessGroupDTO> nodeRemoteProcessGroups = nodeResponseEntity.getRemoteProcessGroups(); for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeRemoteProcessGroups) { @@ -3056,7 +3074,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ProvenanceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class); + final ProvenanceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class); final ProvenanceDTO nodeQuery = nodeResponseEntity.getProvenance(); resultsMap.put(nodeResponse.getNodeId(), nodeQuery); @@ -3084,7 +3102,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class); + final ControllerServiceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class); final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService(); resultsMap.put(nodeResponse.getNodeId(), nodeControllerService); @@ -3102,7 +3120,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class); + final ControllerServicesEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class); final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices(); for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) { @@ -3136,7 +3154,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } final ControllerServiceReferencingComponentsEntity nodeResponseEntity = - (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); + nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents(); resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents); @@ -3154,7 +3172,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class); + final ReportingTaskEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class); final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask(); resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask); @@ -3172,7 +3190,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class); + final ReportingTasksEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class); final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks(); for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) { @@ -3428,7 +3446,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } private boolean canChangeNodeState(final String method, final URI uri) { - return (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method)); + return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method); } private void notifyDataFlowManagementServiceOfNodeStatusChange() { @@ -3477,7 +3495,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public void run() { logger.info("Entering safe mode..."); final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS); - final long timeToElect = (safeModeSeconds <= 0) ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS); + final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS); boolean exitSafeMode = false; while (isRunning()) { @@ -3819,7 +3837,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) { final long time = toNormalize.getTime(); - return new Date(time - (time % numMillis)); + return new Date(time - time % numMillis); } private NodeDTO createNodeDTO(final Node node) { @@ -3861,8 +3879,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); StatusHistoryDTO lastStatusHistory = null; - Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>(); - Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>(); + final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); for (final Node node : getRawNodes()) { final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); @@ -3942,8 +3960,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); StatusHistoryDTO lastStatusHistory = null; - Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>(); - Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>(); + final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); for (final Node node : getRawNodes()) { final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); @@ -4006,8 +4024,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); StatusHistoryDTO lastStatusHistory = null; - Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>(); - Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>(); + final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); for (final Node node : getRawNodes()) { final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); @@ -4070,8 +4088,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>(); StatusHistoryDTO lastStatusHistory = null; - Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>(); - Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>(); + final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); for (final Node node : getRawNodes()) { final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java index d1d5e5b..4795827 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java @@ -17,24 +17,43 @@ package org.apache.nifi.events; import java.util.concurrent.atomic.AtomicLong; + import org.apache.nifi.connectable.Connectable; import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.ComponentType; -/** - * - */ public final class BulletinFactory { private static final AtomicLong currentId = new AtomicLong(0); public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) { - return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message); + final ComponentType type; + switch (connectable.getConnectableType()) { + case REMOTE_INPUT_PORT: + case REMOTE_OUTPUT_PORT: + type = ComponentType.REMOTE_PROCESS_GROUP; + break; + case INPUT_PORT: + type = ComponentType.INPUT_PORT; + break; + case OUTPUT_PORT: + type = ComponentType.OUTPUT_PORT; + break; + case PROCESSOR: + default: + type = ComponentType.PROCESSOR; + break; + } + + return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), type, connectable.getName(), category, severity, message); } - public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) { + public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName, + final String category, final String severity, final String message) { final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement()); bulletin.setGroupId(groupId); bulletin.setSourceId(sourceId); + bulletin.setSourceType(sourceType); bulletin.setSourceName(sourceName); bulletin.setCategory(category); bulletin.setLevel(severity); @@ -47,6 +66,7 @@ public final class BulletinFactory { bulletin.setCategory(category); bulletin.setLevel(severity); bulletin.setMessage(message); + bulletin.setSourceType(ComponentType.FLOW_CONTROLLER); return bulletin; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java index f97dc46..3359e7e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java @@ -17,6 +17,7 @@ package org.apache.nifi.events; import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.ComponentType; /** * @@ -25,6 +26,7 @@ public class SystemBulletin extends Bulletin { SystemBulletin(final long id) { super(id); + setSourceType(ComponentType.FLOW_CONTROLLER); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java index 76ca661..d7fa3fc 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java @@ -41,8 +41,8 @@ public class LogRepositoryFactory { logRepositoryClass = clazz; } - public static LogRepository getRepository(final String processorId) { - LogRepository repository = repositoryMap.get(requireNonNull(processorId)); + public static LogRepository getRepository(final String componentId) { + LogRepository repository = repositoryMap.get(requireNonNull(componentId)); if (repository == null) { try { repository = logRepositoryClass.newInstance(); @@ -50,7 +50,7 @@ public class LogRepositoryFactory { throw new RuntimeException(e); } - final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository); + final LogRepository oldRepository = repositoryMap.putIfAbsent(componentId, repository); if (oldRepository != null) { repository = oldRepository; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2ffdd4e..b6edbbb 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -137,11 +137,13 @@ import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.ControllerServiceLogObserver; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepository; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLogObserver; +import org.apache.nifi.logging.ReportingTaskLogObserver; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarThreadContextClassLoader; @@ -593,7 +595,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R writeLock.lock(); try { if (startDelayedComponents) { - LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); + LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()); for (final Connectable connectable : startConnectablesAfterInitialization) { if (connectable.getScheduledState() == ScheduledState.DISABLED) { continue; @@ -1012,7 +1014,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isTerminated() { this.readLock.lock(); try { - return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated()); + return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated(); } finally { this.readLock.unlock(); } @@ -1828,9 +1830,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } else if (id1.equals(id2)) { return true; } else { - final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1); - final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2); - return (comparable1.equals(comparable2)); + final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1; + final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2; + return comparable1.equals(comparable2); } } @@ -1964,7 +1966,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; - return (root == null) ? null : root.findProcessGroup(searchId); + return root == null ? null : root.findProcessGroup(searchId); } @Override @@ -2079,8 +2081,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut()); connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut()); - flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut()); - bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut()); + flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut(); + bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut(); } if (StringUtils.isNotBlank(conn.getName())) { @@ -2552,6 +2554,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } reportingTasks.put(id, taskNode); + + // Register log observer to provide bulletins when reporting task logs anything at WARN level or above + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, + new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); + return taskNode; } @@ -2616,7 +2624,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { - return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); + final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded); + + // Register log observer to provide bulletins when reporting task logs anything at WARN level or above + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, + new ControllerServiceLogObserver(getBulletinRepository(), serviceNode)); + + return serviceNode; } @Override @@ -3480,7 +3495,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (bulletin.getGroupId() == null) { escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); } else { - escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), + escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(), bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); } } else { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 92fa3b2..b5c3855 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -48,11 +48,12 @@ public class ControllerServiceLoader { private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class); public static List<ControllerServiceNode> loadControllerServices( - final ControllerServiceProvider provider, - final InputStream serializedStream, - final StringEncryptor encryptor, - final BulletinRepository bulletinRepo, - final boolean autoResumeState) throws IOException { + final ControllerServiceProvider provider, + final InputStream serializedStream, + final StringEncryptor encryptor, + final BulletinRepository bulletinRepo, + final boolean autoResumeState) throws IOException { + final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); documentBuilderFactory.setNamespaceAware(true); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java index a20e974..5172d34 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.RingBuffer.Filter; @@ -167,7 +168,7 @@ public class VolatileBulletinRepository implements BulletinRepository { } final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY); - return (buffer == null) ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() { + return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() { @Override public boolean select(final Bulletin bulletin) { return bulletin.getTimestamp().getTime() >= fiveMinutesAgo; @@ -194,12 +195,12 @@ public class VolatileBulletinRepository implements BulletinRepository { } private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) { - final String groupId = getBulletinStoreKey(bulletin); + final String storageKey = getBulletinStoreKey(bulletin); - ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(groupId); + ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey); if (componentMap == null) { componentMap = new ConcurrentHashMap<>(); - ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(groupId, componentMap); + final ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(storageKey, componentMap); if (existing != null) { componentMap = existing; } @@ -221,11 +222,16 @@ public class VolatileBulletinRepository implements BulletinRepository { } private String getBulletinStoreKey(final Bulletin bulletin) { - return isControllerBulletin(bulletin) ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getGroupId(); + if (isControllerBulletin(bulletin)) { + return CONTROLLER_BULLETIN_STORE_KEY; + } + + final String groupId = bulletin.getGroupId(); + return groupId == null ? bulletin.getSourceId() : groupId; } private boolean isControllerBulletin(final Bulletin bulletin) { - return bulletin.getGroupId() == null; + return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType()); } private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java index 6f1dc45..17c5991 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java @@ -18,6 +18,8 @@ package org.apache.nifi.jaxb; import java.util.Date; +import org.apache.nifi.reporting.ComponentType; + /** * */ @@ -32,6 +34,7 @@ public class AdaptedBulletin { private String groupId; private String sourceId; private String sourceName; + private ComponentType sourceType; public String getCategory() { return category; @@ -97,4 +100,11 @@ public class AdaptedBulletin { this.timestamp = timestamp; } + public ComponentType getSourceType() { + return sourceType; + } + + public void setSourceType(ComponentType sourceType) { + this.sourceType = sourceType; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java index b699348..acbe0dd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java @@ -34,7 +34,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> { if (b.getSourceId() == null) { return BulletinFactory.createBulletin(b.getCategory(), b.getLevel(), b.getMessage()); } else { - return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage()); + return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceType(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage()); } } @@ -48,6 +48,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> { aBulletin.setTimestamp(b.getTimestamp()); aBulletin.setGroupId(b.getGroupId()); aBulletin.setSourceId(b.getSourceId()); + aBulletin.setSourceType(b.getSourceType()); aBulletin.setSourceName(b.getSourceName()); aBulletin.setCategory(b.getCategory()); aBulletin.setLevel(b.getLevel()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java new file mode 100644 index 0000000..837e1c4 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.logging; + +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; +import org.apache.nifi.reporting.Severity; + +public class ControllerServiceLogObserver implements LogObserver { + private final BulletinRepository bulletinRepository; + private final ControllerServiceNode serviceNode; + + public ControllerServiceLogObserver(final BulletinRepository bulletinRepository, final ControllerServiceNode serviceNode) { + this.bulletinRepository = bulletinRepository; + this.serviceNode = serviceNode; + } + + @Override + public void onLogMessage(final LogMessage message) { + // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever + // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are). + final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString(); + + final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.REPORTING_TASK, + serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage()); + bulletinRepository.addBulletin(bulletin); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java new file mode 100644 index 0000000..e5638d6 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.logging; + +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; +import org.apache.nifi.reporting.Severity; + +public class ReportingTaskLogObserver implements LogObserver { + private final BulletinRepository bulletinRepository; + private final ReportingTaskNode taskNode; + + public ReportingTaskLogObserver(final BulletinRepository bulletinRepository, final ReportingTaskNode taskNode) { + this.bulletinRepository = bulletinRepository; + this.taskNode = taskNode; + } + + @Override + public void onLogMessage(final LogMessage message) { + // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever + // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are). + final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString(); + + final Bulletin bulletin = BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK, + taskNode.getName(), "Log Message", bulletinLevel, message.getMessage()); + bulletinRepository.addBulletin(bulletin); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index d19b5c1..61516d0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -57,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; @@ -164,7 +165,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final String groupId = StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier(); final String sourceId = StandardRemoteProcessGroup.this.getIdentifier(); final String sourceName = StandardRemoteProcessGroup.this.getName(); - bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message)); + bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, ComponentType.REMOTE_PROCESS_GROUP, + sourceName, category, severity.name(), message)); } }; @@ -227,7 +229,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public String getName() { final String name = this.name.get(); - return (name == null) ? targetUri.toString() : name; + return name == null ? targetUri.toString() : name; } @Override @@ -671,7 +673,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private ProcessGroup getRootGroup(final ProcessGroup context) { final ProcessGroup parent = context.getParent(); - return (parent == null) ? context : getRootGroup(parent); + return parent == null ? context : getRootGroup(parent); } private boolean isWebApiSecure() { @@ -714,7 +716,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public Date getLastRefreshTime() { readLock.lock(); try { - return (refreshContentsTimestamp == null) ? null : new Date(refreshContentsTimestamp); + return refreshContentsTimestamp == null ? null : new Date(refreshContentsTimestamp); } finally { readLock.unlock(); } @@ -855,7 +857,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { Set<RemoteProcessGroupPortDescriptor> remotePorts = null; if (ports != null) { remotePorts = new LinkedHashSet<>(ports.size()); - for (PortDTO port : ports) { + for (final PortDTO port : ports) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); final ScheduledState scheduledState = ScheduledState.valueOf(port.getState()); descriptor.setId(port.getId()); @@ -1093,7 +1095,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } final String remoteInstanceId = dto.getInstanceId(); - boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); + final boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); pointsToCluster.set(isPointingToCluster); } else if (statusCode == UNAUTHORIZED_STATUS_CODE) { try { @@ -1120,7 +1122,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message}); authorizationIssue = "Unable to determine Site-to-Site availability."; } - } catch (Exception e) { + } catch (final Exception e) { logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e)); getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this.getTargetUri().toString(), e)); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 4bb1683..9eadec0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -61,6 +61,7 @@ import org.apache.nifi.remote.exception.TransmissionDisabledException; import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.ServerProtocol; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.user.NiFiUser; @@ -108,7 +109,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier(); final String sourceId = StandardRootGroupPort.this.getIdentifier(); final String sourceName = StandardRootGroupPort.this.getName(); - bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message)); + final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT; + bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, componentType, sourceName, category, severity.name(), message)); } };