[FLINK-7876] Merge TaskExecutorMetricsInitializer and MetricUtils

This commit removes the TaskExecutorMetricsInitializer and moves its methods
to MetricUtils.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb7e0b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb7e0b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb7e0b9

Branch: refs/heads/master
Commit: 7fb7e0b9775d1773d20e63732130ae140781a6f2
Parents: ad42ee2
Author: Till Rohrmann <[email protected]>
Authored: Wed Nov 1 12:31:52 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 1 15:52:01 2017 +0100

----------------------------------------------------------------------
 .../runtime/jobmaster/JobManagerRunner.java     |   3 +-
 .../flink/runtime/metrics/util/MetricUtils.java | 230 +++++++++--------
 .../utils/TaskExecutorMetricsInitializer.java   | 257 -------------------
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 4 files changed, 128 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 14baa6f..f95b5a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
@@ -127,7 +128,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        checkArgument(jobGraph.getNumberOfVertices() > 0, "The 
given job is empty");
 
                        final String hostAddress = 
rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
-                       jobManagerMetrics = new 
JobManagerMetricGroup(metricRegistry, hostAddress);
+                       jobManagerMetrics = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostAddress);
                        this.jobManagerMetricGroup = jobManagerMetrics;
 
                        // libraries and class loader first

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 08353e3..2a59a7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -22,23 +22,27 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.lang3.text.WordUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.management.BufferPoolMXBean;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
 import java.lang.management.ClassLoadingMXBean;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
