This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new d7ca37d NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task d7ca37d is described below commit d7ca37d065677e3016477f74ecdfe233ccd725c7 Author: Kotaro Terada <koter...@yahoo-corp.jp> AuthorDate: Fri Sep 27 09:53:39 2019 +0900 NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3771 --- .../prometheus/api/PrometheusMetricsUtil.java | 63 +++++++++++++++++++--- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java index 74b0ccc..fec3338 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java @@ -20,6 +20,7 @@ package org.apache.nifi.reporting.prometheus.api; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.Map; +import java.util.concurrent.TimeUnit; import io.prometheus.client.SimpleCollector; import org.apache.nifi.components.AllowableValue; @@ -47,7 +48,7 @@ public class PrometheusMetricsUtil { private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry(); private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry(); - // Process Group metrics + // Processor / Process Group metrics private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build() .name("nifi_amount_flowfiles_sent") .help("Total number of FlowFiles sent by the 
component") @@ -66,6 +67,12 @@ public class PrometheusMetricsUtil { .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); + private static final Gauge AMOUNT_FLOWFILES_REMOVED = Gauge.build() + .name("nifi_amount_flowfiles_removed") + .help("Total number of FlowFiles removed by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + private static final Gauge AMOUNT_BYTES_SENT = Gauge.build() .name("nifi_amount_bytes_sent") .help("Total number of bytes sent by the component") @@ -150,6 +157,7 @@ public class PrometheusMetricsUtil { "source_id", "source_name", "destination_id", "destination_name") .register(NIFI_REGISTRY); + // Processor metrics private static final Gauge PROCESSOR_COUNTERS = Gauge.build() .name("nifi_processor_counters") .help("Counters exposed by NiFi Processors") @@ -252,6 +260,18 @@ public class PrometheusMetricsUtil { .labelNames("instance") .register(JVM_REGISTRY); + private static final Gauge JVM_GC_RUNS = Gauge.build() + .name("nifi_jvm_gc_runs") + .help("NiFi JVM GC number of runs") + .labelNames("instance", "gc_name") + .register(JVM_REGISTRY); + + private static final Gauge JVM_GC_TIME = Gauge.build() + .name("nifi_jvm_gc_time") + .help("NiFi JVM GC time in milliseconds") + .labelNames("instance", "gc_name") + .register(JVM_REGISTRY); + public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) { final String componentId = status.getId(); @@ -305,10 +325,10 @@ public class PrometheusMetricsUtil { if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { // Report metrics for all components - for(ProcessorStatus processorStatus : status.getProcessorStatus()) { + for (ProcessorStatus processorStatus : status.getProcessorStatus()) { Map<String, Long> counters = 
processorStatus.getCounters(); - if(counters != null) { + if (counters != null) { counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue())); } @@ -317,13 +337,36 @@ public class PrometheusMetricsUtil { final String procComponentId = processorStatus.getId(); final String procComponentName = processorStatus.getName(); final String parentId = processorStatus.getGroupId(); + + AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesSent()); + AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesReceived()); + AMOUNT_FLOWFILES_REMOVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesRemoved()); + + AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent()); + AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead()); + AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived()); + + SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "") + .set(processorStatus.getOutputBytes()); + SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "") + .set(processorStatus.getInputBytes()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, 
parentPGId, "", "", "", "") + .set(processorStatus.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "") + .set(processorStatus.getInputCount()); + + AVERAGE_LINEAGE_DURATION.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "") + .set(processorStatus.getAverageLineageDuration()); + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId) .set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount()); AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId) .set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount()); } - for(ConnectionStatus connectionStatus : status.getConnectionStatus()) { + for (ConnectionStatus connectionStatus : status.getConnectionStatus()) { final String connComponentId = connectionStatus.getId(); final String connComponentName = connectionStatus.getName(); final String sourceId = connectionStatus.getSourceId(); @@ -355,7 +398,7 @@ public class PrometheusMetricsUtil { IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) .set(isBackpressureEnabled ? 
1 : 0); } - for(PortStatus portStatus : status.getInputPortStatus()) { + for (PortStatus portStatus : status.getInputPortStatus()) { final String portComponentId = portStatus.getId(); final String portComponentName = portStatus.getName(); final String parentId = portStatus.getGroupId(); @@ -379,7 +422,7 @@ public class PrometheusMetricsUtil { AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); } - for(PortStatus portStatus : status.getOutputPortStatus()) { + for (PortStatus portStatus : status.getOutputPortStatus()) { final String portComponentId = portStatus.getId(); final String portComponentName = portStatus.getName(); final String parentId = portStatus.getGroupId(); @@ -403,7 +446,7 @@ public class PrometheusMetricsUtil { AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); } - for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { + for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { final String rpgComponentId = remoteProcessGroupStatus.getId(); final String rpgComponentName = remoteProcessGroupStatus.getName(); final String parentId = remoteProcessGroupStatus.getGroupId(); @@ -446,6 +489,12 @@ public class PrometheusMetricsUtil { JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime()); JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage()); + jvmMetrics.garbageCollectors() + .forEach((name, stat) -> { + JVM_GC_RUNS.labels(instanceId, name).set(stat.getRuns()); + JVM_GC_TIME.labels(instanceId, name).set(stat.getTime(TimeUnit.MILLISECONDS)); + }); + return JVM_REGISTRY; }