[ https://issues.apache.org/jira/browse/FLINK-9543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570175#comment-16570175 ]
ASF GitHub Bot commented on FLINK-9543: --------------------------------------- zentol closed pull request #6195: [FLINK-9543][METRICS] Expose JobMaster ID to metric system URL: https://github.com/apache/flink/pull/6195 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index a267abb4137..9a2619f1e1a 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -334,9 +334,10 @@ protected void startClusterComponents( LOG.debug("Starting Dispatcher REST endpoint."); webMonitorEndpoint.start(); + final ResourceID resourceManagerID = ResourceID.generate(); resourceManager = createResourceManager( configuration, - ResourceID.generate(), + resourceManagerID, rpcService, highAvailabilityServices, heartbeatServices, @@ -345,7 +346,7 @@ protected void startClusterComponents( clusterInformation, webMonitorEndpoint.getRestBaseUrl()); - jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress()); + jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress(), resourceManagerID.getResourceIdString()); final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java index 
e09051d7160..db813255791 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -39,10 +39,12 @@ private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>(); private final String hostname; + private final String jobManagerId; - public JobManagerMetricGroup(MetricRegistry registry, String hostname) { + public JobManagerMetricGroup(MetricRegistry registry, String hostname, String jobManagerId) { super(registry, registry.getScopeFormats().getJobManagerFormat().formatScope(hostname), null); this.hostname = hostname; + this.jobManagerId = jobManagerId; } public String hostname() { @@ -102,6 +104,7 @@ public int numRegisteredJobMetricGroups() { @Override protected void putVariables(Map<String, String> variables) { variables.put(ScopeFormat.SCOPE_HOST, hostname); + variables.put(ScopeFormat.SCOPE_JOBMANAGER_ID, jobManagerId); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java index 3869aa642f9..de2f7303e30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -62,9 +63,10 @@ public static OperatorMetricGroup createUnregisteredOperatorMetricGroup() { */ public static class UnregisteredJobManagerMetricGroup extends 
JobManagerMetricGroup { private static final String DEFAULT_HOST_NAME = "UnregisteredHost"; + private static final String DEFAULT_JOBMANAGER_ID = ResourceID.generate().getResourceIdString(); private UnregisteredJobManagerMetricGroup() { - super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME); + super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_JOBMANAGER_ID); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java index eb5ea7d4d29..898578076ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java @@ -70,6 +70,10 @@ public String filterCharacters(String input) { public static final String SCOPE_HOST = asVariable("host"); + // ----- Job Manager ---- + + public static final String SCOPE_JOBMANAGER_ID = asVariable("jm_id"); + // ----- Task Manager ---- public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 3fd268a1aeb..b350bcf400f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -57,10 +57,12 @@ private MetricUtils() { public static JobManagerMetricGroup instantiateJobManagerMetricGroup( final MetricRegistry metricRegistry, - final String hostname) { + final String hostname, + final String jobManagerId) { final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup( metricRegistry, - hostname); + hostname, + jobManagerId); MetricGroup statusGroup = jobManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index aca4fdbea1f..409f78a59f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -296,7 +296,9 @@ public void start() throws Exception { // bring up the ResourceManager(s) LOG.info("Starting ResourceManger"); + final ResourceID resourceManagerID = ResourceID.generate(); resourceManagerRunner = startResourceManager( + resourceManagerID, configuration, haServices, heartbeatServices, @@ -356,7 +358,7 @@ public void start() throws Exception { // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for JobManger"); - this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost"); + this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost", resourceManagerID.getResourceIdString()); final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint); @@ -762,6 +764,7 @@ protected RpcService createRpcService( } protected ResourceManagerRunner startResourceManager( + ResourceID resourceId, Configuration configuration, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, @@ -770,7 +773,7 @@ protected ResourceManagerRunner startResourceManager( ClusterInformation clusterInformation) throws Exception { final ResourceManagerRunner resourceManagerRunner = new ResourceManagerRunner( - ResourceID.generate(), + resourceId, FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + UUID.randomUUID(), configuration, resourceManagerRpcService, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index cebff5881ac..ff498b7ab44 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2517,7 +2517,8 @@ object JobManager { val jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( metricRegistry, - configuration.getString(JobManagerOptions.ADDRESS)) + configuration.getString(JobManagerOptions.ADDRESS), + ResourceID.generate.getResourceIdString) (instanceManager, scheduler, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index cb5ec67c97c..8cdb6125c1e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; @@ -46,7 +47,8 @@ @Test public void addAndRemoveJobs() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); + String jobManagerId = ResourceID.generate().getResourceIdString(); + final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost", jobManagerId); final JobID jid1 = new JobID(); final JobID jid2 = new JobID(); @@ -78,7 +80,8 @@ 
public void addAndRemoveJobs() throws Exception { @Test public void testCloseClosesAll() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); + String jobManagerId = ResourceID.generate().getResourceIdString(); + final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost", jobManagerId); final JobID jid1 = new JobID(); final JobID jid2 = new JobID(); @@ -104,7 +107,8 @@ public void testCloseClosesAll() throws Exception { @Test public void testGenerateScopeDefault() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); + String jobManagerId = ResourceID.generate().getResourceIdString(); + JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost", jobManagerId); assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name")); @@ -117,8 +121,8 @@ public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>"); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); - - JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host"); + String jobManagerId = ResourceID.generate().getResourceIdString(); + JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host", jobManagerId); assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); @@ -129,7 +133,8 @@ public void testGenerateScopeCustom() throws 
Exception { @Test public void testCreateQueryServiceMetricInfo() { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host"); + String jobManagerId = ResourceID.generate().getResourceIdString(); + JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host", jobManagerId); QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter()); assertEquals("", info.scope); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index 6f4751b07d9..b33aabb8080 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; @@ -40,8 +41,9 @@ @Test public void testGenerateScopeDefault() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + String jobManagerId = ResourceID.generate().getResourceIdString(); - JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName", jobManagerId); JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); 
assertArrayEquals( @@ -61,10 +63,11 @@ public void testGenerateScopeCustom() throws Exception { cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>"); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); + String jobManagerId = ResourceID.generate().getResourceIdString(); JobID jid = new JobID(); - JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName", jobManagerId); JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); assertArrayEquals( @@ -84,10 +87,11 @@ public void testGenerateScopeCustomWildcard() throws Exception { cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>"); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); + String jobManagerId = ResourceID.generate().getResourceIdString(); JobID jid = new JobID(); - JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName", jobManagerId); JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); assertArrayEquals( @@ -105,7 +109,9 @@ public void testGenerateScopeCustomWildcard() throws Exception { public void testCreateQueryServiceMetricInfo() { JobID jid = new JobID(); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host"); + String jobManagerId = ResourceID.generate().getResourceIdString(); + + JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host", jobManagerId); JobManagerJobMetricGroup jmj = new 
JobManagerJobMetricGroup(registry, jm, jid, "jobname"); QueryScopeInfo.JobQueryScopeInfo info = jmj.createQueryServiceMetricInfo(new DummyCharacterFilter()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Expose JobMaster IDs to metric system > ------------------------------------- > > Key: FLINK-9543 > URL: https://issues.apache.org/jira/browse/FLINK-9543 > Project: Flink > Issue Type: New Feature > Components: Local Runtime, Metrics > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > To be able to differentiate between metrics from different jobmanagers, we > should expose the JobManager ID (i.e. the ResourceID) to the metric system, > like we do for TaskManagers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)