This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ed58c34e9859e735c3fb542a8ebc577655d9aa46 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Tue Oct 10 17:24:21 2017 +0200 [hotfix][metrics] Replace anonymous classes with lambdas --- .../flink/runtime/metrics/util/MetricUtils.java | 115 ++++----------------- 1 file changed, 20 insertions(+), 95 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 3fd268a..367979e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.metrics.util; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -41,7 +42,7 @@ import javax.management.ReflectionException; import java.lang.management.ClassLoadingMXBean; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.lang.management.ThreadMXBean; import java.util.List; @@ -105,37 +106,16 @@ public class MetricUtils { private static void instantiateNetworkMetrics( MetricGroup metrics, final NetworkEnvironment network) { - metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments(); - } - }); - metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () { - @Override - public Long getValue() { - return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments(); - } - }); + final NetworkBufferPool networkBufferPool = network.getNetworkBufferPool(); + metrics.<Integer, Gauge<Integer>>gauge("TotalMemorySegments", networkBufferPool::getTotalNumberOfMemorySegments); + metrics.<Integer, Gauge<Integer>>gauge("AvailableMemorySegments", networkBufferPool::getNumberOfAvailableMemorySegments); } private static void instantiateClassLoaderMetrics(MetricGroup metrics) { final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean(); - - metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getTotalLoadedClassCount(); - } - }); - - metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getUnloadedClassCount(); - } - }); + metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", mxBean::getTotalLoadedClassCount); + metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", mxBean::getUnloadedClassCount); } private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) { @@ -144,66 +124,26 @@ public class MetricUtils { for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) { MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName()); - gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () { - @Override - public Long getValue() { - return garbageCollector.getCollectionCount(); - } - }); - - gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () { - @Override - public Long getValue() { - return garbageCollector.getCollectionTime(); - } - }); + gcGroup.<Long, Gauge<Long>>gauge("Count", garbageCollector::getCollectionCount); + gcGroup.<Long, Gauge<Long>>gauge("Time", garbageCollector::getCollectionTime); } } private static void instantiateMemoryMetrics(MetricGroup metrics) { - final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + final MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + final MemoryUsage nonHeapMemoryUsage = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(); MetricGroup heap = metrics.addGroup("Heap"); - heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getUsed(); - } - }); - heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getCommitted(); - } - }); - heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getHeapMemoryUsage().getMax(); - } - }); + heap.<Long, Gauge<Long>>gauge("Used", heapMemoryUsage::getUsed); + heap.<Long, Gauge<Long>>gauge("Committed", heapMemoryUsage::getCommitted); + heap.<Long, Gauge<Long>>gauge("Max", heapMemoryUsage::getMax); MetricGroup nonHeap = metrics.addGroup("NonHeap"); - nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getUsed(); - } - }); - nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getCommitted(); - } - }); - nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getNonHeapMemoryUsage().getMax(); - } - }); + nonHeap.<Long, Gauge<Long>>gauge("Used", nonHeapMemoryUsage::getUsed); + nonHeap.<Long, Gauge<Long>>gauge("Committed", nonHeapMemoryUsage::getCommitted); + nonHeap.<Long, Gauge<Long>>gauge("Max", nonHeapMemoryUsage::getMax); final MBeanServer con = ManagementFactory.getPlatformMBeanServer(); @@ -239,30 +179,15 @@ public class MetricUtils { private static void instantiateThreadMetrics(MetricGroup metrics) { final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); - metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () { - @Override - public Integer getValue() { - return mxBean.getThreadCount(); - } - }); + metrics.<Integer, Gauge<Integer>>gauge("Count", mxBean::getThreadCount); } private static void instantiateCPUMetrics(MetricGroup metrics) { try { final com.sun.management.OperatingSystemMXBean mxBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); - metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () { - @Override - public Double getValue() { - return mxBean.getProcessCpuLoad(); - } - }); - metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () { - @Override - public Long getValue() { - return mxBean.getProcessCpuTime(); - } - }); + metrics.<Double, Gauge<Double>>gauge("Load", mxBean::getProcessCpuLoad); + metrics.<Long, Gauge<Long>>gauge("Time", mxBean::getProcessCpuTime); } catch (Exception e) { LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + " - CPU load metrics will not be available.", e);