[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417541#comment-16417541 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5755


> Job-level metrics lost during job re-submission in HA mode
> ----------------------------------------------------------
>
>                 Key: FLINK-8740
>                 URL: https://issues.apache.org/jira/browse/FLINK-8740
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.4.0
>            Reporter: Joshua DeWald
>            Assignee: Till Rohrmann
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> When Flink is running in High Availability and a leader re-election occurs to
> the same job manager, the job is unable to register the job-level metrics due
> to a name collision.
> This may occur even if a different Job Manager is elected, but as it is a
> local JobManagerMetricGroup which spits out the error, that is unlikely to be
> the case.
>
> *Expected Behavior*
> When a job is forced to re-submit due to Job Manager re-election, job-level
> metrics should be available in the new instance of the job (uptime,
> checkpoint size, checkpoint duration, etc.)
> *Actual Behavior*
> When the job gets re-submitted, it is unable to register job-level metrics due
> to a collision in the JobManagerMetricGroup, which leads to a situation where,
> even though the job is running, the metrics around checkpoints and uptime are
> not available.
> *Steps to reproduce*
> # Start up Flink in HA mode using ZooKeeper, single node is fine
> # Submit a job to the cluster
> # Stop and restart ZooKeeper
> # In Job Manager logs you will see the following errors:
> {noformat}
> 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be reported
> 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be reported
> 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}
> *Proposed Solution*
> I suspect that there may be other related issues than just the metrics, but a
> code change that seems to fix the issue is to remove the existing registered
> job metrics during recovery:
> {code:java}
> if (isRecovery) {
>   log.info(s"Removing metrics for $jobId, new will be added during recover")
>   jobManagerMetricGroup.removeJob(jobId)
> }{code}
> I'd be happy to submit this in a PR if that is acceptable to open up the
> discussion, but I am not sure of the consequences of not closing the previous
> JMMG, or of perhaps simply not re-registering job-level metrics during recovery.
> Doing this would seem to entail informing lower levels about the recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
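The collision described in the issue, and the effect of the proposed removal, can be illustrated with a small toy model. This is only a sketch: `Parent` and `JobGroup` are hypothetical stand-ins and not Flink's metric classes, and it assumes the parent hands back the already registered group when asked for a known job id.

```java
import java.util.HashMap;
import java.util.Map;

final class CollisionSketch {

    /** Toy per-job metric group (hypothetical; not Flink's class). */
    static final class JobGroup {
        private final Map<String, Long> metrics = new HashMap<>();

        /** Registers a metric; returns false and keeps the old one on a name collision. */
        boolean register(String name, long value) {
            return metrics.putIfAbsent(name, value) == null;
        }
    }

    /** Toy parent registry; assumed to return the existing group for a known job id. */
    static final class Parent {
        private final Map<String, JobGroup> jobs = new HashMap<>();

        JobGroup addJob(String jobId) {
            return jobs.computeIfAbsent(jobId, id -> new JobGroup());
        }

        void removeJob(String jobId) {
            jobs.remove(jobId);
        }
    }

    public static void main(String[] args) {
        Parent parent = new Parent();

        // Initial submission: registration succeeds.
        System.out.println(parent.addJob("job-1").register("totalNumberOfCheckpoints", 0L));

        // Re-submission without cleanup: the stale group comes back, so the name collides.
        System.out.println(parent.addJob("job-1").register("totalNumberOfCheckpoints", 0L));

        // Re-submission after removing the stale group: registration succeeds again.
        parent.removeJob("job-1");
        System.out.println(parent.addJob("job-1").register("totalNumberOfCheckpoints", 0L));
    }
}
```

In this model the middle registration is the one that would correspond to the "Name collision" warnings quoted above.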
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417107#comment-16417107 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5755

    Thanks for the review @zentol. Addressing your last comment while merging this PR.
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417106#comment-16417106 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177695516

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -165,7 +166,7 @@
         private final BlobServer blobServer;

         /** The metrics for the job. */
    --- End diff --

    Will remove it.
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415934#comment-16415934 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177497889

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -165,7 +166,7 @@
         private final BlobServer blobServer;

         /** The metrics for the job. */
    --- End diff --

    outdated comment
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415799#comment-16415799 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5755

    @zentol, I've addressed your comments and updated the PR accordingly.
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415794#comment-16415794 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177466102

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.factories;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
    + * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
    + */
    +public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
    +
    +    private final JobManagerMetricGroup jobManagerMetricGroup;
    +
    +    public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
    +        this.jobManagerMetricGroup = jobManagerMetricGroup;
    +    }
    +
    +    @Override
    +    public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
    +        jobManagerMetricGroup.removeJob(jobGraph.getJobID());
    --- End diff --

    You're right, I will change it like you've described:

    - Call `JobManagerJobMetricGroup#close` explicitly
    - Remove `JobManagerMetricGroup#removeJob` from `DefaultJobManagerJobMetricGroupFactory`
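The direction agreed on in this comment (the owner closes the job group explicitly, and the factory no longer removes anything as a side effect) might look roughly like the following toy model. All names here (`Parent`, `JobGroup`, `create`, `contains`) are hypothetical and chosen for illustration; the sketch makes no claim about how the merged Flink code is actually structured.

```java
import java.util.HashMap;
import java.util.Map;

final class ExplicitCloseSketch {

    /** Toy parent registry (hypothetical; not Flink's JobManagerMetricGroup). */
    static final class Parent {
        private final Map<String, JobGroup> jobs = new HashMap<>();

        /** Plain factory method: creates and registers a group, with no hidden removal. */
        JobGroup create(String jobId) {
            JobGroup group = new JobGroup(this, jobId);
            if (jobs.putIfAbsent(jobId, group) != null) {
                throw new IllegalStateException("Group for " + jobId + " was not closed");
            }
            return group;
        }

        boolean contains(String jobId) {
            return jobs.containsKey(jobId);
        }

        private void unregister(String jobId, JobGroup group) {
            jobs.remove(jobId, group);
        }
    }

    /** Toy per-job group whose close() is the single path that unregisters it. */
    static final class JobGroup implements AutoCloseable {
        private final Parent parent;
        private final String jobId;
        private boolean closed;

        JobGroup(Parent parent, String jobId) {
            this.parent = parent;
            this.jobId = jobId;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                parent.unregister(jobId, this);
            }
        }
    }

    public static void main(String[] args) {
        Parent parent = new Parent();
        JobGroup first = parent.create("job-1");
        first.close(); // the owner closes explicitly before re-creating
        JobGroup second = parent.create("job-1");
        System.out.println(parent.contains("job-1"));
        second.close();
        System.out.println(parent.contains("job-1"));
    }
}
```

Throwing on a duplicate `create` is a design choice of this sketch only: it makes a forgotten `close()` visible instead of silently replacing the old group.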
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415665#comment-16415665 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177436964

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.factories;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
    + * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
    + */
    +public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
    +
    +    private final JobManagerMetricGroup jobManagerMetricGroup;
    +
    +    public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
    +        this.jobManagerMetricGroup = jobManagerMetricGroup;
    +    }
    +
    +    @Override
    +    public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
    +        jobManagerMetricGroup.removeJob(jobGraph.getJobID());
    --- End diff --

    yes, you would still need the factory when using the `TaskMetricGroup`-like pattern, but then
    * the factory wouldn't have that odd side-effect
    * there would be a single _expected_ path on which metrics are unregistered (currently, the last `JobManagerJobMetricGroup` is implicitly closed when the `JobManagerMetricGroup` is closed)
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415655#comment-16415655 ]

ASF GitHub Bot commented on FLINK-8740:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r17742

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster.factories;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
    + * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
    + */
    +public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
    +
    +    private final JobManagerMetricGroup jobManagerMetricGroup;
    +
    +    public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
    +        this.jobManagerMetricGroup = jobManagerMetricGroup;
    +    }
    +
    +    @Override
    +    public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
    +        jobManagerMetricGroup.removeJob(jobGraph.getJobID());
    --- End diff --

    I think closing the `JobManagerJobMetricGroup` alone is not enough, because then we still have to create a fresh one. I was a bit hesitant changing the `AbstractMetricGroup` because to me it looked as if you could only add metrics but never remove them (except when closing). So in order to be as little invasive as possible, I went for the approach where we create a fresh metrics group whenever we recreate the `ExecutionGraph`. Given the factory, this could also be easily changed later because whether you create a fresh instance or reset it is an implementation detail.
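The point that the factory hides whether a fresh instance is created can be sketched as follows. This is a toy model under stated assumptions: `JobGroupFactory`, `FreshInstanceFactory`, and `rebuildAndRegister` are hypothetical names, and `rebuildAndRegister` merely stands in for the code path that recreates the `ExecutionGraph`.

```java
import java.util.HashMap;
import java.util.Map;

final class FreshGroupFactorySketch {

    /** Toy per-job group that only remembers which metric names are taken. */
    static final class JobGroup {
        private final Map<String, Long> metrics = new HashMap<>();

        /** Returns false on a name collision instead of overwriting. */
        boolean register(String name, long value) {
            return metrics.putIfAbsent(name, value) == null;
        }
    }

    /** Callers depend only on this interface, not on how the group is obtained. */
    interface JobGroupFactory {
        JobGroup create(String jobId);
    }

    /** Replaces any previous group for the job id with a fresh, empty one. */
    static final class FreshInstanceFactory implements JobGroupFactory {
        private final Map<String, JobGroup> registered = new HashMap<>();

        @Override
        public JobGroup create(String jobId) {
            JobGroup fresh = new JobGroup();
            registered.put(jobId, fresh); // drops the stale group, if any
            return fresh;
        }
    }

    /** Hypothetical stand-in for the code path that rebuilds the execution graph. */
    static boolean rebuildAndRegister(JobGroupFactory factory, String jobId) {
        return factory.create(jobId).register("numberOfCompletedCheckpoints", 0L);
    }

    public static void main(String[] args) {
        JobGroupFactory factory = new FreshInstanceFactory();
        // Each rebuild gets an empty group, so the same metric name never collides.
        System.out.println(rebuildAndRegister(factory, "job-1"));
        System.out.println(rebuildAndRegister(factory, "job-1"));
    }
}
```

Because `rebuildAndRegister` only sees the interface, a different implementation (for example one that resets and reuses a group) could be swapped in without touching the caller.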
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415195#comment-16415195 ]

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177340014

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
+	public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+		this.jobManagerMetricGroup = jobManagerMetricGroup;
+	}
+
+	@Override
+	public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+		jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

You could use the same pattern we use for `TaskMetricGroups`, which remove themselves from their parent `TaskManagerJobMetricGroup` when they are closed. That said I'm not really fond of it.

You could also add a `reset` method to the `JobManagerJobMetricGroup` that removes all metrics. This may be more reasonable than replacing the group.
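The two alternatives raised in this comment can be sketched side by side. This is only an illustration under stated assumptions: `ParentGroup` and `ChildGroup` are hypothetical stand-ins, not Flink's `TaskManagerJobMetricGroup` or `JobManagerJobMetricGroup`, and the method names `removeChild`, `register`, and `reset` are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-ins only; these are not Flink's metric group classes. */
public class SelfRemovingGroupSketch {

    /** Hypothetical parent that tracks at most one child group per job id. */
    public static class ParentGroup {
        private final Map<String, ChildGroup> children = new HashMap<>();

        public ChildGroup addChild(String jobId) {
            return children.computeIfAbsent(jobId, id -> new ChildGroup(this, id));
        }

        void removeChild(String jobId) {
            children.remove(jobId);
        }

        public boolean hasChild(String jobId) {
            return children.containsKey(jobId);
        }
    }

    /** Hypothetical per-job group holding named metric values. */
    public static class ChildGroup {
        private final ParentGroup parent;
        private final String jobId;
        private final Map<String, Long> metrics = new HashMap<>();
        private boolean closed;

        ChildGroup(ParentGroup parent, String jobId) {
            this.parent = parent;
            this.jobId = jobId;
        }

        /** Returns false on a name collision; the new metric is then not stored. */
        public boolean register(String name, long value) {
            return metrics.putIfAbsent(name, value) == null;
        }

        /** Alternative 2: keep the group registered but drop all of its metrics. */
        public void reset() {
            metrics.clear();
        }

        /** Alternative 1: closing the group also removes it from its parent. */
        public void close() {
            if (!closed) {
                closed = true;
                metrics.clear();
                parent.removeChild(jobId);
            }
        }
    }

    public static void main(String[] args) {
        ParentGroup parent = new ParentGroup();
        ChildGroup first = parent.addChild("job-1");
        first.register("totalNumberOfCheckpoints", 0L);

        first.close(); // the parent no longer knows about "job-1"
        ChildGroup second = parent.addChild("job-1");
        System.out.println("fresh group created: " + (second != first));
        System.out.println("re-registration succeeded: "
                + second.register("totalNumberOfCheckpoints", 0L));
    }
}
```

With self-removal the owner of the child group never needs a reference to the parent, whereas `reset` keeps the same group object alive across restarts; which trade-off suits Flink better is the open question in the review.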
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415078#comment-16415078 ]

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177317868

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
+	public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+		this.jobManagerMetricGroup = jobManagerMetricGroup;
+	}
+
+	@Override
+	public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+		jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

We could also pass the `JobManagerMetricGroup` into the `JobMaster`, but I think that exposing only the `JobManagerJobMetricGroup` to it gives a better separation of concerns.
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415076#comment-16415076 ]

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177317210

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
+	public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+		this.jobManagerMetricGroup = jobManagerMetricGroup;
+	}
+
+	@Override
+	public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+		jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

Yes, but the `JobMaster` does not have access to the `JobManagerMetricGroup`. That's why it is necessary to remove it here. I can change the return type of `JobManagerMetricGroup#removeJob`, but I think it is not strictly necessary.
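The separation of concerns argued for in the two replies above can be illustrated with a small factory sketch. Everything here is a hypothetical stand-in: `ParentGroup`, `JobGroup`, `JobGroupFactory`, and `JobLevelComponent` only mimic the roles of `JobManagerMetricGroup`, `JobManagerJobMetricGroup`, the factory, and the `JobMaster`. The `addJob` call after `removeJob` is an assumption, since the quoted diff is cut off right after `removeJob`.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-ins only; not Flink's actual classes or signatures. */
public class MetricGroupFactorySketch {

    /** Hypothetical per-job metric group. */
    public static class JobGroup {
        public final String jobId;

        JobGroup(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Hypothetical parent group that owns the per-job groups. */
    public static class ParentGroup {
        private final Map<String, JobGroup> jobs = new HashMap<>();

        public JobGroup addJob(String jobId) {
            return jobs.computeIfAbsent(jobId, JobGroup::new);
        }

        public void removeJob(String jobId) {
            jobs.remove(jobId);
        }
    }

    /** The only metrics-related type handed to the job-level component. */
    public interface JobGroupFactory {
        JobGroup create(String jobId);
    }

    /** Removes any previously registered group before creating a new one. */
    public static class RemovingJobGroupFactory implements JobGroupFactory {
        private final ParentGroup parent;

        public RemovingJobGroupFactory(ParentGroup parent) {
            this.parent = parent;
        }

        @Override
        public JobGroup create(String jobId) {
            parent.removeJob(jobId);     // drop a stale group, if any
            return parent.addJob(jobId); // assumed follow-up step
        }
    }

    /** Plays the role of the job-level owner; it never sees ParentGroup. */
    public static class JobLevelComponent {
        private final JobGroupFactory factory;

        public JobLevelComponent(JobGroupFactory factory) {
            this.factory = factory;
        }

        public JobGroup start(String jobId) {
            return factory.create(jobId);
        }
    }

    public static void main(String[] args) {
        ParentGroup parent = new ParentGroup();
        JobLevelComponent component = new JobLevelComponent(new RemovingJobGroupFactory(parent));
        JobGroup first = component.start("job-1");
        JobGroup second = component.start("job-1"); // e.g. after regaining leadership
        System.out.println("new group on restart: " + (first != second));
    }
}
```

Because the component depends only on the factory interface, the cleanup of stale groups stays next to the parent group that actually holds them.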
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415075#comment-16415075 ]

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177317071

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
--- End diff --

True, will change it such that it says that it will remove previously registered `JobManagerJobMetricGroups`.
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413852#comment-16413852 ]

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177090323

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
+	public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+		this.jobManagerMetricGroup = jobManagerMetricGroup;
+	}
+
+	@Override
+	public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+		jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

This should only be necessary if the group wasn't properly cleaned up previously by the JobMaster, correct? If so, I suggest modifying `removeJob`, for debugging purposes, so that it returns a boolean indicating whether something was actually deleted, and logging a warning when it does.
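A minimal sketch of the suggestion in this comment, assuming a hypothetical `JobGroupRegistry` in place of `JobManagerMetricGroup` (the real `removeJob` signature is not shown in this thread, so the boolean return type here is exactly the proposed change, not existing API). Logging uses `java.util.logging` purely to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical stand-in for a parent metric group; not Flink's actual class. */
public class JobGroupRegistry {

    private static final Logger LOG = Logger.getLogger(JobGroupRegistry.class.getName());

    private final Map<String, Object> jobs = new HashMap<>();

    public void addJob(String jobId) {
        jobs.putIfAbsent(jobId, new Object());
    }

    /** Returns true only if a group was registered under jobId and has now been removed. */
    public boolean removeJob(String jobId) {
        return jobs.remove(jobId) != null;
    }

    /** Caller-side use of the return value: warn when a stale group had to be removed. */
    public void prepareForNewGroup(String jobId) {
        if (removeJob(jobId)) {
            LOG.warning("Metric group for job " + jobId
                    + " was still registered; it was not cleaned up earlier.");
        }
    }

    public static void main(String[] args) {
        JobGroupRegistry registry = new JobGroupRegistry();
        registry.addJob("job-1");
        registry.prepareForNewGroup("job-1"); // logs a warning: stale group found
        registry.prepareForNewGroup("job-1"); // silent: nothing left to remove
    }
}
```

The warning then only appears in the case the reviewer cares about, namely when an earlier owner failed to clean up its group.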
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413853#comment-16413853 ]

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5755#discussion_r177087225

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
--- End diff --

This comment is misleading as the `JobManagerMetricGroup` already ensures that at most one group is _registered_.
[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411511#comment-16411511 ]

ASF GitHub Bot commented on FLINK-8740:
---

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5755

    [FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a new ExecutionGraph

    ## What is the purpose of the change

    Closes the JobManagerJobMetricGroup when suspending the `JobManager`. This allows the job metrics to be re-registered when the `JobManager` regains its leadership.

    ## Verifying this change

    - Tested manually

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink jobLevelMetricsHA

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5755

commit ef2de68bcad3c4715941a1e61d0a460b4c609520
Author: Till Rohrmann
Date: 2018-03-23T13:08:49Z

    [FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a new ExecutionGraph

commit 98a49cd032346a2b09641ee6d30baadf9a98855f
Author: Till Rohrmann
Date: 2018-03-23T14:43:20Z

    [hotfix] Create ExecutionGraph when JobMaster is started

    The ExecutionGraph is not a final resource in the JobMaster. For example,
    it is necessary to create a new ExecutionGraph when rescaling the job or
    when the JobMaster loses and regains its leadership.

commit dcddfeb0b8883964505bed55d5b3730cae9abe60
Author: Till Rohrmann
Date: 2018-03-23T14:46:29Z

    [hotfix] Remove unused fields in JobMaster
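To make the purpose of the change concrete, the sketch below reproduces the name collision in miniature and shows why a fresh group avoids it. It is a simplified model under assumptions: `JobGroup` is a hypothetical stand-in rather than Flink's `JobManagerJobMetricGroup`, and the `long[]` holders merely play the role of counters owned by an old and a new `ExecutionGraph`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Simplified model of the collision; not Flink's metric classes. */
public class LeadershipMetricsSketch {

    /** Hypothetical job-level group that refuses duplicate metric names. */
    public static class JobGroup {
        private final Map<String, LongSupplier> gauges = new HashMap<>();

        /** Returns false on a name collision, in which case the new gauge is ignored. */
        public boolean gauge(String name, LongSupplier supplier) {
            if (gauges.containsKey(name)) {
                return false;
            }
            gauges.put(name, supplier);
            return true;
        }

        public long read(String name) {
            return gauges.get(name).getAsLong();
        }
    }

    public static void main(String[] args) {
        long[] oldGraphCheckpoints = {7}; // state of the suspended execution
        long[] newGraphCheckpoints = {0}; // state after regaining leadership

        // Without the fix: the same group is reused, so re-registration collides
        // and the gauge keeps reading the old execution's counter.
        JobGroup reused = new JobGroup();
        reused.gauge("totalNumberOfCheckpoints", () -> oldGraphCheckpoints[0]);
        boolean accepted = reused.gauge("totalNumberOfCheckpoints", () -> newGraphCheckpoints[0]);
        System.out.println("re-registration accepted on reused group: " + accepted);

        // With a group that is closed on suspension and recreated afterwards,
        // the new execution registers its metrics without a collision.
        JobGroup fresh = new JobGroup();
        boolean acceptedOnFresh = fresh.gauge("totalNumberOfCheckpoints", () -> newGraphCheckpoints[0]);
        newGraphCheckpoints[0]++;
        System.out.println("re-registration accepted on fresh group: " + acceptedOnFresh);
        System.out.println("fresh gauge tracks new execution: "
                + (fresh.read("totalNumberOfCheckpoints") == newGraphCheckpoints[0]));
    }
}
```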