This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd880fb69e51dd37f58614355be8a1a65d44540f
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed Oct 11 09:10:49 2017 +0200

    [FLINK-7812][metrics] Add system resources metrics
    
    This closes #4801.
---
 docs/_includes/generated/metric_configuration.html |  10 +
 docs/monitoring/metrics.md                         | 144 +++++++++++++
 .../flink/configuration/ConfigurationUtils.java    |  19 ++
 .../apache/flink/configuration/MetricOptions.java  |  14 ++
 flink-runtime/pom.xml                              |   7 +
 .../runtime/entrypoint/ClusterEntrypoint.java      |   5 +-
 .../flink/runtime/metrics/util/MetricUtils.java    |  16 +-
 .../metrics/util/SystemResourcesCounter.java       | 236 +++++++++++++++++++++
 .../util/SystemResourcesMetricsInitializer.java    | 101 +++++++++
 .../flink/runtime/minicluster/MiniCluster.java     |   6 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |   3 +-
 .../TaskManagerServicesConfiguration.java          |  19 +-
 .../flink/runtime/jobmanager/JobManager.scala      |   3 +-
 .../minicluster/LocalFlinkMiniCluster.scala        |   5 +-
 .../flink/runtime/taskmanager/TaskManager.scala    |   3 +-
 .../runtime/metrics/TaskManagerMetricsTest.java    |   3 +-
 .../metrics/utils/SystemResourcesCounterTest.java  |  71 +++++++
 .../taskexecutor/NetworkBufferCalculationTest.java |   4 +-
 flink-tests/pom.xml                                |   6 +
 .../metrics/SystemResourcesMetricsITCase.java      | 142 +++++++++++++
 pom.xml                                            |   8 +-
 21 files changed, 811 insertions(+), 14 deletions(-)

diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index aef8fbb..98054e9 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -67,5 +67,15 @@
             <td style="word-wrap: 
break-word;">"&lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;"</td>
             <td>Defines the scope format string that is applied to all metrics 
scoped to a job on a TaskManager.</td>
         </tr>
+        <tr>
+            <td><h5>metrics.system-resource</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>metrics.system-resource-probing-interval</h5></td>
+            <td style="word-wrap: break-word;">5000</td>
+            <td></td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 55f626e..554e1c5 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1396,6 +1396,150 @@ Thus, in order to infer the metric identifier:
   </tbody>
 </table>
 