-import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.List;
 
 /**
@@ -51,6 +55,21 @@ public class MetricUtils {
        private MetricUtils() {
        }
 
+       public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
+                       final MetricRegistry metricRegistry,
+                       final String hostname) {
+               final JobManagerMetricGroup jobManagerMetricGroup = new 
JobManagerMetricGroup(
+                       metricRegistry,
+                       hostname);
+
+               MetricGroup statusGroup = 
jobManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
+               // initialize the JM metrics
+               instantiateStatusMetrics(statusGroup);
+
+               return jobManagerMetricGroup;
+       }
+
        public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
                        MetricRegistry metricRegistry,
                        TaskManagerLocation taskManagerLocation,
@@ -60,59 +79,55 @@ public class MetricUtils {
                        taskManagerLocation.getHostname(),
                        taskManagerLocation.getResourceID().toString());
 
+               MetricGroup statusGroup = 
taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
+
                // Initialize the TM metrics
-               
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, 
network);
+               instantiateStatusMetrics(statusGroup);
+               instantiateNetworkMetrics(statusGroup, network);
 
                return taskManagerMetricGroup;
        }
 
-       public static void instantiateNetworkMetrics(
-               MetricGroup metrics,
-               final NetworkEnvironment network) {
-               MetricGroup status = metrics.addGroup(METRIC_GROUP_STATUS_NAME);
+       public static void instantiateStatusMetrics(
+                       MetricGroup metricGroup) {
+               MetricGroup jvm = metricGroup.addGroup("JVM");
 
-               MetricGroup networkGroup = status
-                       .addGroup("Network");
+               instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+               
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+               instantiateMemoryMetrics(jvm.addGroup("Memory"));
+               instantiateThreadMetrics(jvm.addGroup("Threads"));
+               instantiateCPUMetrics(jvm.addGroup("CPU"));
+       }
 
-               networkGroup.gauge("TotalMemorySegments", new Gauge<Integer>() {
+       private static void instantiateNetworkMetrics(
+               MetricGroup metrics,
+               final NetworkEnvironment network) {
+               metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new 
Gauge<Long> () {
                        @Override
-                       public Integer getValue() {
-                               return 
network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+                       public Long getValue() {
+                               return (long) 
network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
                        }
                });
-               networkGroup.gauge("AvailableMemorySegments", new 
Gauge<Integer>() {
+
+               metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new 
Gauge<Long> () {
                        @Override
-                       public Integer getValue() {
-                               return 
network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+                       public Long getValue() {
+                               return (long) 
network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
                        }
                });
        }
 
-       public static void instantiateStatusMetrics(
-               MetricGroup metrics) {
-               MetricGroup status = metrics
-                       .addGroup(METRIC_GROUP_STATUS_NAME);
-
-               MetricGroup jvm = status
-                       .addGroup("JVM");
-
-               instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
-               
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
-               instantiateMemoryMetrics(jvm.addGroup("Memory"));
-               instantiateThreadMetrics(jvm.addGroup("Threads"));
-               instantiateCPUMetrics(jvm.addGroup("CPU"));
-       }
-
        private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
                final ClassLoadingMXBean mxBean = 
ManagementFactory.getClassLoadingMXBean();
 
-               metrics.gauge("ClassesLoaded", new Gauge<Long>() {
+               metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new 
Gauge<Long> () {
                        @Override
                        public Long getValue() {
                                return mxBean.getTotalLoadedClassCount();
                        }
                });
-               metrics.gauge("ClassesUnloaded", new Gauge<Long>() {
+
+               metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new 
Gauge<Long> () {
                        @Override
                        public Long getValue() {
                                return mxBean.getUnloadedClassCount();
@@ -123,15 +138,17 @@ public class MetricUtils {
        private static void instantiateGarbageCollectorMetrics(MetricGroup 
metrics) {
                List<GarbageCollectorMXBean> garbageCollectors = 
ManagementFactory.getGarbageCollectorMXBeans();
 
-               for (final GarbageCollectorMXBean garbageCollector : 
garbageCollectors) {
+               for (final GarbageCollectorMXBean garbageCollector: 
garbageCollectors) {
                        MetricGroup gcGroup = 
metrics.addGroup(garbageCollector.getName());
-                       gcGroup.gauge("Count", new Gauge<Long>() {
+
+                       gcGroup.<Long, Gauge<Long>>gauge("Count", new 
Gauge<Long> () {
                                @Override
                                public Long getValue() {
                                        return 
garbageCollector.getCollectionCount();
                                }
                        });
-                       gcGroup.gauge("Time", new Gauge<Long>() {
+
+                       gcGroup.<Long, Gauge<Long>>gauge("Time", new 
Gauge<Long> () {
                                @Override
                                public Long getValue() {
                                        return 
garbageCollector.getCollectionTime();
@@ -142,20 +159,22 @@ public class MetricUtils {
 
        private static void instantiateMemoryMetrics(MetricGroup metrics) {
                final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+
                MetricGroup heap = metrics.addGroup("Heap");
-               heap.gauge("Used", new Gauge<Long>() {
+
+               heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
                        @Override
                        public Long getValue() {
                                return mxBean.getHeapMemoryUsage().getUsed();
                        }
                });
-               heap.gauge("Committed", new Gauge<Long>() {
+               heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
                        @Override
                        public Long getValue() {
                                return 
mxBean.getHeapMemoryUsage().getCommitted();
                        }
                });
-               heap.gauge("Max", new Gauge<Long>() {
+               heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
                        @Override
                        public Long getValue() {
                                return mxBean.getHeapMemoryUsage().getMax();
@@ -163,54 +182,61 @@ public class MetricUtils {
                });
 
                MetricGroup nonHeap = metrics.addGroup("NonHeap");
-               nonHeap.gauge("Used", new Gauge<Long>() {
+
+               nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
                        @Override
                        public Long getValue() {
                                return mxBean.getNonHeapMemoryUsage().getUsed();
                        }
                });
-               nonHeap.gauge("Committed", new Gauge<Long>() {
+               nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> 
() {
                        @Override
                        public Long getValue() {
                                return 
mxBean.getNonHeapMemoryUsage().getCommitted();
                        }
                });
-               nonHeap.gauge("Max", new Gauge<Long>() {
+               nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
                        @Override
                        public Long getValue() {
                                return mxBean.getNonHeapMemoryUsage().getMax();
                        }
                });
 
-               List<BufferPoolMXBean> bufferMxBeans = 
ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+               final MBeanServer con = 
ManagementFactory.getPlatformMBeanServer();
 
-               for (final BufferPoolMXBean bufferMxBean : bufferMxBeans) {
-                       MetricGroup bufferGroup = 
metrics.addGroup(WordUtils.capitalize(bufferMxBean.getName()));
-                       bufferGroup.gauge("Count", new Gauge<Long>() {
-                               @Override
-                               public Long getValue() {
-                                       return bufferMxBean.getCount();
-                               }
-                       });
-                       bufferGroup.gauge("MemoryUsed", new Gauge<Long>() {
-                               @Override
-                               public Long getValue() {
-                                       return bufferMxBean.getMemoryUsed();
-                               }
-                       });
-                       bufferGroup.gauge("TotalCapacity", new Gauge<Long>() {
-                               @Override
-                               public Long getValue() {
-                                       return bufferMxBean.getTotalCapacity();
-                               }
-                       });
+               final String directBufferPoolName = 
"java.nio:type=BufferPool,name=direct";
+
+               try {
+                       final ObjectName directObjectName = new 
ObjectName(directBufferPoolName);
+
+                       MetricGroup direct = metrics.addGroup("Direct");
+
+                       direct.<Long, Gauge<Long>>gauge("Count", new 
AttributeGauge<>(con, directObjectName, "Count", -1L));
+                       direct.<Long, Gauge<Long>>gauge("MemoryUsed", new 
AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
+                       direct.<Long, Gauge<Long>>gauge("TotalCapacity", new 
AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
+               } catch (MalformedObjectNameException e) {
+                       LOG.warn("Could not create object name {}.", 
directBufferPoolName, e);
+               }
+
+               final String mappedBufferPoolName = 
"java.nio:type=BufferPool,name=mapped";
+
+               try {
+                       final ObjectName mappedObjectName = new 
ObjectName(mappedBufferPoolName);
+
+                       MetricGroup mapped = metrics.addGroup("Mapped");
+
+                       mapped.<Long, Gauge<Long>>gauge("Count", new 
AttributeGauge<>(con, mappedObjectName, "Count", -1L));
+                       mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new 
AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
+                       mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new 
AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
+               } catch (MalformedObjectNameException e) {
+                       LOG.warn("Could not create object name {}.", 
mappedBufferPoolName, e);
                }
        }
 
        private static void instantiateThreadMetrics(MetricGroup metrics) {
                final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
 
-               metrics.gauge("Count", new Gauge<Integer>() {
+               metrics.<Integer, Gauge<Integer>>gauge("Count", new 
Gauge<Integer> () {
                        @Override
                        public Integer getValue() {
                                return mxBean.getThreadCount();
@@ -220,54 +246,48 @@ public class MetricUtils {
 
        private static void instantiateCPUMetrics(MetricGroup metrics) {
                try {
-                       final OperatingSystemMXBean mxBean = 
ManagementFactory.getOperatingSystemMXBean();
-
-                       final Method fetchCPULoadMethod = 
Class.forName("com.sun.management.OperatingSystemMXBean")
-                               .getMethod("getProcessCpuLoad");
-                       // verify that we can invoke the method
-                       fetchCPULoadMethod.invoke(mxBean);
+                       final com.sun.management.OperatingSystemMXBean mxBean = 
(com.sun.management.OperatingSystemMXBean) 
ManagementFactory.getOperatingSystemMXBean();
 
-                       final Method fetchCPUTimeMethod = 
Class.forName("com.sun.management.OperatingSystemMXBean")
-                               .getMethod("getProcessCpuTime");
-                       // verify that we can invoke the method
-                       fetchCPUTimeMethod.invoke(mxBean);
-
-                       metrics.gauge("Load", new Gauge<Double>() {
+                       metrics.<Double, Gauge<Double>>gauge("Load", new 
Gauge<Double> () {
                                @Override
                                public Double getValue() {
-                                       try {
-                                               return (Double) 
fetchCPULoadMethod.invoke(mxBean);
-                                       } catch (IllegalAccessException | 
InvocationTargetException | IllegalArgumentException ignored) {
-                                               return -1.0;
-                                       }
+                                       return mxBean.getProcessCpuLoad();
                                }
                        });
-                       metrics.gauge("Time", new Gauge<Long>() {
+                       metrics.<Long, Gauge<Long>>gauge("Time", new 
Gauge<Long> () {
                                @Override
                                public Long getValue() {
-                                       try {
-                                               return (Long) 
fetchCPUTimeMethod.invoke(mxBean);
-                                       } catch (IllegalAccessException | 
InvocationTargetException | IllegalArgumentException ignored) {
-                                               return -1L;
-                                       }
+                                       return mxBean.getProcessCpuTime();
                                }
                        });
-               } catch (ClassNotFoundException | InvocationTargetException | 
SecurityException | NoSuchMethodException | IllegalArgumentException | 
IllegalAccessException ignored) {
+               } catch (Exception e) {
                        LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-                               " - CPU load metrics will not be available.");
-                       // make sure that a metric still exists for the given 
name
-                       metrics.gauge("Load", new Gauge<Double>() {
-                               @Override
-                               public Double getValue() {
-                                       return -1.0;
-                               }
-                       });
-                       metrics.gauge("Time", new Gauge<Long>() {
-                               @Override
-                               public Long getValue() {
-                                       return -1L;
-                               }
-                       });
+                               " - CPU load metrics will not be available.", 
e);
+               }
+       }
+
+       private static final class AttributeGauge<T> implements Gauge<T> {
+               private final MBeanServer server;
+               private final ObjectName objectName;
+               private final String attributeName;
+               private final T errorValue;
+
+               private AttributeGauge(MBeanServer server, ObjectName 
objectName, String attributeName, T errorValue) {
+                       this.server = Preconditions.checkNotNull(server);
+                       this.objectName = 
Preconditions.checkNotNull(objectName);
+                       this.attributeName = 
Preconditions.checkNotNull(attributeName);
+                       this.errorValue = errorValue;
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T getValue() {
+                       try {
+                               return (T) server.getAttribute(objectName, 
attributeName);
+                       } catch (MBeanException | AttributeNotFoundException | 
InstanceNotFoundException | ReflectionException e) {
+                               LOG.warn("Could not read attribute {}.", 
attributeName, e);
+                               return errorValue;
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
deleted file mode 100644
index 1f8d5ed..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor.utils;
-
-import com.sun.management.OperatingSystemMXBean;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
-import java.lang.management.ClassLoadingMXBean;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.ThreadMXBean;
-import java.util.List;
-
-/**
- * Utility class ot initialize {@link TaskExecutor} specific metrics.
- */
-public class TaskExecutorMetricsInitializer {
-       private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class);
-
-       public static void instantiateStatusMetrics(
-               MetricGroup taskManagerMetricGroup,
-               NetworkEnvironment network) {
-               MetricGroup status = taskManagerMetricGroup.addGroup("Status");
-
-               instantiateNetworkMetrics(status.addGroup("Network"), network);
-
-               MetricGroup jvm = status.addGroup("JVM");
-
-               instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
-               
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
-               instantiateMemoryMetrics(jvm.addGroup("Memory"));
-               instantiateThreadMetrics(jvm.addGroup("Threads"));
-               instantiateCPUMetrics(jvm.addGroup("CPU"));
-       }
-
-       private static void instantiateNetworkMetrics(
-               MetricGroup metrics,
-               final NetworkEnvironment network) {
-               metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new 
Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return (long) 
network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
-                       }
-               });
-
-               metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new 
Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return (long) 
network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
-                       }
-               });
-       }
-
-       private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
-               final ClassLoadingMXBean mxBean = 
ManagementFactory.getClassLoadingMXBean();
-
-               metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new 
Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getTotalLoadedClassCount();
-                       }
-               });
-
-               metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new 
Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getUnloadedClassCount();
-                       }
-               });
-       }
-
-       private static void instantiateGarbageCollectorMetrics(MetricGroup 
metrics) {
-               List<GarbageCollectorMXBean> garbageCollectors = 
ManagementFactory.getGarbageCollectorMXBeans();
-
-               for (final GarbageCollectorMXBean garbageCollector: 
garbageCollectors) {
-                       MetricGroup gcGroup = 
metrics.addGroup(garbageCollector.getName());
-
-                       gcGroup.<Long, Gauge<Long>>gauge("Count", new 
Gauge<Long> () {
-                               @Override
-                               public Long getValue() {
-                                       return 
garbageCollector.getCollectionCount();
-                               }
-                       });
-
-                       gcGroup.<Long, Gauge<Long>>gauge("Time", new 
Gauge<Long> () {
-                               @Override
-                               public Long getValue() {
-                                       return 
garbageCollector.getCollectionTime();
-                               }
-                       });
-               }
-       }
-
-       private static void instantiateMemoryMetrics(MetricGroup metrics) {
-               final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
-
-               MetricGroup heap = metrics.addGroup("Heap");
-
-               heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getHeapMemoryUsage().getUsed();
-                       }
-               });
-               heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return 
mxBean.getHeapMemoryUsage().getCommitted();
-                       }
-               });
-               heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getHeapMemoryUsage().getMax();
-                       }
-               });
-
-               MetricGroup nonHeap = metrics.addGroup("NonHeap");
-
-               nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getNonHeapMemoryUsage().getUsed();
-                       }
-               });
-               nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> 
() {
-                       @Override
-                       public Long getValue() {
-                               return 
mxBean.getNonHeapMemoryUsage().getCommitted();
-                       }
-               });
-               nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
-                       @Override
-                       public Long getValue() {
-                               return mxBean.getNonHeapMemoryUsage().getMax();
-                       }
-               });
-
-               final MBeanServer con = 
ManagementFactory.getPlatformMBeanServer();
-
-               final String directBufferPoolName = 
"java.nio:type=BufferPool,name=direct";
-
-               try {
-                       final ObjectName directObjectName = new 
ObjectName(directBufferPoolName);
-
-                       MetricGroup direct = metrics.addGroup("Direct");
-
-                       direct.<Long, Gauge<Long>>gauge("Count", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "Count", 
-1L));
-                       direct.<Long, Gauge<Long>>gauge("MemoryUsed", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, 
"MemoryUsed", -1L));
-                       direct.<Long, Gauge<Long>>gauge("TotalCapacity", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, 
"TotalCapacity", -1L));
-               } catch (MalformedObjectNameException e) {
-                       LOG.warn("Could not create object name {}.", 
directBufferPoolName, e);
-               }
-
-               final String mappedBufferPoolName = 
"java.nio:type=BufferPool,name=mapped";
-
-               try {
-                       final ObjectName mappedObjectName = new 
ObjectName(mappedBufferPoolName);
-
-                       MetricGroup mapped = metrics.addGroup("Mapped");
-
-                       mapped.<Long, Gauge<Long>>gauge("Count", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "Count", 
-1L));
-                       mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, 
"MemoryUsed", -1L));
-                       mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, 
"TotalCapacity", -1L));
-               } catch (MalformedObjectNameException e) {
-                       LOG.warn("Could not create object name {}.", 
mappedBufferPoolName, e);
-               }
-       }
-
-       private static void instantiateThreadMetrics(MetricGroup metrics) {
-               final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
-
-               metrics.<Integer, Gauge<Integer>>gauge("Count", new 
Gauge<Integer> () {
-                       @Override
-                       public Integer getValue() {
-                               return mxBean.getThreadCount();
-                       }
-               });
-       }
-
-       private static void instantiateCPUMetrics(MetricGroup metrics) {
-               try {
-                       final OperatingSystemMXBean mxBean = 
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
-
-                       metrics.<Double, Gauge<Double>>gauge("Load", new 
Gauge<Double> () {
-                               @Override
-                               public Double getValue() {
-                                       return mxBean.getProcessCpuLoad();
-                               }
-                       });
-                       metrics.<Long, Gauge<Long>>gauge("Time", new 
Gauge<Long> () {
-                               @Override
-                               public Long getValue() {
-                                       return mxBean.getProcessCpuTime();
-                               }
-                       });
-               } catch (Exception e) {
-                       LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-                               " - CPU load metrics will not be available.", 
e);
-               }
-       }
-
-       private static final class AttributeGauge<T> implements Gauge<T> {
-               private final MBeanServer server;
-               private final ObjectName objectName;
-               private final String attributeName;
-               private final T errorValue;
-
-               private AttributeGauge(MBeanServer server, ObjectName 
objectName, String attributeName, T errorValue) {
-                       this.server = Preconditions.checkNotNull(server);
-                       this.objectName = 
Preconditions.checkNotNull(objectName);
-                       this.attributeName = 
Preconditions.checkNotNull(attributeName);
-                       this.errorValue = errorValue;
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public T getValue() {
-                       try {
-                               return (T) server.getAttribute(objectName, 
attributeName);
-                       } catch (MBeanException | AttributeNotFoundException | 
InstanceNotFoundException | ReflectionException e) {
-                               LOG.warn("Could not read attribute {}.", 
attributeName, e);
-                               return errorValue;
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb7e0b9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d40a0fd..4fb1196 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1878,7 +1878,6 @@ class JobManager(
     jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new 
Gauge[Long] {
       override def getValue: Long = JobManager.this.currentJobs.size
     })
-    MetricUtils.instantiateStatusMetrics(jobManagerMetricGroup)
   }
 }
 
@@ -2513,7 +2512,7 @@ object JobManager {
       }
     }
 
-    val jobManagerMetricGroup = new JobManagerMetricGroup(
+    val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
       metricRegistry,
       configuration.getString(JobManagerOptions.ADDRESS))
 

Reply via email to