[ 
https://issues.apache.org/jira/browse/FLINK-9543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570175#comment-16570175
 ] 

ASF GitHub Bot commented on FLINK-9543:
---------------------------------------

zentol closed pull request #6195: [FLINK-9543][METRICS] Expose JobMaster ID to 
metric system
URL: https://github.com/apache/flink/pull/6195
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
GitHub no longer displays the original diff of such a pull request once
it has been merged, so the diff is reproduced below for the sake of
provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index a267abb4137..9a2619f1e1a 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -334,9 +334,10 @@ protected void startClusterComponents(
                        LOG.debug("Starting Dispatcher REST endpoint.");
                        webMonitorEndpoint.start();
 
+                       final ResourceID resourceManagerID = 
ResourceID.generate();
                        resourceManager = createResourceManager(
                                configuration,
-                               ResourceID.generate(),
+                               resourceManagerID,
                                rpcService,
                                highAvailabilityServices,
                                heartbeatServices,
@@ -345,7 +346,7 @@ protected void startClusterComponents(
                                clusterInformation,
                                webMonitorEndpoint.getRestBaseUrl());
 
-                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, 
rpcService.getAddress());
+                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, 
rpcService.getAddress(), resourceManagerID.getResourceIdString());
 
                        final HistoryServerArchivist historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
webMonitorEndpoint);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index e09051d7160..db813255791 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -39,10 +39,12 @@
        private final Map<JobID, JobManagerJobMetricGroup> jobs = new 
HashMap<>();
 
        private final String hostname;
+       private final String jobManagerId;
 