+### System resources
+
+System resources reporting is disabled by default. When `metrics.system-resource`
+is enabled, the additional metrics listed below will be available on the Job- and TaskManager.
+System resources metrics are updated periodically and they present average 
values for a
+configured interval (`metrics.system-resource-probing-interval`).
+
+System resources reporting requires an optional dependency to be present on the
+classpath (for example placed in Flink's `lib` directory):
+
+  - `com.github.oshi:oshi-core:3.4.0` (licensed under EPL 1.0 license)
+
+Including its transitive dependencies:
+
+  - `net.java.dev.jna:jna-platform:jar:4.2.2`
+  - `net.java.dev.jna:jna:jar:4.2.2`
+
+If the dependency is missing, a warning caused by a `NoClassDefFoundError` is
+logged by `SystemResourcesMetricsInitializer` during startup.
+
+#### System CPU
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 25%">Infix</th>
+      <th class="text-left" style="width: 23%">Metrics</th>
+      <th class="text-left" style="width: 32%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="12"><strong>Job-/TaskManager</strong></th>
+      <td rowspan="12">System.CPU</td>
+      <td>Usage</td>
+      <td>Overall % of CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Idle</td>
+      <td>% of CPU Idle usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Sys</td>
+      <td>% of System CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>User</td>
+      <td>% of User CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>IOWait</td>
+      <td>% of IOWait CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Irq</td>
+      <td>% of Irq CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>SoftIrq</td>
+      <td>% of SoftIrq CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Nice</td>
+      <td>% of Nice CPU usage on the machine.</td>
+    </tr>
+    <tr>
+      <td>Load1min</td>
+      <td>Average CPU load over 1 minute</td>
+    </tr>
+    <tr>
+      <td>Load5min</td>
+      <td>Average CPU load over 5 minutes</td>
+    </tr>
+    <tr>
+      <td>Load15min</td>
+      <td>Average CPU load over 15 minutes</td>
+    </tr>
+    <tr>
+      <td>UsageCPU*</td>
+      <td>% of CPU usage for each processor</td>
+    </tr>
+  </tbody>
+</table>
+
+#### System memory
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 25%">Infix</th>
+      <th class="text-left" style="width: 23%">Metrics</th>
+      <th class="text-left" style="width: 32%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="4"><strong>Job-/TaskManager</strong></th>
+      <td rowspan="2">System.Memory</td>
+      <td>Available</td>
+      <td>Available memory in bytes</td>
+    </tr>
+    <tr>
+      <td>Total</td>
+      <td>Total memory in bytes</td>
+    </tr>
+    <tr>
+      <td rowspan="2">System.Swap</td>
+      <td>Used</td>
+      <td>Used swap bytes</td>
+    </tr>
+    <tr>
+      <td>Total</td>
+      <td>Total swap in bytes</td>
+    </tr>
+  </tbody>
+</table>
+
+#### System network
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 25%">Infix</th>
+      <th class="text-left" style="width: 23%">Metrics</th>
+      <th class="text-left" style="width: 32%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="2"><strong>Job-/TaskManager</strong></th>
+      <td rowspan="2">System.Network.INTERFACE_NAME</td>
+      <td>ReceiveRate</td>
+      <td>Average receive rate in bytes per second</td>
+    </tr>
+    <tr>
+      <td>SendRate</td>
+      <td>Average send rate in bytes per second</td>
+    </tr>
+  </tbody>
+</table>
+
 ## Latency tracking
 
 Flink allows to track the latency of records traveling through the system. To 
enable the latency tracking
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 1b30821..7b717bd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -18,12 +18,18 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.api.common.time.Time;
+
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
+import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
+
 /**
  * Utility class for {@link Configuration} related helper functions.
  */
@@ -70,6 +76,19 @@ public class ConfigurationUtils {
        }
 
        /**
+        * Reads the probing interval of the system resource metrics from the given configuration.
+        *
+        * @param configuration configuration to read the options from
+        * @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or
+        * {@code Optional.empty()} if {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled.
+        */
+       public static Optional<Time> getSystemResourceMetricsProbingInterval(Configuration configuration) {
+               if (configuration.getBoolean(SYSTEM_RESOURCE_METRICS)) {
+                       // The interval option is configured in milliseconds.
+                       final long intervalMillis = configuration.getLong(SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL);
+                       return Optional.of(Time.milliseconds(intervalMillis));
+               }
+               return Optional.empty();
+       }
+
+       /**
         * Extracts the task manager directories for temporary files as defined 
by
         * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
         *
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 3b11645..f9fd024 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -110,6 +110,20 @@ public class MetricOptions {
                        .defaultValue(128)
                        .withDescription("Defines the number of measured 
latencies to maintain at each operator.");
 
+       /**
+        * Whether Flink should report system resource metrics such as machine's CPU, memory or network usage.
+        */
+       public static final ConfigOption<Boolean> SYSTEM_RESOURCE_METRICS =
+               key("metrics.system-resource")
+                       .defaultValue(false)
+                       .withDescription("Flag indicating whether Flink should report system resource metrics such as" +
+                               " machine's CPU, memory or network usage.");
+
+       /**
+        * Interval between probing of system resource metrics specified in milliseconds. Has an effect only when
+        * {@link #SYSTEM_RESOURCE_METRICS} is enabled.
+        */
+       public static final ConfigOption<Long> SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL =
+               key("metrics.system-resource-probing-interval")
+                       .defaultValue(5000L)
+                       .withDescription("Interval between probing of system resource metrics specified in milliseconds." +
+                               " Has an effect only when 'metrics.system-resource' is enabled.");
+
        private MetricOptions() {
        }
 }
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index e163881..bc4a3cb 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -306,6 +306,13 @@ under the License.
                        <groupId>org.reflections</groupId>
                        <artifactId>reflections</artifactId>
                </dependency>
+
+               <!-- Used only for the optional system resource metrics. Optional because of unclear EPL 1.0 license compatibility. -->
+               <dependency>
+                       <groupId>com.github.oshi</groupId>
+                       <artifactId>oshi-core</artifactId>
+                       <optional>true</optional>
+               </dependency>
        </dependencies>
 
        <!-- Dependency Management to converge transitive dependency versions 
-->
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index b429de5..ddd3751c 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -346,7 +346,10 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                clusterInformation,
                                webMonitorEndpoint.getRestBaseUrl());
 
