This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd880fb69e51dd37f58614355be8a1a65d44540f Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Wed Oct 11 09:10:49 2017 +0200 [FLINK-7812][metrics] Add system resources metrics This closes #4801. --- docs/_includes/generated/metric_configuration.html | 10 + docs/monitoring/metrics.md | 144 +++++++++++++ .../flink/configuration/ConfigurationUtils.java | 19 ++ .../apache/flink/configuration/MetricOptions.java | 14 ++ flink-runtime/pom.xml | 7 + .../runtime/entrypoint/ClusterEntrypoint.java | 5 +- .../flink/runtime/metrics/util/MetricUtils.java | 16 +- .../metrics/util/SystemResourcesCounter.java | 236 +++++++++++++++++++++ .../util/SystemResourcesMetricsInitializer.java | 101 +++++++++ .../flink/runtime/minicluster/MiniCluster.java | 6 +- .../runtime/taskexecutor/TaskManagerRunner.java | 3 +- .../TaskManagerServicesConfiguration.java | 19 +- .../flink/runtime/jobmanager/JobManager.scala | 3 +- .../minicluster/LocalFlinkMiniCluster.scala | 5 +- .../flink/runtime/taskmanager/TaskManager.scala | 3 +- .../runtime/metrics/TaskManagerMetricsTest.java | 3 +- .../metrics/utils/SystemResourcesCounterTest.java | 71 +++++++ .../taskexecutor/NetworkBufferCalculationTest.java | 4 +- flink-tests/pom.xml | 6 + .../metrics/SystemResourcesMetricsITCase.java | 142 +++++++++++++ pom.xml | 8 +- 21 files changed, 811 insertions(+), 14 deletions(-) diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index aef8fbb..98054e9 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -67,5 +67,15 @@ <td style="word-wrap: break-word;">"<host>.taskmanager.<tm_id>.<job_name>"</td> <td>Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager.</td> </tr> + <tr> + <td><h5>metrics.system-resource</h5></td> + <td style="word-wrap: break-word;">false</td> + <td></td> + </tr> + <tr> + 
<td><h5>metrics.system-resource-probing-interval</h5></td> + <td style="word-wrap: break-word;">5000</td> + <td></td> + </tr> </tbody> </table> diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 55f626e..554e1c5 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1396,6 +1396,150 @@ Thus, in order to infer the metric identifier: </tbody> </table> +### System resources + +System resources reporting is disabled by default. When `metrics.system-resource` +is enabled, additional metrics listed below will be available on Job- and TaskManager. +System resources metrics are updated periodically and they present average values for a +configured interval (`metrics.system-resource-probing-interval`). + +System resources reporting requires an optional dependency to be present on the +classpath (for example placed in Flink's `lib` directory): + + - `com.github.oshi:oshi-core:3.4.0` (licensed under EPL 1.0 license) + +Including its transitive dependencies: + + - `net.java.dev.jna:jna-platform:jar:4.2.2` + - `net.java.dev.jna:jna:jar:4.2.2` + +Failures in this regard will be reported as warning messages like `NoClassDefFoundError` +logged by `SystemResourcesMetricsInitializer` during the startup. 
+ +#### System CPU + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Scope</th> + <th class="text-left" style="width: 25%">Infix</th> + <th class="text-left" style="width: 23%">Metrics</th> + <th class="text-left" style="width: 32%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <th rowspan="12"><strong>Job-/TaskManager</strong></th> + <td rowspan="12">System.CPU</td> + <td>Usage</td> + <td>Overall % of CPU usage on the machine.</td> + </tr> + <tr> + <td>Idle</td> + <td>% of CPU Idle usage on the machine.</td> + </tr> + <tr> + <td>Sys</td> + <td>% of System CPU usage on the machine.</td> + </tr> + <tr> + <td>User</td> + <td>% of User CPU usage on the machine.</td> + </tr> + <tr> + <td>IOWait</td> + <td>% of IOWait CPU usage on the machine.</td> + </tr> + <tr> + <td>Irq</td> + <td>% of Irq CPU usage on the machine.</td> + </tr> + <tr> + <td>SoftIrq</td> + <td>% of SoftIrq CPU usage on the machine.</td> + </tr> + <tr> + <td>Nice</td> + <td>% of Nice CPU usage on the machine.</td> + </tr> + <tr> + <td>Load1min</td> + <td>Average CPU load over 1 minute</td> + </tr> + <tr> + <td>Load5min</td> + <td>Average CPU load over 5 minutes</td> + </tr> + <tr> + <td>Load15min</td> + <td>Average CPU load over 15 minutes</td> + </tr> + <tr> + <td>UsageCPU*</td> + <td>% of CPU usage per processor</td> + </tr> + </tbody> +</table> + +#### System memory + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Scope</th> + <th class="text-left" style="width: 25%">Infix</th> + <th class="text-left" style="width: 23%">Metrics</th> + <th class="text-left" style="width: 32%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <th rowspan="4"><strong>Job-/TaskManager</strong></th> + <td rowspan="2">System.Memory</td> + <td>Available</td> + <td>Available memory in bytes</td> + </tr> + <tr> + <td>Total</td> + <td>Total memory in bytes</td> + </tr> + <tr> + <td 
rowspan="2">System.Swap</td> + <td>Used</td> + <td>Used swap bytes</td> + </tr> + <tr> + <td>Total</td> + <td>Total swap in bytes</td> + </tr> + </tbody> +</table> + +#### System network + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Scope</th> + <th class="text-left" style="width: 25%">Infix</th> + <th class="text-left" style="width: 23%">Metrics</th> + <th class="text-left" style="width: 32%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <th rowspan="2"><strong>Job-/TaskManager</strong></th> + <td rowspan="2">System.Network.INTERFACE_NAME</td> + <td>ReceiveRate</td> + <td>Average receive rate in bytes per second</td> + </tr> + <tr> + <td>SendRate</td> + <td>Average send rate in bytes per second</td> + </tr> + </tbody> +</table> + ## Latency tracking Flink allows to track the latency of records traveling through the system. To enable the latency tracking diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 1b30821..7b717bd 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -18,12 +18,18 @@ package org.apache.flink.configuration; +import org.apache.flink.api.common.time.Time; + import javax.annotation.Nonnull; import java.io.File; +import java.util.Optional; import java.util.Properties; import java.util.Set; +import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS; +import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL; + /** * Utility class for {@link Configuration} related helper functions. 
*/ @@ -70,6 +76,19 @@ public class ConfigurationUtils { } /** + * @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code Optional.empty()} if + * {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled. + */ + public static Optional<Time> getSystemResourceMetricsProbingInterval(Configuration configuration) { + if (!configuration.getBoolean(SYSTEM_RESOURCE_METRICS)) { + return Optional.empty(); + } else { + return Optional.of(Time.milliseconds( + configuration.getLong(SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL))); + } + } + + /** * Extracts the task manager directories for temporary files as defined by * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}. * diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 3b11645..f9fd024 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -110,6 +110,20 @@ public class MetricOptions { .defaultValue(128) .withDescription("Defines the number of measured latencies to maintain at each operator."); + /** + * Whether Flink should report system resource metrics such as machine's CPU, memory or network usage. + */ + public static final ConfigOption<Boolean> SYSTEM_RESOURCE_METRICS = + key("metrics.system-resource") + .defaultValue(false); + /** + * Interval between probing of system resource metrics specified in milliseconds. Has an effect only when + * {@link #SYSTEM_RESOURCE_METRICS} is enabled. 
+ */ + public static final ConfigOption<Long> SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL = + key("metrics.system-resource-probing-interval") + .defaultValue(5000L); + private MetricOptions() { } } diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index e163881..bc4a3cb 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -306,6 +306,13 @@ under the License. <groupId>org.reflections</groupId> <artifactId>reflections</artifactId> </dependency> + + <!-- Used only for additional logging. Optional because of unclear EPL 1.0 license compatibility. --> + <dependency> + <groupId>com.github.oshi</groupId> + <artifactId>oshi-core</artifactId> + <optional>true</optional> + </dependency> </dependencies> <!-- Dependency Management to converge transitive dependency versions --> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index b429de5..ddd3751c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -346,7 +346,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { clusterInformation, webMonitorEndpoint.getRestBaseUrl()); - jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress()); + jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( + metricRegistry, + rpcService.getAddress(), + ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 
367979e..b150631 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.metrics.util; +import org.apache.flink.api.common.time.Time; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.NetworkEnvironment; @@ -45,6 +46,9 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.lang.management.ThreadMXBean; import java.util.List; +import java.util.Optional; + +import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics; /** * Utility class to register pre-defined metric sets. @@ -58,7 +62,8 @@ public class MetricUtils { public static JobManagerMetricGroup instantiateJobManagerMetricGroup( final MetricRegistry metricRegistry, - final String hostname) { + final String hostname, + final Optional<Time> systemResourceProbeInterval) { final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup( metricRegistry, hostname); @@ -68,13 +73,17 @@ public class MetricUtils { // initialize the JM metrics instantiateStatusMetrics(statusGroup); + if (systemResourceProbeInterval.isPresent()) { + instantiateSystemMetrics(jobManagerMetricGroup, systemResourceProbeInterval.get()); + } return jobManagerMetricGroup; } public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup( MetricRegistry metricRegistry, TaskManagerLocation taskManagerLocation, - NetworkEnvironment network) { + NetworkEnvironment network, + Optional<Time> systemResourceProbeInterval) { final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup( metricRegistry, taskManagerLocation.getHostname(), @@ -89,6 +98,9 @@ public class MetricUtils { .addGroup("Network"); instantiateNetworkMetrics(networkGroup, network); + if 
(systemResourceProbeInterval.isPresent()) { + instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get()); + } return taskManagerMetricGroup; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java new file mode 100644 index 0000000..7365617 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.api.common.time.Time; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import oshi.SystemInfo; +import oshi.hardware.CentralProcessor; +import oshi.hardware.CentralProcessor.TickType; +import oshi.hardware.HardwareAbstractionLayer; +import oshi.hardware.NetworkIF; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Daemon thread probing system resources. 
+ * + * <p>To accurately and consistently report CPU and network usage we have to periodically probe + * CPU ticks and network sent/received bytes and then convert those values to CPU usage and + * send/receive byte rates. + */ +@ThreadSafe +public class SystemResourcesCounter extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(SystemResourcesCounter.class); + + private final long probeIntervalMs; + private final SystemInfo systemInfo = new SystemInfo(); + private final HardwareAbstractionLayer hardwareAbstractionLayer = systemInfo.getHardware(); + + private volatile boolean running = true; + + private long[] previousCpuTicks; + private long[] bytesReceivedPerInterface; + private long[] bytesSentPerInterface; + + private volatile double cpuUser; + private volatile double cpuNice; + private volatile double cpuSys; + private volatile double cpuIdle; + private volatile double cpuIOWait; + private volatile double cpuIrq; + private volatile double cpuSoftIrq; + private volatile double cpuUsage; + + private volatile double cpuLoad1; + private volatile double cpuLoad5; + private volatile double cpuLoad15; + + private AtomicReferenceArray<Double> cpuUsagePerProcessor; + + private final String[] networkInterfaceNames; + + private AtomicLongArray receiveRatePerInterface; + private AtomicLongArray sendRatePerInterface; + + public SystemResourcesCounter(Time probeInterval) { + probeIntervalMs = probeInterval.toMilliseconds(); + checkState(this.probeIntervalMs > 0); + + setName(SystemResourcesCounter.class.getSimpleName() + " probing thread"); + + cpuUsagePerProcessor = new AtomicReferenceArray<>(hardwareAbstractionLayer.getProcessor().getLogicalProcessorCount()); + + NetworkIF[] networkIFs = hardwareAbstractionLayer.getNetworkIFs(); + bytesReceivedPerInterface = new long[networkIFs.length]; + bytesSentPerInterface = new long[networkIFs.length]; + receiveRatePerInterface = new AtomicLongArray(networkIFs.length); + sendRatePerInterface = new 
AtomicLongArray(networkIFs.length); + networkInterfaceNames = new String[networkIFs.length]; + + for (int i = 0; i < networkInterfaceNames.length; i++) { + networkInterfaceNames[i] = networkIFs[i].getName(); + } + } + + @Override + public void run() { + try { + while (running) { + calculateCPUUsage(hardwareAbstractionLayer.getProcessor()); + calculateNetworkUsage(hardwareAbstractionLayer.getNetworkIFs()); + Thread.sleep(probeIntervalMs); + } + } catch (InterruptedException e) { + if (running) { + LOG.warn("{} has failed", SystemResourcesCounter.class.getSimpleName(), e); + } + } + } + + public void shutdown() throws InterruptedException { + running = false; + interrupt(); + join(); + } + + public double getCpuUser() { + return cpuUser; + } + + public double getCpuNice() { + return cpuNice; + } + + public double getCpuSys() { + return cpuSys; + } + + public double getCpuIdle() { + return cpuIdle; + } + + public double getIOWait() { + return cpuIOWait; + } + + public double getCpuIrq() { + return cpuIrq; + } + + public double getCpuSoftIrq() { + return cpuSoftIrq; + } + + public double getCpuUsage() { + return cpuUsage; + } + + public double getCpuLoad1() { + return cpuLoad1; + } + + public double getCpuLoad5() { + return cpuLoad5; + } + + public double getCpuLoad15() { + return cpuLoad15; + } + + public int getProcessorsCount() { + return cpuUsagePerProcessor.length(); + } + + public double getCpuUsagePerProcessor(int processor) { + return cpuUsagePerProcessor.get(processor); + } + + public String[] getNetworkInterfaceNames() { + return networkInterfaceNames; + } + + public long getReceiveRatePerInterface(int interfaceNo) { + return receiveRatePerInterface.get(interfaceNo); + } + + public long getSendRatePerInterface(int interfaceNo) { + return sendRatePerInterface.get(interfaceNo); + } + + private void calculateCPUUsage(CentralProcessor processor) { + long[] ticks = processor.getSystemCpuLoadTicks(); + if (this.previousCpuTicks == null) { + this.previousCpuTicks = 
ticks; + } + + long userTicks = ticks[TickType.USER.getIndex()] - previousCpuTicks[TickType.USER.getIndex()]; + long niceTicks = ticks[TickType.NICE.getIndex()] - previousCpuTicks[TickType.NICE.getIndex()]; + long sysTicks = ticks[TickType.SYSTEM.getIndex()] - previousCpuTicks[TickType.SYSTEM.getIndex()]; + long idleTicks = ticks[TickType.IDLE.getIndex()] - previousCpuTicks[TickType.IDLE.getIndex()]; + long iowaitTicks = ticks[TickType.IOWAIT.getIndex()] - previousCpuTicks[TickType.IOWAIT.getIndex()]; + long irqTicks = ticks[TickType.IRQ.getIndex()] - previousCpuTicks[TickType.IRQ.getIndex()]; + long softIrqTicks = ticks[TickType.SOFTIRQ.getIndex()] - previousCpuTicks[TickType.SOFTIRQ.getIndex()]; + long totalCpuTicks = userTicks + niceTicks + sysTicks + idleTicks + iowaitTicks + irqTicks + softIrqTicks; + this.previousCpuTicks = ticks; + + cpuUser = 100d * userTicks / totalCpuTicks; + cpuNice = 100d * niceTicks / totalCpuTicks; + cpuSys = 100d * sysTicks / totalCpuTicks; + cpuIdle = 100d * idleTicks / totalCpuTicks; + cpuIOWait = 100d * iowaitTicks / totalCpuTicks; + cpuIrq = 100d * irqTicks / totalCpuTicks; + cpuSoftIrq = 100d * softIrqTicks / totalCpuTicks; + + cpuUsage = processor.getSystemCpuLoad() * 100; + + double[] loadAverage = processor.getSystemLoadAverage(3); + cpuLoad1 = (loadAverage[0] < 0 ? Double.NaN : loadAverage[0]); + cpuLoad5 = (loadAverage[1] < 0 ? Double.NaN : loadAverage[1]); + cpuLoad15 = (loadAverage[2] < 0 ? 
Double.NaN : loadAverage[2]); + + double[] load = processor.getProcessorCpuLoadBetweenTicks(); + checkState(load.length == cpuUsagePerProcessor.length()); + for (int i = 0; i < load.length; i++) { + cpuUsagePerProcessor.set(i, load[i] * 100); + } + } + + private void calculateNetworkUsage(NetworkIF[] networkIFs) { + checkState(networkIFs.length == receiveRatePerInterface.length()); + + for (int i = 0; i < networkIFs.length; i++) { + NetworkIF networkIF = networkIFs[i]; + networkIF.updateNetworkStats(); + + receiveRatePerInterface.set(i, (networkIF.getBytesRecv() - bytesReceivedPerInterface[i]) * 1000 / probeIntervalMs); + sendRatePerInterface.set(i, (networkIF.getBytesSent() - bytesSentPerInterface[i]) * 1000 / probeIntervalMs); + + bytesReceivedPerInterface[i] = networkIF.getBytesRecv(); + bytesSentPerInterface[i] = networkIF.getBytesSent(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java new file mode 100644 index 0000000..01a9347 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import oshi.SystemInfo; +import oshi.hardware.GlobalMemory; +import oshi.hardware.HardwareAbstractionLayer; + +/** + * Utility class to initialize system resource metrics. + */ +public class SystemResourcesMetricsInitializer { + private static final Logger LOG = LoggerFactory.getLogger(SystemResourcesMetricsInitializer.class); + + public static void instantiateSystemMetrics(MetricGroup metricGroup, Time probeInterval) { + try { + MetricGroup system = metricGroup.addGroup("System"); + + SystemResourcesCounter systemResourcesCounter = new SystemResourcesCounter(probeInterval); + systemResourcesCounter.start(); + + SystemInfo systemInfo = new SystemInfo(); + HardwareAbstractionLayer hardwareAbstractionLayer = systemInfo.getHardware(); + + instantiateMemoryMetrics(system.addGroup("Memory"), hardwareAbstractionLayer.getMemory()); + instantiateSwapMetrics(system.addGroup("Swap"), hardwareAbstractionLayer.getMemory()); + instantiateCPUMetrics(system.addGroup("CPU"), systemResourcesCounter); + instantiateNetworkMetrics(system.addGroup("Network"), systemResourcesCounter); + } + catch (NoClassDefFoundError ex) { + LOG.warn( + "Failed to initialize system resource metrics because of missing class definitions." 
+ + " Did you forget to explicitly add the oshi-core optional dependency?", + ex); + } + } + + private static void instantiateMemoryMetrics(MetricGroup metrics, GlobalMemory memory) { + metrics.<Long, Gauge<Long>>gauge("Available", memory::getAvailable); + metrics.<Long, Gauge<Long>>gauge("Total", memory::getTotal); + } + + private static void instantiateSwapMetrics(MetricGroup metrics, GlobalMemory memory) { + metrics.<Long, Gauge<Long>>gauge("Used", memory::getSwapUsed); + metrics.<Long, Gauge<Long>>gauge("Total", memory::getSwapTotal); + } + + private static void instantiateCPUMetrics(MetricGroup metrics, SystemResourcesCounter usageCounter) { + metrics.<Double, Gauge<Double>>gauge("Usage", usageCounter::getCpuUsage); + metrics.<Double, Gauge<Double>>gauge("Idle", usageCounter::getCpuIdle); + metrics.<Double, Gauge<Double>>gauge("Sys", usageCounter::getCpuSys); + metrics.<Double, Gauge<Double>>gauge("User", usageCounter::getCpuUser); + metrics.<Double, Gauge<Double>>gauge("IOWait", usageCounter::getIOWait); + metrics.<Double, Gauge<Double>>gauge("Nice", usageCounter::getCpuNice); + metrics.<Double, Gauge<Double>>gauge("Irq", usageCounter::getCpuIrq); + metrics.<Double, Gauge<Double>>gauge("SoftIrq", usageCounter::getCpuSoftIrq); + + metrics.<Double, Gauge<Double>>gauge("Load1min", usageCounter::getCpuLoad1); + metrics.<Double, Gauge<Double>>gauge("Load5min", usageCounter::getCpuLoad5); + metrics.<Double, Gauge<Double>>gauge("Load15min", usageCounter::getCpuLoad15); + + for (int i = 0; i < usageCounter.getProcessorsCount(); i++) { + final int processor = i; + metrics.<Double, Gauge<Double>>gauge( + String.format("UsageCPU%d", processor), + () -> usageCounter.getCpuUsagePerProcessor(processor)); + } + } + + private static void instantiateNetworkMetrics(MetricGroup metrics, SystemResourcesCounter usageCounter) { + for (int i = 0; i < usageCounter.getNetworkInterfaceNames().length; i++) { + MetricGroup interfaceGroup = 
metrics.addGroup(usageCounter.getNetworkInterfaceNames()[i]); + + final int interfaceNo = i; + interfaceGroup.<Long, Gauge<Long>>gauge("ReceiveRate", () -> usageCounter.getReceiveRatePerInterface(interfaceNo)); + interfaceGroup.<Long, Gauge<Long>>gauge("SendRate", () -> usageCounter.getSendRatePerInterface(interfaceNo)); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 97ab5a5..8054a38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; @@ -354,7 +355,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for JobManger"); - this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost"); + this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( + metricRegistry, + "localhost", + ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index c1f0cc8..9ab7f80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -367,7 +367,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistry, taskManagerServices.getTaskManagerLocation(), - taskManagerServices.getNetworkEnvironment()); + taskManagerServices.getNetworkEnvironment(), + taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 86bc46d..eec39ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Iterator; +import java.util.Optional; import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES; import static org.apache.flink.util.MathUtils.checkedDownCast; @@ -83,6 +85,10 @@ public class TaskManagerServicesConfiguration { private final 
boolean localRecoveryEnabled; + private boolean systemResourceMetricsEnabled; + + private Optional<Time> systemResourceMetricsProbingInterval; + public TaskManagerServicesConfiguration( InetAddress taskManagerAddress, String[] tmpDirPaths, @@ -95,7 +101,8 @@ public class TaskManagerServicesConfiguration { MemoryType memoryType, boolean preAllocateMemory, float memoryFraction, - long timerServiceShutdownTimeout) { + long timerServiceShutdownTimeout, + Optional<Time> systemResourceMetricsProbingInterval) { this.taskManagerAddress = checkNotNull(taskManagerAddress); this.tmpDirPaths = checkNotNull(tmpDirPaths); @@ -113,6 +120,9 @@ public class TaskManagerServicesConfiguration { checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " + "service shutdown timeout must be greater or equal to 0."); this.timerServiceShutdownTimeout = timerServiceShutdownTimeout; + + this.systemResourceMetricsEnabled = systemResourceMetricsEnabled; + this.systemResourceMetricsProbingInterval = checkNotNull(systemResourceMetricsProbingInterval); } // -------------------------------------------------------------------------------------------- @@ -179,6 +189,10 @@ public class TaskManagerServicesConfiguration { return timerServiceShutdownTimeout; } + public Optional<Time> getSystemResourceMetricsProbingInterval() { + return systemResourceMetricsProbingInterval; + } + // -------------------------------------------------------------------------------------------- // Parsing of Flink configuration // -------------------------------------------------------------------------------------------- @@ -276,7 +290,8 @@ public class TaskManagerServicesConfiguration { memType, preAllocateMemory, memoryFraction, - timerServiceShutdownTimeout); + timerServiceShutdownTimeout, + ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); } // -------------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 2a8f492..0855991 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2519,7 +2519,8 @@ object JobManager { val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( metricRegistry, - configuration.getString(JobManagerOptions.ADDRESS)) + configuration.getString(JobManagerOptions.ADDRESS), + ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)) (instanceManager, scheduler, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index fb57861..6b9e4a8 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -47,7 +47,7 @@ import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup} -import org.apache.flink.runtime.metrics.util.MetricUtils +import org.apache.flink.runtime.metrics.util.{MetricUtils, SystemResourcesMetricsInitializer} import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} @@ -246,7 +246,8 @@ class LocalFlinkMiniCluster( val 
taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistryOpt.get, taskManagerServices.getTaskManagerLocation(), - taskManagerServices.getNetworkEnvironment()) + taskManagerServices.getNetworkEnvironment(), + taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval) val props = getTaskManagerProps( taskManagerClass, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 62fe862..1de4848 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -2019,7 +2019,8 @@ object TaskManager { val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistry, taskManagerServices.getTaskManagerLocation(), - taskManagerServices.getNetworkEnvironment()) + taskManagerServices.getNetworkEnvironment(), + taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval) // create the actor properties (which define the actor constructor parameters) val tmProps = getTaskManagerProps( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index db04023..4d18060 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -106,7 +106,8 @@ public class TaskManagerMetricsTest extends TestLogger { TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistry, taskManagerServices.getTaskManagerLocation(), - taskManagerServices.getNetworkEnvironment()); + taskManagerServices.getNetworkEnvironment(), + 
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); // create the task manager final Props tmProps = TaskManager.getTaskManagerProps( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java new file mode 100644 index 0000000..f6c228d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.utils; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.metrics.util.SystemResourcesCounter; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +/** + * Tests for {@link SystemResourcesCounter}. 
+ */ +public class SystemResourcesCounterTest { + + private static final double EPSILON = 0.01; + + @Test + public void testObtainAnyMetrics() throws InterruptedException { + SystemResourcesCounter systemResources = new SystemResourcesCounter(Time.milliseconds(10)); + double initialCpuIdle = systemResources.getCpuIdle(); + + systemResources.start(); + // wait for stats to update/calculate + try { + double cpuIdle; + do { + Thread.sleep(1); + cpuIdle = systemResources.getCpuIdle(); + } + while (initialCpuIdle == cpuIdle || Double.isNaN(cpuIdle) || cpuIdle == 0.0); + } + finally { + systemResources.shutdown(); + systemResources.join(); + } + + double totalCpuUsage = systemResources.getCpuIrq() + + systemResources.getCpuNice() + + systemResources.getCpuSoftIrq() + + systemResources.getCpuSys() + + systemResources.getCpuUser() + + systemResources.getIOWait(); + + assertTrue( + "There should be at least one processor", + systemResources.getProcessorsCount() > 0); + assertTrue( + "There should be at least one network interface", + systemResources.getNetworkInterfaceNames().length > 0); + assertEquals(100.0, totalCpuUsage + systemResources.getCpuIdle(), EPSILON); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java index 6374cf8..586f937 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java @@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import java.net.InetAddress; +import java.util.Optional; import static org.apache.flink.util.MathUtils.checkedDownCast; import static org.junit.Assert.assertEquals; @@ -108,6 +109,7 @@ public class NetworkBufferCalculationTest extends TestLogger { memType, false, 
managedMemoryFraction, - 0); + 0, + Optional.empty()); } } diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index f9746e1..80cf8de 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -231,6 +231,12 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>com.github.oshi</groupId> + <artifactId>oshi-core</artifactId> + <scope>test</scope> + </dependency> + <!-- utility to scan classpaths --> <dependency> <groupId>org.reflections</groupId> diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java new file mode 100644 index 0000000..6d9a7b0 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the system resource metrics. + */ +public class SystemResourcesMetricsITCase { + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + private static Configuration getConfiguration() { + Configuration configuration = new Configuration(); + configuration.setBoolean(SYSTEM_RESOURCE_METRICS, true); + configuration.setString(REPORTERS_LIST, "test_reporter"); + configuration.setString("metrics.reporter.test_reporter.class", TestReporter.class.getName()); + return configuration; + } + + @Test + public void startTaskManagerAndCheckForRegisteredSystemMetrics() throws Exception { + assertEquals(1, TestReporter.OPENED_REPORTERS.size()); + TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next(); + + List<String> expectedPatterns = getExpectedPatterns(); + + Collection<String> gaugeNames = reporter.getGauges().values(); + + for (String 
expectedPattern : expectedPatterns) { + boolean found = false; + for (String gaugeName : gaugeNames) { + if (gaugeName.matches(expectedPattern)) { + found = true; + } + } + if (!found) { + fail(String.format("Failed to find gauge [%s] in registered gauges [%s]", expectedPattern, gaugeNames)); + } + } + } + + private static List<String> getExpectedPatterns() { + String[] expectedGauges = new String[] { + "System.CPU.Idle", + "System.CPU.Sys", + "System.CPU.User", + "System.CPU.IOWait", + "System.CPU.Irq", + "System.CPU.SoftIrq", + "System.CPU.Nice", + "System.Memory.Available", + "System.Memory.Total", + "System.Swap.Used", + "System.Swap.Total", + "System.Network.*ReceiveRate", + "System.Network.*SendRate" + }; + + String[] expectedHosts = new String[] { + "localhost.taskmanager.([a-f0-9\\\\-])*.", + "localhost.jobmanager." + }; + + List<String> patterns = new ArrayList<>(); + for (String expectedHost : expectedHosts) { + for (String expectedGauge : expectedGauges) { + patterns.add(expectedHost + expectedGauge); + } + } + return patterns; + } + + /** + * Test metric reporter that exposes registered metrics. + */ + public static final class TestReporter extends AbstractReporter { + public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet(); + + @Override + public String filterCharacters(String input) { + return input; + } + + @Override + public void open(MetricConfig config) { + OPENED_REPORTERS.add(this); + } + + @Override + public void close() { + OPENED_REPORTERS.remove(this); + } + + public Map<Gauge<?>, String> getGauges() { + return gauges; + } + } +} diff --git a/pom.xml b/pom.xml index 434f577..35b57da 100644 --- a/pom.xml +++ b/pom.xml @@ -299,8 +299,14 @@ under the License. 
<version>1.1.4</version> </dependency> + <dependency> + <groupId>com.github.oshi</groupId> + <artifactId>oshi-core</artifactId> + <version>3.4.0</version> + </dependency> + <!-- Make sure we use a consistent avro version between Flink and Hadoop --> - <dependency> + <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>${avro.version}</version>