-       public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
+       public JobManagerMetricGroup(MetricRegistry registry, String hostname, 
String jobManagerId) {
                super(registry, 
registry.getScopeFormats().getJobManagerFormat().formatScope(hostname), null);
                this.hostname = hostname;
+               this.jobManagerId = jobManagerId;
        }
 
        public String hostname() {
@@ -102,6 +104,7 @@ public int numRegisteredJobMetricGroups() {
        @Override
        protected void putVariables(Map<String, String> variables) {
                variables.put(ScopeFormat.SCOPE_HOST, hostname);
+               variables.put(ScopeFormat.SCOPE_JOBMANAGER_ID, jobManagerId);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 3869aa642f9..de2f7303e30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -62,9 +63,10 @@ public static OperatorMetricGroup 
createUnregisteredOperatorMetricGroup() {
         */
        public static class UnregisteredJobManagerMetricGroup extends 
JobManagerMetricGroup {
                private static final String DEFAULT_HOST_NAME = 
"UnregisteredHost";
+               private static final String DEFAULT_JOBMANAGER_ID = 
ResourceID.generate().getResourceIdString();
 
                private UnregisteredJobManagerMetricGroup() {
-                       super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME);
+                       super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, 
DEFAULT_JOBMANAGER_ID);
                }
 
                @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index eb5ea7d4d29..898578076ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -70,6 +70,10 @@ public String filterCharacters(String input) {
 
        public static final String SCOPE_HOST = asVariable("host");
 
+       // ----- Job Manager ----
+
+       public static final String SCOPE_JOBMANAGER_ID = asVariable("jm_id");
+
        // ----- Task Manager ----
 
        public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 3fd268a1aeb..b350bcf400f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -57,10 +57,12 @@ private MetricUtils() {
 
        public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
                        final MetricRegistry metricRegistry,
-                       final String hostname) {
+                       final String hostname,
+                       final String jobManagerId) {
                final JobManagerMetricGroup jobManagerMetricGroup = new 
JobManagerMetricGroup(
                        metricRegistry,
-                       hostname);
+                       hostname,
+                       jobManagerId);
 
                MetricGroup statusGroup = 
jobManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index aca4fdbea1f..409f78a59f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -296,7 +296,9 @@ public void start() throws Exception {
 
                                // bring up the ResourceManager(s)
                                LOG.info("Starting ResourceManger");
+                               final ResourceID resourceManagerID = 
ResourceID.generate();
                                resourceManagerRunner = startResourceManager(
+                                       resourceManagerID,
                                        configuration,
                                        haServices,
                                        heartbeatServices,
@@ -356,7 +358,7 @@ public void start() throws Exception {
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
                                LOG.info("Starting job dispatcher(s) for 
JobManger");
 
-                               this.jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
+                               this.jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost", 
resourceManagerID.getResourceIdString());
 
                                final HistoryServerArchivist 
historyServerArchivist = 
HistoryServerArchivist.createHistoryServerArchivist(configuration, 
dispatcherRestEndpoint);
 
@@ -762,6 +764,7 @@ protected RpcService createRpcService(
        }
 
        protected ResourceManagerRunner startResourceManager(
+                   ResourceID resourceId,
                        Configuration configuration,
                        HighAvailabilityServices haServices,
                        HeartbeatServices heartbeatServices,
@@ -770,7 +773,7 @@ protected ResourceManagerRunner startResourceManager(
                        ClusterInformation clusterInformation) throws Exception 
{
 
                final ResourceManagerRunner resourceManagerRunner = new 
ResourceManagerRunner(
-                       ResourceID.generate(),
+                       resourceId,
                        FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + 
UUID.randomUUID(),
                        configuration,
                        resourceManagerRpcService,
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cebff5881ac..ff498b7ab44 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2517,7 +2517,8 @@ object JobManager {
 
     val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
       metricRegistry,
-      configuration.getString(JobManagerOptions.ADDRESS))
+      configuration.getString(JobManagerOptions.ADDRESS),
+      ResourceID.generate.getResourceIdString)
 
     (instanceManager,
       scheduler,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index cb5ec67c97c..8cdb6125c1e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -46,7 +47,8 @@
        @Test
        public void addAndRemoveJobs() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-               final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
+               final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost", jobManagerId);
 
                final JobID jid1 = new JobID();
                final JobID jid2 = new JobID();
@@ -78,7 +80,8 @@ public void addAndRemoveJobs() throws Exception {
        @Test
        public void testCloseClosesAll() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-               final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
+               final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost", jobManagerId);
 
                final JobID jid1 = new JobID();
                final JobID jid2 = new JobID();
@@ -104,7 +107,8 @@ public void testCloseClosesAll() throws Exception {
        @Test
        public void testGenerateScopeDefault() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
+               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost", jobManagerId);
 
                assertArrayEquals(new String[]{"localhost", "jobmanager"}, 
group.getScopeComponents());
                assertEquals("localhost.jobmanager.name", 
group.getMetricIdentifier("name"));
@@ -117,8 +121,8 @@ public void testGenerateScopeCustom() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, 
"constant.<host>.foo.<host>");
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
-
-               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "host");
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
+               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "host", jobManagerId);
 
                assertArrayEquals(new String[]{"constant", "host", "foo", 
"host"}, group.getScopeComponents());
                assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
@@ -129,7 +133,8 @@ public void testGenerateScopeCustom() throws Exception {
        @Test
        public void testCreateQueryServiceMetricInfo() {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-               JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, 
"host");
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
+               JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, 
"host", jobManagerId);
 
                QueryScopeInfo.JobManagerQueryScopeInfo info = 
jm.createQueryServiceMetricInfo(new DummyCharacterFilter());
                assertEquals("", info.scope);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index 6f4751b07d9..b33aabb8080 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -40,8 +41,9 @@
        @Test
        public void testGenerateScopeDefault() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
 
-               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName", jobManagerId);
                JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, new JobID(), "myJobName");
 
                assertArrayEquals(
@@ -61,10 +63,11 @@ public void testGenerateScopeCustom() throws Exception {
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
                cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"some-constant.<job_name>");
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
 
                JobID jid = new JobID();
 
-               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName", jobManagerId);
                JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jid, "myJobName");
 
                assertArrayEquals(
@@ -84,10 +87,11 @@ public void testGenerateScopeCustomWildcard() throws 
Exception {
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
                cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"*.some-constant.<job_id>");
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
 
                JobID jid = new JobID();
 
-               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName", jobManagerId);
                JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jid, "myJobName");
 
                assertArrayEquals(
@@ -105,7 +109,9 @@ public void testGenerateScopeCustomWildcard() throws 
Exception {
        public void testCreateQueryServiceMetricInfo() {
                JobID jid = new JobID();
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-               JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, 
"host");
+               String jobManagerId = 
ResourceID.generate().getResourceIdString();
+
+               JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, 
"host", jobManagerId);
                JobManagerJobMetricGroup jmj = new 
JobManagerJobMetricGroup(registry, jm, jid, "jobname");
 
                QueryScopeInfo.JobQueryScopeInfo info = 
jmj.createQueryServiceMetricInfo(new DummyCharacterFilter());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expose JobMaster IDs to metric system
> -------------------------------------
>
>                 Key: FLINK-9543
>                 URL: https://issues.apache.org/jira/browse/FLINK-9543
>             Project: Flink
>          Issue Type: New Feature
>          Components: Local Runtime, Metrics
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> To be able to differentiate between metrics from different JobManagers we 
> should expose the JobManager ID (i.e. the resourceID) to the metric system, 
> like we do for TaskManagers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to