-                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, 
rpcService.getAddress());
+                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(
+                               metricRegistry,
+                               rpcService.getAddress(),
+                               
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
                        final HistoryServerArchivist historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
webMonitorEndpoint);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 367979e..b150631 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.metrics.util;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -45,6 +46,9 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 import java.lang.management.ThreadMXBean;
 import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;
 
 /**
  * Utility class to register pre-defined metric sets.
@@ -58,7 +62,8 @@ public class MetricUtils {
 
        public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
                        final MetricRegistry metricRegistry,
-                       final String hostname) {
+                       final String hostname,
+                       final Optional<Time> systemResourceProbeInterval) {
                final JobManagerMetricGroup jobManagerMetricGroup = new 
JobManagerMetricGroup(
                        metricRegistry,
                        hostname);
@@ -68,13 +73,17 @@ public class MetricUtils {
                // initialize the JM metrics
                instantiateStatusMetrics(statusGroup);
 
+               if (systemResourceProbeInterval.isPresent()) {
+                       instantiateSystemMetrics(jobManagerMetricGroup, 
systemResourceProbeInterval.get());
+               }
                return jobManagerMetricGroup;
        }
 
        public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
                        MetricRegistry metricRegistry,
                        TaskManagerLocation taskManagerLocation,
-                       NetworkEnvironment network) {
+                       NetworkEnvironment network,
+                       Optional<Time> systemResourceProbeInterval) {
                final TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(
                        metricRegistry,
                        taskManagerLocation.getHostname(),
@@ -89,6 +98,9 @@ public class MetricUtils {
                        .addGroup("Network");
                instantiateNetworkMetrics(networkGroup, network);
 
+               if (systemResourceProbeInterval.isPresent()) {
+                       instantiateSystemMetrics(taskManagerMetricGroup, 
systemResourceProbeInterval.get());
+               }
                return taskManagerMetricGroup;
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java
new file mode 100644
index 0000000..7365617
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesCounter.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import oshi.SystemInfo;
+import oshi.hardware.CentralProcessor;
+import oshi.hardware.CentralProcessor.TickType;
+import oshi.hardware.HardwareAbstractionLayer;
+import oshi.hardware.NetworkIF;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Daemon thread probing system resources.
+ *
+ * <p>To accurately and consistently report CPU and network usage we have to 
periodically probe
+ * CPU ticks and network sent/received bytes and then convert those values to 
CPU usage and
+ * send/receive byte rates.
+ */
+@ThreadSafe
+public class SystemResourcesCounter extends Thread {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SystemResourcesCounter.class);
+
+       private final long probeIntervalMs;
+       private final SystemInfo systemInfo = new SystemInfo();
+       private final HardwareAbstractionLayer hardwareAbstractionLayer = 
systemInfo.getHardware();
+
+       private volatile boolean running = true;
+
+       private long[] previousCpuTicks;
+       private long[] bytesReceivedPerInterface;
+       private long[] bytesSentPerInterface;
+
+       private volatile double cpuUser;
+       private volatile double cpuNice;
+       private volatile double cpuSys;
+       private volatile double cpuIdle;
+       private volatile double cpuIOWait;
+       private volatile double cpuIrq;
+       private volatile double cpuSoftIrq;
+       private volatile double cpuUsage;
+
+       private volatile double cpuLoad1;
+       private volatile double cpuLoad5;
+       private volatile double cpuLoad15;
+
+       private AtomicReferenceArray<Double> cpuUsagePerProcessor;
+
+       private final String[] networkInterfaceNames;
+
+       private AtomicLongArray receiveRatePerInterface;
+       private AtomicLongArray sendRatePerInterface;
+
+       public SystemResourcesCounter(Time probeInterval) {
+               probeIntervalMs = probeInterval.toMilliseconds();
+               checkState(this.probeIntervalMs > 0);
+
+               setName(SystemResourcesCounter.class.getSimpleName() + " 
probing thread");
+
+               cpuUsagePerProcessor = new 
AtomicReferenceArray<>(hardwareAbstractionLayer.getProcessor().getLogicalProcessorCount());
+
+               NetworkIF[] networkIFs = 
hardwareAbstractionLayer.getNetworkIFs();
+               bytesReceivedPerInterface = new long[networkIFs.length];
+               bytesSentPerInterface = new long[networkIFs.length];
+               receiveRatePerInterface = new 
AtomicLongArray(networkIFs.length);
+               sendRatePerInterface = new AtomicLongArray(networkIFs.length);
+               networkInterfaceNames = new String[networkIFs.length];
+
+               for (int i = 0; i < networkInterfaceNames.length; i++) {
+                       networkInterfaceNames[i] = networkIFs[i].getName();
+               }
+       }
+
+       @Override
+       public void run() {
+               try {
+                       while (running) {
+                               
calculateCPUUsage(hardwareAbstractionLayer.getProcessor());
+                               
calculateNetworkUsage(hardwareAbstractionLayer.getNetworkIFs());
+                               Thread.sleep(probeIntervalMs);
+                       }
+               } catch (InterruptedException e) {
+                       if (running) {
+                               LOG.warn("{} has failed", 
SystemResourcesCounter.class.getSimpleName(), e);
+                       }
+               }
+       }
+
+       public void shutdown() throws InterruptedException {
+               running = false;
+               interrupt();
+               join();
+       }
+
+       public double getCpuUser() {
+               return cpuUser;
+       }
+
+       public double getCpuNice() {
+               return cpuNice;
+       }
+
+       public double getCpuSys() {
+               return cpuSys;
+       }
+
+       public double getCpuIdle() {
+               return cpuIdle;
+       }
+
+       public double getIOWait() {
+               return cpuIOWait;
+       }
+
+       public double getCpuIrq() {
+               return cpuIrq;
+       }
+
+       public double getCpuSoftIrq() {
+               return cpuSoftIrq;
+       }
+
+       public double getCpuUsage() {
+               return cpuUsage;
+       }
+
+       public double getCpuLoad1() {
+               return cpuLoad1;
+       }
+
+       public double getCpuLoad5() {
+               return cpuLoad5;
+       }
+
+       public double getCpuLoad15() {
+               return cpuLoad15;
+       }
+
+       public int getProcessorsCount() {
+               return cpuUsagePerProcessor.length();
+       }
+
+       public double getCpuUsagePerProcessor(int processor) {
+               return cpuUsagePerProcessor.get(processor);
+       }
+
+       public String[] getNetworkInterfaceNames() {
+               return networkInterfaceNames;
+       }
+
+       public long getReceiveRatePerInterface(int interfaceNo) {
+               return receiveRatePerInterface.get(interfaceNo);
+       }
+
+       public long getSendRatePerInterface(int interfaceNo) {
+               return sendRatePerInterface.get(interfaceNo);
+       }
+
+       private void calculateCPUUsage(CentralProcessor processor) {
+               long[] ticks = processor.getSystemCpuLoadTicks();
+               if (this.previousCpuTicks == null) {
+                       this.previousCpuTicks = ticks;
+               }
+
+               long userTicks = ticks[TickType.USER.getIndex()] - 
previousCpuTicks[TickType.USER.getIndex()];
+               long niceTicks = ticks[TickType.NICE.getIndex()] - 
previousCpuTicks[TickType.NICE.getIndex()];
+               long sysTicks = ticks[TickType.SYSTEM.getIndex()] - 
previousCpuTicks[TickType.SYSTEM.getIndex()];
+               long idleTicks = ticks[TickType.IDLE.getIndex()] - 
previousCpuTicks[TickType.IDLE.getIndex()];
+               long iowaitTicks = ticks[TickType.IOWAIT.getIndex()] - 
previousCpuTicks[TickType.IOWAIT.getIndex()];
+               long irqTicks = ticks[TickType.IRQ.getIndex()] - 
previousCpuTicks[TickType.IRQ.getIndex()];
+               long softIrqTicks = ticks[TickType.SOFTIRQ.getIndex()] - 
previousCpuTicks[TickType.SOFTIRQ.getIndex()];
+               long totalCpuTicks = userTicks + niceTicks + sysTicks + 
idleTicks + iowaitTicks + irqTicks + softIrqTicks;
+               this.previousCpuTicks = ticks;
+
+               cpuUser = 100d * userTicks / totalCpuTicks;
+               cpuNice = 100d * niceTicks / totalCpuTicks;
+               cpuSys = 100d * sysTicks / totalCpuTicks;
+               cpuIdle = 100d * idleTicks / totalCpuTicks;
+               cpuIOWait = 100d * iowaitTicks / totalCpuTicks;
+               cpuIrq = 100d * irqTicks / totalCpuTicks;
+               cpuSoftIrq = 100d * softIrqTicks / totalCpuTicks;
+
+               cpuUsage = processor.getSystemCpuLoad() * 100;
+
+               double[] loadAverage = processor.getSystemLoadAverage(3);
+               cpuLoad1 = (loadAverage[0] < 0 ? Double.NaN : loadAverage[0]);
+               cpuLoad5 = (loadAverage[1] < 0 ? Double.NaN : loadAverage[1]);
+               cpuLoad15 = (loadAverage[2] < 0 ? Double.NaN : loadAverage[2]);
+
+               double[] load = processor.getProcessorCpuLoadBetweenTicks();
+               checkState(load.length == cpuUsagePerProcessor.length());
+               for (int i = 0; i < load.length; i++) {
+                       cpuUsagePerProcessor.set(i, load[i] * 100);
+               }
+       }
+
+       private void calculateNetworkUsage(NetworkIF[] networkIFs) {
+               checkState(networkIFs.length == 
receiveRatePerInterface.length());
+
+               for (int i = 0; i < networkIFs.length; i++) {
+                       NetworkIF networkIF = networkIFs[i];
+                       networkIF.updateNetworkStats();
+
+                       receiveRatePerInterface.set(i, 
(networkIF.getBytesRecv() - bytesReceivedPerInterface[i]) * 1000 / 
probeIntervalMs);
+                       sendRatePerInterface.set(i, (networkIF.getBytesSent() - 
bytesSentPerInterface[i]) * 1000 / probeIntervalMs);
+
+                       bytesReceivedPerInterface[i] = networkIF.getBytesRecv();
+                       bytesSentPerInterface[i] = networkIF.getBytesSent();
+               }
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java
new file mode 100644
index 0000000..01a9347
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/SystemResourcesMetricsInitializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import oshi.SystemInfo;
+import oshi.hardware.GlobalMemory;
+import oshi.hardware.HardwareAbstractionLayer;
+
+/**
+ * Utility class to initialize system resource metrics.
+ */
+public class SystemResourcesMetricsInitializer {
+       private static final Logger LOG = LoggerFactory.getLogger(SystemResourcesMetricsInitializer.class);
+
+       /**
+        * Starts a {@link SystemResourcesCounter} and registers memory, swap, CPU and network gauges
+        * in a "System" sub group of the given metric group.
+        *
+        * <p>If the classes needed for probing cannot be loaded, a warning is logged and the method
+        * returns normally.
+        *
+        * @param metricGroup group under which the "System" group is created
+        * @param probeInterval interval between two probes of the started counter
+        */
+       public static void instantiateSystemMetrics(MetricGroup metricGroup, Time probeInterval) {
+               try {
+                       MetricGroup system = metricGroup.addGroup("System");
+
+                       SystemResourcesCounter systemResourcesCounter = new SystemResourcesCounter(probeInterval);
+                       systemResourcesCounter.start();
+
+                       SystemInfo systemInfo = new SystemInfo();
+                       HardwareAbstractionLayer hardwareAbstractionLayer = systemInfo.getHardware();
+                       GlobalMemory memory = hardwareAbstractionLayer.getMemory();
+
+                       instantiateMemoryMetrics(system.addGroup("Memory"), memory);
+                       instantiateSwapMetrics(system.addGroup("Swap"), memory);
+                       instantiateCPUMetrics(system.addGroup("CPU"), systemResourcesCounter);
+                       instantiateNetworkMetrics(system.addGroup("Network"), systemResourcesCounter);
+               } catch (NoClassDefFoundError ex) {
+                       LOG.warn(
+                               "Failed to initialize system resource metrics because of missing class definitions." +
+                               " Did you forget to explicitly add the oshi-core optional dependency?",
+                               ex);
+               }
+       }
+
+       /** Registers gauges for available and total memory. */
+       private static void instantiateMemoryMetrics(MetricGroup metrics, GlobalMemory memory) {
+               metrics.<Long, Gauge<Long>>gauge("Available", memory::getAvailable);
+               metrics.<Long, Gauge<Long>>gauge("Total", memory::getTotal);
+       }
+
+       /** Registers gauges for used and total swap. */
+       private static void instantiateSwapMetrics(MetricGroup metrics, GlobalMemory memory) {
+               metrics.<Long, Gauge<Long>>gauge("Used", memory::getSwapUsed);
+               metrics.<Long, Gauge<Long>>gauge("Total", memory::getSwapTotal);
+       }
+
+       /** Registers the overall CPU gauges, the load averages and one usage gauge per processor. */
+       private static void instantiateCPUMetrics(MetricGroup metrics, SystemResourcesCounter usageCounter) {
+               metrics.<Double, Gauge<Double>>gauge("Usage", usageCounter::getCpuUsage);
+               metrics.<Double, Gauge<Double>>gauge("Idle", usageCounter::getCpuIdle);
+               metrics.<Double, Gauge<Double>>gauge("Sys", usageCounter::getCpuSys);
+               metrics.<Double, Gauge<Double>>gauge("User", usageCounter::getCpuUser);
+               metrics.<Double, Gauge<Double>>gauge("IOWait", usageCounter::getIOWait);
+               metrics.<Double, Gauge<Double>>gauge("Nice", usageCounter::getCpuNice);
+               metrics.<Double, Gauge<Double>>gauge("Irq", usageCounter::getCpuIrq);
+               metrics.<Double, Gauge<Double>>gauge("SoftIrq", usageCounter::getCpuSoftIrq);
+
+               metrics.<Double, Gauge<Double>>gauge("Load1min", usageCounter::getCpuLoad1);
+               metrics.<Double, Gauge<Double>>gauge("Load5min", usageCounter::getCpuLoad5);
+               metrics.<Double, Gauge<Double>>gauge("Load15min", usageCounter::getCpuLoad15);
+
+               for (int i = 0; i < usageCounter.getProcessorsCount(); i++) {
+                       final int processor = i;
+                       // Plain concatenation keeps the metric name independent of the default locale;
+                       // String.format("%d") may emit locale specific digits.
+                       metrics.<Double, Gauge<Double>>gauge(
+                               "UsageCPU" + processor,
+                               () -> usageCounter.getCpuUsagePerProcessor(processor));
+               }
+       }
+
+       /** Registers a receive and a send rate gauge in a sub group named after each network interface. */
+       private static void instantiateNetworkMetrics(MetricGroup metrics, SystemResourcesCounter usageCounter) {
+               final String[] interfaceNames = usageCounter.getNetworkInterfaceNames();
+               for (int i = 0; i < interfaceNames.length; i++) {
+                       MetricGroup interfaceGroup = metrics.addGroup(interfaceNames[i]);
+
+                       final int interfaceNo = i;
+                       interfaceGroup.<Long, Gauge<Long>>gauge("ReceiveRate", () -> usageCounter.getReceiveRatePerInterface(interfaceNo));
+                       interfaceGroup.<Long, Gauge<Long>>gauge("SendRate", () -> usageCounter.getSendRatePerInterface(interfaceNo));
+               }
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 97ab5a5..8054a38 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
@@ -354,7 +355,10 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
                                LOG.info("Starting job dispatcher(s) for 
JobManger");
 
-                               this.jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
+                               this.jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(
+                                       metricRegistry,
+                                       "localhost",
+                                       
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
                                final HistoryServerArchivist 
historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
dispatcherRestEndpoint);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index c1f0cc8..9ab7f80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -367,7 +367,8 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                TaskManagerMetricGroup taskManagerMetricGroup = 
MetricUtils.instantiateTaskManagerMetricGroup(
                        metricRegistry,
                        taskManagerServices.getTaskManagerLocation(),
-                       taskManagerServices.getNetworkEnvironment());
+                       taskManagerServices.getNetworkEnvironment(),
+                       
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 86bc46d..eec39ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.Optional;
 
 import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 import static org.apache.flink.util.MathUtils.checkedDownCast;
@@ -83,6 +85,10 @@ public class TaskManagerServicesConfiguration {
 
        private final boolean localRecoveryEnabled;
 
+       private boolean systemResourceMetricsEnabled;
+
+       private Optional<Time> systemResourceMetricsProbingInterval;
+
        public TaskManagerServicesConfiguration(
                        InetAddress taskManagerAddress,
                        String[] tmpDirPaths,
@@ -95,7 +101,8 @@ public class TaskManagerServicesConfiguration {
                        MemoryType memoryType,
                        boolean preAllocateMemory,
                        float memoryFraction,
-                       long timerServiceShutdownTimeout) {
+                       long timerServiceShutdownTimeout,
+                       Optional<Time> systemResourceMetricsProbingInterval) {
 
                this.taskManagerAddress = checkNotNull(taskManagerAddress);
                this.tmpDirPaths = checkNotNull(tmpDirPaths);
@@ -113,6 +120,9 @@ public class TaskManagerServicesConfiguration {
                checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
                        "service shutdown timeout must be greater or equal to 0.");
                this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
+
+               // An absent interval must be passed as Optional.empty(); null is rejected here.
+               this.systemResourceMetricsProbingInterval = checkNotNull(systemResourceMetricsProbingInterval);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -179,6 +189,10 @@ public class TaskManagerServicesConfiguration {
                return timerServiceShutdownTimeout;
        }
 
+       public Optional<Time> getSystemResourceMetricsProbingInterval() {
+               return systemResourceMetricsProbingInterval;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Parsing of Flink configuration
        // 
--------------------------------------------------------------------------------------------
@@ -276,7 +290,8 @@ public class TaskManagerServicesConfiguration {
                        memType,
                        preAllocateMemory,
                        memoryFraction,
-                       timerServiceShutdownTimeout);
+                       timerServiceShutdownTimeout,
+                       
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
        }
 
        // 
--------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a8f492..0855991 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2519,7 +2519,8 @@ object JobManager {
 
     val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
       metricRegistry,
-      configuration.getString(JobManagerOptions.ADDRESS))
+      configuration.getString(JobManagerOptions.ADDRESS),
+      
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration))
 
     (instanceManager,
       scheduler,
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index fb57861..6b9e4a8 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -47,7 +47,7 @@ import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, 
StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, 
TaskManagerMetricGroup}
-import org.apache.flink.runtime.metrics.util.MetricUtils
+import org.apache.flink.runtime.metrics.util.{MetricUtils, 
SystemResourcesMetricsInitializer}
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, 
TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
@@ -246,7 +246,8 @@ class LocalFlinkMiniCluster(
     val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
       metricRegistryOpt.get,
       taskManagerServices.getTaskManagerLocation(),
-      taskManagerServices.getNetworkEnvironment())
+      taskManagerServices.getNetworkEnvironment(),
+      taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval)
 
     val props = getTaskManagerProps(
       taskManagerClass,
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 62fe862..1de4848 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2019,7 +2019,8 @@ object TaskManager {
     val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
       metricRegistry,
       taskManagerServices.getTaskManagerLocation(),
-      taskManagerServices.getNetworkEnvironment())
+      taskManagerServices.getNetworkEnvironment(),
+      taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval)
 
     // create the actor properties (which define the actor constructor 
parameters)
     val tmProps = getTaskManagerProps(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index db04023..4d18060 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -106,7 +106,8 @@ public class TaskManagerMetricsTest extends TestLogger {
                        TaskManagerMetricGroup taskManagerMetricGroup = 
MetricUtils.instantiateTaskManagerMetricGroup(
                                metricRegistry,
                                taskManagerServices.getTaskManagerLocation(),
-                               taskManagerServices.getNetworkEnvironment());
+                               taskManagerServices.getNetworkEnvironment(),
+                               
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
                        // create the task manager
                        final Props tmProps = TaskManager.getTaskManagerProps(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java
new file mode 100644
index 0000000..f6c228d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/utils/SystemResourcesCounterTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.utils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.util.SystemResourcesCounter;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Sanity checks for the readings exposed by {@link SystemResourcesCounter}.
+ */
+public class SystemResourcesCounterTest {
+
+       private static final double EPSILON = 0.01;
+
+       @Test
+       public void testObtainAnyMetrics() throws InterruptedException {
+               SystemResourcesCounter systemResources = new SystemResourcesCounter(Time.milliseconds(10));
+               final double idleBeforeStart = systemResources.getCpuIdle();
+
+               systemResources.start();
+               try {
+                       // Poll until the idle reading differs from the pre-start value
+                       // and is neither NaN nor zero.
+                       while (true) {
+                               Thread.sleep(1);
+                               final double idleNow = systemResources.getCpuIdle();
+                               if (idleNow != idleBeforeStart && !Double.isNaN(idleNow) && idleNow != 0.0) {
+                                       break;
+                               }
+                       }
+               }
+               finally {
+                       systemResources.shutdown();
+                       systemResources.join();
+               }
+
+               // Read the non-idle shares only after the counter has been stopped and joined.
+               final double irq = systemResources.getCpuIrq();
+               final double nice = systemResources.getCpuNice();
+               final double softIrq = systemResources.getCpuSoftIrq();
+               final double sys = systemResources.getCpuSys();
+               final double user = systemResources.getCpuUser();
+               final double ioWait = systemResources.getIOWait();
+               final double totalCpuUsage = irq + nice + softIrq + sys + user + ioWait;
+
+               assertTrue(
+                       "There should be at least one processor",
+                       systemResources.getProcessorsCount() > 0);
+               assertTrue(
+                       "There should be at least one network interface",
+                       systemResources.getNetworkInterfaceNames().length > 0);
+               // Busy and idle shares together are expected to add up to 100.
+               assertEquals(100.0, totalCpuUsage + systemResources.getCpuIdle(), EPSILON);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 6374cf8..586f937 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.Optional;
 
 import static org.apache.flink.util.MathUtils.checkedDownCast;
 import static org.junit.Assert.assertEquals;
@@ -108,6 +109,7 @@ public class NetworkBufferCalculationTest extends 
TestLogger {
                        memType,
                        false,
                        managedMemoryFraction,
-                       0);
+                       0,
+                       Optional.empty());
        }
 }
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index f9746e1..80cf8de 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -231,6 +231,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>com.github.oshi</groupId>
+                       <artifactId>oshi-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- utility to scan classpaths -->
                <dependency>
                        <groupId>org.reflections</groupId>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
new file mode 100644
index 0000000..6d9a7b0
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test checking that, with system resource metrics switched on, the
+ * expected gauges are registered under both the JobManager and the TaskManager scope.
+ */
+public class SystemResourcesMetricsITCase {
+
+       @ClassRule
+       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+               new MiniClusterResourceConfiguration.Builder()
+                       .setConfiguration(getConfiguration())
+                       .setNumberTaskManagers(1)
+                       .setNumberSlotsPerTaskManager(1)
+                       .build());
+
+       /**
+        * Builds the cluster configuration: system resource metrics enabled and
+        * {@link TestReporter} registered as the only reporter.
+        */
+       private static Configuration getConfiguration() {
+               final Configuration config = new Configuration();
+               config.setString("metrics.reporter.test_reporter.class", TestReporter.class.getName());
+               config.setString(REPORTERS_LIST, "test_reporter");
+               config.setBoolean(SYSTEM_RESOURCE_METRICS, true);
+               return config;
+       }
+
+       @Test
+       public void startTaskManagerAndCheckForRegisteredSystemMetrics() throws Exception {
+               assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+               final TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next();
+               final Collection<String> gaugeNames = reporter.getGauges().values();
+
+               // Every expected pattern must match at least one registered gauge name.
+               for (String expectedPattern : getExpectedPatterns()) {
+                       if (gaugeNames.stream().noneMatch(gaugeName -> gaugeName.matches(expectedPattern))) {
+                               fail(String.format("Failed to find gauge [%s] in registered gauges [%s]", expectedPattern, gaugeNames));
+                       }
+               }
+       }
+
+       /**
+        * Returns one regular expression per combination of host prefix and gauge name,
+        * grouped by host prefix.
+        */
+       private static List<String> getExpectedPatterns() {
+               final String[] hostPrefixes = {
+                       "localhost.taskmanager.([a-f0-9\\\\-])*.",
+                       "localhost.jobmanager."
+               };
+
+               final String[] gaugeSuffixes = {
+                       "System.CPU.Idle",
+                       "System.CPU.Sys",
+                       "System.CPU.User",
+                       "System.CPU.IOWait",
+                       "System.CPU.Irq",
+                       "System.CPU.SoftIrq",
+                       "System.CPU.Nice",
+                       "System.Memory.Available",
+                       "System.Memory.Total",
+                       "System.Swap.Used",
+                       "System.Swap.Total",
+                       "System.Network.*ReceiveRate",
+                       "System.Network.*SendRate"
+               };
+
+               final List<String> patterns = new ArrayList<>();
+               for (String hostPrefix : hostPrefixes) {
+                       for (String gaugeSuffix : gaugeSuffixes) {
+                               patterns.add(hostPrefix + gaugeSuffix);
+                       }
+               }
+               return patterns;
+       }
+
+       /**
+        * Reporter that tracks itself in {@link #OPENED_REPORTERS} while open and
+        * exposes the gauges registered with it.
+        */
+       public static final class TestReporter extends AbstractReporter {
+               public static final Set<TestReporter> OPENED_REPORTERS = ConcurrentHashMap.newKeySet();
+
+               @Override
+               public void open(MetricConfig config) {
+                       OPENED_REPORTERS.add(this);
+               }
+
+               @Override
+               public void close() {
+                       OPENED_REPORTERS.remove(this);
+               }
+
+               @Override
+               public String filterCharacters(String input) {
+                       // Names are kept as-is so the test can match them against the patterns.
+                       return input;
+               }
+
+               public Map<Gauge<?>, String> getGauges() {
+                       return gauges;
+               }
+       }
+}
diff --git a/pom.xml b/pom.xml
index 434f577..35b57da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -299,8 +299,14 @@ under the License.
                                <version>1.1.4</version>
                        </dependency>
 
+                       <dependency>
+                               <groupId>com.github.oshi</groupId>
+                               <artifactId>oshi-core</artifactId>
+                               <version>3.4.0</version>
+                       </dependency>
+
                        <!-- Make sure we use a consistent avro version between 
Flink and Hadoop -->            
-                       <dependency>
+                       <dependency>
                                <groupId>org.apache.avro</groupId>
                                <artifactId>avro</artifactId>
                                <version>${avro.version}</version>

Reply via email to