[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417541#comment-16417541
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5755


> Job-level metrics lost during job re-submission in HA mode
> --
>
> Key: FLINK-8740
> URL: https://issues.apache.org/jira/browse/FLINK-8740
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.4.0
> Reporter: Joshua DeWald
> Assignee: Till Rohrmann
> Priority: Blocker
> Fix For: 1.5.0
>
>
> When Flink is running in High Availability mode and a leader re-election
> selects the same Job Manager, the job is unable to register its job-level
> metrics due to a name collision.
> This may also occur when a different Job Manager is elected, but because the
> error comes from a local JobManagerMetricGroup, that is unlikely.
>  
> *Expected Behavior*
> When a job is re-submitted because of a Job Manager re-election, job-level
> metrics (uptime, checkpoint size, checkpoint duration, etc.) should be
> available in the new instance of the job.
> *Actual Behavior*
> When the job is re-submitted, it cannot register its job-level metrics because
> of a name collision in the JobManagerMetricGroup. As a result, the checkpoint
> and uptime metrics are unavailable even though the job is running.
> *Steps to reproduce*
>  # Start up Flink in HA mode using ZooKeeper, single node is fine
>  # Submit a job to the cluster
>  # Stop and restart ZooKeeper
>  # In the Job Manager logs you will see the following warnings:
> {noformat}
> 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported
> 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported
> 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}
> *Proposed Solution*
> I suspect that there may be related issues beyond the metrics, but a code
> change that seems to fix the issue is to remove the job's existing registered
> metrics during recovery:
> {code:java}
> if (isRecovery) {
>   log.info(s"Removing metrics for $jobId, new will be added during recover")
>   jobManagerMetricGroup.removeJob(jobId)
> }{code}
> I'd be happy to submit this as a PR to open up the discussion, if that is
> acceptable, but I am not sure of the consequences of not closing the previous
> JMMG, or whether it would be better simply not to re-register job-level
> metrics during recovery. Doing so would seem to entail informing lower levels
> about the recovery.
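
The collision described in the issue can be reproduced in a small, self-contained model. The sketch below is only an illustration under stated assumptions: `SketchJobGroup`, `SketchJobManagerGroup` and `RecoverySketch` are hypothetical stand-ins rather than Flink's classes, and the model assumes that the parent group hands back the existing per-job group when the same job ID is added again. Under that assumption, re-submission without removal collides, while removing the job's group first lets the gauge register again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Simplified stand-in for a job-level metric group; NOT Flink's real class. */
class SketchJobGroup {
    private final Map<String, Supplier<Long>> metrics = new HashMap<>();

    /** Registers a gauge; returns false (and warns) when the name is already taken. */
    boolean gauge(String name, Supplier<Long> value) {
        if (metrics.containsKey(name)) {
            System.out.println("WARN Name collision: Group already contains a Metric with the name '"
                    + name + "'. Metric will not be reported");
            return false;
        }
        metrics.put(name, value);
        return true;
    }
}

/** Simplified stand-in for the JobManager-level group that owns one group per job. */
class SketchJobManagerGroup {
    private final Map<String, SketchJobGroup> jobs = new HashMap<>();

    /** Assumption: returns the existing group for the job if there is one, otherwise creates it. */
    SketchJobGroup addJob(String jobId) {
        return jobs.computeIfAbsent(jobId, id -> new SketchJobGroup());
    }

    void removeJob(String jobId) {
        jobs.remove(jobId);
    }
}

class RecoverySketch {
    /** Models (re-)submission: optionally drop the old group, then register a checkpoint gauge. */
    static boolean submit(SketchJobManagerGroup parent, String jobId, boolean isRecovery) {
        if (isRecovery) {
            parent.removeJob(jobId); // the workaround proposed in the issue
        }
        return parent.addJob(jobId).gauge("totalNumberOfCheckpoints", () -> 0L);
    }

    public static void main(String[] args) {
        SketchJobManagerGroup broken = new SketchJobManagerGroup();
        submit(broken, "job-1", false);
        System.out.println("re-submit without removal registered: " + submit(broken, "job-1", false));

        SketchJobManagerGroup patched = new SketchJobManagerGroup();
        submit(patched, "job-1", false);
        System.out.println("re-submit with removal registered: " + submit(patched, "job-1", true));
    }
}
```

The `isRecovery` flag mirrors the snippet in the issue; where such a flag would come from in the real code base is not covered by this model.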



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417107#comment-16417107
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5755
  
Thanks for the review @zentol. Addressing your last comment while merging 
this PR.



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417106#comment-16417106
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177695516
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -165,7 +166,7 @@
private final BlobServer blobServer;
 
/** The metrics for the job. */
--- End diff --

Will remove it.



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415934#comment-16415934
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177497889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -165,7 +166,7 @@
private final BlobServer blobServer;
 
/** The metrics for the job. */
--- End diff --

outdated comment



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415799#comment-16415799
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5755
  
@zentol, I've addressed your comments and updated the PR accordingly.



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415794#comment-16415794
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177466102
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+   private final JobManagerMetricGroup jobManagerMetricGroup;
+
+   public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+      this.jobManagerMetricGroup = jobManagerMetricGroup;
+   }
+
+   @Override
+   public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+      jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

You're right, I will change it like you've described:

- Call `JobManagerJobMetricGroup#close` explicitly
- Remove `JobManagerMetricGroup#removeJob` from `DefaultJobManagerJobMetricGroupFactory`
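
The two points above can be sketched roughly as follows. This is a minimal model, not the code from the PR: `ClosableJobGroup`, `ParentGroup`, `PlainJobGroupFactory` and `JobOwnerSketch` are hypothetical names, and the model assumes that the parent group creates a new per-job group once the previous one has been closed. The factory has no side effect, and the owner of the group closes it explicitly before requesting a new one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-job group with an explicit lifecycle; NOT Flink's real class. */
class ClosableJobGroup {
    private boolean closed;

    void close() { closed = true; }

    boolean isClosed() { return closed; }
}

/** Hypothetical parent group; assumed to replace a job's group once that group is closed. */
class ParentGroup {
    private final Map<String, ClosableJobGroup> jobs = new HashMap<>();

    ClosableJobGroup addJob(String jobId) {
        ClosableJobGroup current = jobs.get(jobId);
        if (current == null || current.isClosed()) {
            current = new ClosableJobGroup();
            jobs.put(jobId, current);
        }
        return current;
    }
}

/** Factory without side effects: it only asks the parent for the job's group. */
class PlainJobGroupFactory {
    private final ParentGroup parent;

    PlainJobGroupFactory(ParentGroup parent) { this.parent = parent; }

    ClosableJobGroup create(String jobId) { return parent.addJob(jobId); }
}

/** Hypothetical owner that closes the old group itself before asking for a new one. */
class JobOwnerSketch {
    private final PlainJobGroupFactory factory;
    private final String jobId;
    private ClosableJobGroup current;

    JobOwnerSketch(PlainJobGroupFactory factory, String jobId) {
        this.factory = factory;
        this.jobId = jobId;
        this.current = factory.create(jobId);
    }

    /** Called whenever the execution state is rebuilt, e.g. after regaining leadership. */
    ClosableJobGroup resetMetrics() {
        current.close();                 // the single, explicit unregistration path
        current = factory.create(jobId); // yields a fresh group because the old one is closed
        return current;
    }

    ClosableJobGroup currentGroup() { return current; }
}
```

Keeping the close call in the owner means a reader can find the one place where job-level metrics are unregistered, rather than having to know that creating a group also removes one.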


[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415665#comment-16415665
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177436964
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+   private final JobManagerMetricGroup jobManagerMetricGroup;
+
+   public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+      this.jobManagerMetricGroup = jobManagerMetricGroup;
+   }
+
+   @Override
+   public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+      jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

Yes, you would still need the factory when using the `TaskMetricGroup`-like
pattern, but then
* the factory wouldn't have that odd side effect
* there would be a single _expected_ path on which metrics are unregistered
(currently, the last `JobManagerJobMetricGroup` is implicitly closed when the
`JobManagerMetricGroup` is closed)


[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415655#comment-16415655
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r17742
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+   private final JobManagerMetricGroup jobManagerMetricGroup;
+
+   public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+      this.jobManagerMetricGroup = jobManagerMetricGroup;
+   }
+
+   @Override
+   public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+      jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

I think closing the `JobManagerJobMetricGroup` alone is not enough, because
we would still have to create a fresh one afterwards.

I was a bit hesitant to change the `AbstractMetricGroup` because, to me, it
looked as if you could only add metrics but never remove them (except when
closing). So, to keep the change as non-invasive as possible, I went for the
approach where we create a fresh metric group whenever we recreate the
`ExecutionGraph`.

Given the factory, this could also easily be changed later, because whether
you create a fresh instance or reset the existing one is an implementation
detail.
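
To illustrate why the choice stays an implementation detail, here is a small sketch with hypothetical names (`NamedMetricGroup`, `MetricGroupFactorySketch`, `FreshGroupFactory`, `ResettingGroupFactory`); none of them are Flink classes, and the `reset` method is an assumption, since the discussion only floats it as an option. The caller depends solely on the factory contract, so either variant can be swapped in without touching it.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical job-level group that tracks metric names; NOT Flink's real class. */
class NamedMetricGroup {
    private final Set<String> names = new HashSet<>();

    /** Returns false on a name collision. */
    boolean register(String name) { return names.add(name); }

    /** Drops every registered metric so the group can be reused. */
    void reset() { names.clear(); }
}

/** The caller only depends on this contract: create() yields a group without stale metrics. */
interface MetricGroupFactorySketch {
    NamedMetricGroup create();
}

/** Variant 1: hand out a brand-new group on every call. */
class FreshGroupFactory implements MetricGroupFactorySketch {
    @Override
    public NamedMetricGroup create() { return new NamedMetricGroup(); }
}

/** Variant 2: keep one group and clear it on every call. */
class ResettingGroupFactory implements MetricGroupFactorySketch {
    private final NamedMetricGroup group = new NamedMetricGroup();

    @Override
    public NamedMetricGroup create() {
        group.reset();
        return group;
    }
}

class FactoryChoiceSketch {
    /** Simulates building the execution state twice and registering the same metric each time. */
    static boolean registersTwice(MetricGroupFactorySketch factory) {
        boolean first = factory.create().register("numberOfCompletedCheckpoints");
        boolean second = factory.create().register("numberOfCompletedCheckpoints");
        return first && second;
    }
}
```

In this model both factories let the same metric name register on every rebuild, which is the property the caller actually relies on.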


[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415195#comment-16415195
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177340014
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+   private final JobManagerMetricGroup jobManagerMetricGroup;
+
+   public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+      this.jobManagerMetricGroup = jobManagerMetricGroup;
+   }
+
+   @Override
+   public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+      jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

You could use the same pattern we use for `TaskMetricGroups`, which remove 
themselves from their parent `TaskManagerJobMetricGroup` when they are closed. 
That said, I'm not really fond of it.

You could also add a `reset` method to the `JobManagerJobMetricGroup` that 
removes all of its metrics. This may be more reasonable than replacing the group.
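
To make the `reset` idea concrete, here is a minimal sketch. It does not use Flink's classes: `SketchJobMetricGroup` and its `register`/`reset` methods are hypothetical stand-ins, and the collision message only imitates the log line quoted in the issue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a job-level metric group; not Flink's actual class. */
class SketchJobMetricGroup {
    private final Map<String, Object> metrics = new HashMap<>();

    /** Registers a metric; returns false and keeps the old metric on a name collision. */
    boolean register(String name, Object metric) {
        if (metrics.containsKey(name)) {
            System.out.println("Name collision: Group already contains a Metric with the name '"
                    + name + "'. Metric will not be reported");
            return false;
        }
        metrics.put(name, metric);
        return true;
    }

    /** Hypothetical reset: drops all metrics so the same group instance can be reused. */
    void reset() {
        metrics.clear();
    }

    int size() {
        return metrics.size();
    }
}

class ResetSketchDemo {
    public static void main(String[] args) {
        SketchJobMetricGroup group = new SketchJobMetricGroup();
        group.register("totalNumberOfCheckpoints", 0L);
        // Re-submission without cleanup: the second registration collides.
        group.register("totalNumberOfCheckpoints", 0L);
        // Re-submission after reset: registration succeeds again.
        group.reset();
        group.register("totalNumberOfCheckpoints", 0L);
    }
}
```

With a reset, the parent keeps holding the same child group, so nothing has to be removed from or re-added to the parent.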



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415078#comment-16415078
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177317868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+   private final JobManagerMetricGroup jobManagerMetricGroup;
+
+   public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+      this.jobManagerMetricGroup = jobManagerMetricGroup;
+   }
+
+   @Override
+   public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+      jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

We could also pass the `JobManagerMetricGroup` into the `JobMaster`, but I 
think exposing only the `JobManagerJobMetricGroup` to it gives a better 
separation of concerns.
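
The separation described here can be sketched as follows. All types are simplified, hypothetical stand-ins (job IDs are plain strings, and `addJob` is assumed to return an already registered group); the point is only that the job-level component sees a factory, never the parent group.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-job metric group. */
class JobGroupSketch {
    final String jobId;

    JobGroupSketch(String jobId) {
        this.jobId = jobId;
    }
}

/** Hypothetical parent group that tracks at most one child group per job id. */
class ParentGroupSketch {
    private final Map<String, JobGroupSketch> jobs = new HashMap<>();

    /** Returns the existing group if one is already registered (an assumption of this sketch). */
    JobGroupSketch addJob(String jobId) {
        return jobs.computeIfAbsent(jobId, JobGroupSketch::new);
    }

    void removeJob(String jobId) {
        jobs.remove(jobId);
    }
}

/** The only metrics-related type the job-level component gets to see. */
interface JobGroupFactorySketch {
    JobGroupSketch create(String jobId);
}

/** Factory that drops any stale group before creating a fresh one. */
class RemovingJobGroupFactorySketch implements JobGroupFactorySketch {
    private final ParentGroupSketch parent;

    RemovingJobGroupFactorySketch(ParentGroupSketch parent) {
        this.parent = parent;
    }

    @Override
    public JobGroupSketch create(String jobId) {
        parent.removeJob(jobId);
        return parent.addJob(jobId);
    }
}

/** Hypothetical job-level component: it holds the factory, not the parent group. */
class JobMasterSketch {
    private final JobGroupFactorySketch factory;
    private JobGroupSketch current;

    JobMasterSketch(JobGroupFactorySketch factory) {
        this.factory = factory;
    }

    void onLeadershipGained(String jobId) {
        current = factory.create(jobId);
    }

    JobGroupSketch currentGroup() {
        return current;
    }
}
```

Because `JobMasterSketch` never holds a reference to the parent, the removal has to happen inside the factory.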



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415076#comment-16415076
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177317210
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+   private final JobManagerMetricGroup jobManagerMetricGroup;
+
+   public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+      this.jobManagerMetricGroup = jobManagerMetricGroup;
+   }
+
+   @Override
+   public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+      jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

Yes, but the `JobMaster` does not have access to the 
`JobManagerMetricGroup`. That's why it is necessary to remove it here. I can 
change the return type of `JobManagerMetricGroup#removeJob`, but I think it is 
not strictly necessary.



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415075#comment-16415075
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177317071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
--- End diff --

True, I will change it to say that it removes previously registered 
`JobManagerJobMetricGroups`.


> Job-level metrics lost during job re-submission in HA mode
> --
>
> Key: FLINK-8740
> URL: https://issues.apache.org/jira/browse/FLINK-8740
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Joshua DeWald
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.5.0
>
>
> When Flink is running in High Availability and a leader re-election occurs to 
> the same job manager, the job is unable to register the job-level metrics due 
> to a name collision. 
> This may occur even if a different Job Manager is elected, but as it is a 
> local JobManagerMetricsGroup which spits out the error, that is unlikely the 
> case.
>  
> *Expected Behavior*
> When a job is forced to re-submit due to Job Manager re-election, job-level 
> metrics should be available in the new instance of the job (uptime, 
> checkpoints size, checkpoint duration, etc)
> *Actual Behavior*
> When job gets re-submitted, it is unable to register job-level metrics due to 
> collision in the JobManagerMetricGroup, which leads to situation where even 
> though job is running the metrics around checkpoints and uptime are not 
> available
> *Steps to reproduce*
>  # Start up Flink in HA mode using ZooKeeper, single node is fine
>  # Submit a job to the cluster
>  # Stop and restart ZooKeeper
>  # In Job Manager logs you will see the following errors:
>  # 
> {noformat}
> 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported
> 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported
> 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}
> *Proposed Solution*
> I suspect that there may be other related issues than just the metrics, but a 
> code change that seems to fix the issue is that, during recovery, to remove 
> the existing registered Job Metrics:
> {code:java}
> if (isRecovery) {
>log.info(s"Removing metrics for $jobId, new will be added during recover")
>jobManagerMetricGroup.removeJob(jobId)
> }{code}
> I'd be happy to submit this in a PR if that is acceptable to open up the 
> discussion, but I am not sure the consequences of not closing the previous 
> JMMG or perhaps simply not re-registering job-level metrics during recovery. 
> Doing this would seem to 

[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413852#comment-16413852
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177090323
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
+ */
+public class DefaultJobManagerJobMetricGroupFactory implements JobManagerJobMetricGroupFactory {
+
+   private final JobManagerMetricGroup jobManagerMetricGroup;
+
+   public DefaultJobManagerJobMetricGroupFactory(@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+      this.jobManagerMetricGroup = jobManagerMetricGroup;
+   }
+
+   @Override
+   public JobManagerJobMetricGroup create(@Nonnull JobGraph jobGraph) {
+      jobManagerMetricGroup.removeJob(jobGraph.getJobID());
--- End diff --

This should only be necessary if the group wasn't properly cleaned up 
previously by the JobMaster, correct? If so, I suggest modifying `removeJob`, 
for debugging purposes, to return a boolean indicating whether something was 
actually deleted, and logging a warning when it was.
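
A minimal sketch of that suggestion, assuming a simplified parent group keyed by string job IDs. `ReportingParentGroupSketch` and `recreateJob` are hypothetical names, and the logger is plain `java.util.logging` rather than whatever Flink uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical parent group whose removeJob reports whether it removed anything. */
class ReportingParentGroupSketch {
    private static final Logger LOG = Logger.getLogger("ReportingParentGroupSketch");

    private final Map<String, Object> jobs = new HashMap<>();

    void addJob(String jobId) {
        jobs.putIfAbsent(jobId, new Object());
    }

    /** Returns true only if a group was registered for the job and has now been removed. */
    boolean removeJob(String jobId) {
        return jobs.remove(jobId) != null;
    }

    /** Factory-style re-creation that warns when it had to clean up a stale group. */
    void recreateJob(String jobId) {
        if (removeJob(jobId)) {
            LOG.warning("Removed a stale metric group for job " + jobId
                    + "; it was not cleaned up earlier.");
        }
        addJob(jobId);
    }
}
```

A warning here would signal that an earlier shutdown path skipped its cleanup, which is the debugging aid the comment asks for.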



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413853#comment-16413853
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5755#discussion_r177087225
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobManagerJobMetricGroupFactory.java
 ---
@@ -0,0 +1,46 @@
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Implementation of {@link JobManagerJobMetricGroupFactory} which makes sure that there is always
+ * at most one {@link JobManagerJobMetricGroup} for a given {@link JobID} registered.
--- End diff --

This comment is misleading as the `JobManagerMetricGroup` already ensures 
that at most one group is _registered_.



[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411511#comment-16411511
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5755

[FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a 
new ExecutionGraph

## What is the purpose of the change

Closes the `JobManagerJobMetricGroup` when suspending the `JobManager`. This 
allows the job metrics to be re-registered when the `JobManager` regains its 
leadership.
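
The lifecycle described above can be illustrated with a small sketch. The classes below are hypothetical and much simpler than the real components: `start` stands for regaining leadership and `suspend` for losing it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical job-level metric group that can be closed. */
class ClosableJobGroupSketch {
    private final Map<String, Long> gauges = new HashMap<>();
    private boolean closed;

    /** Registers a value under a name; fails on a closed group or on a name collision. */
    boolean register(String name, long value) {
        if (closed || gauges.containsKey(name)) {
            return false;
        }
        gauges.put(name, value);
        return true;
    }

    void close() {
        closed = true;
        gauges.clear();
    }
}

/** Hypothetical component that creates its metric group on start and closes it on suspend. */
class LeaderLifecycleSketch {
    private ClosableJobGroupSketch group;

    /** Leadership (re)gained: a fresh group means the job metrics register without collisions. */
    boolean start() {
        group = new ClosableJobGroupSketch();
        return group.register("totalNumberOfCheckpoints", 0L);
    }

    /** Leadership lost: close the group so its metrics are released. */
    void suspend() {
        if (group != null) {
            group.close();
            group = null;
        }
    }
}
```

In this sketch every `start` after a `suspend` registers `totalNumberOfCheckpoints` successfully, whereas reusing an unclosed group would hit the name collision from the issue description.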

## Verifying this change

- Tested manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink jobLevelMetricsHA

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5755


commit ef2de68bcad3c4715941a1e61d0a460b4c609520
Author: Till Rohrmann 
Date:   2018-03-23T13:08:49Z

[FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a 
new ExecutionGraph

commit 98a49cd032346a2b09641ee6d30baadf9a98855f
Author: Till Rohrmann 
Date:   2018-03-23T14:43:20Z

[hotfix] Create ExecutionGraph when JobMaster is started

The ExecutionGraph is not a final resource in the JobMaster. For example, 
it is necessary to create a new ExecutionGraph when rescaling the job or 
when the JobMaster loses and regains its leadership.

commit dcddfeb0b8883964505bed55d5b3730cae9abe60
Author: Till Rohrmann 
Date:   2018-03-23T14:46:29Z

[hotfix] Remove unused fields in JobMaster




> Job-level metrics lost during job re-submission in HA mode
> --
>
> Key: FLINK-8740
> URL: https://issues.apache.org/jira/browse/FLINK-8740
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Joshua DeWald
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.5.0
>
>
> When Flink is running in High Availability and a leader re-election occurs to 
> the same job manager, the job is unable to register the job-level metrics due 
> to a name collision. 
> This may occur even if a different Job Manager is elected, but as it is a 
> local JobManagerMetricsGroup which spits out the error, that is unlikely the 
> case.
>  
> *Expected Behavior*
> When a job is forced to re-submit due to Job Manager re-election, job-level 
> metrics should be available in the new instance of the job (uptime, 
> checkpoints size, checkpoint duration, etc)
> *Actual Behavior*
> When job gets re-submitted, it is unable to register job-level metrics due to 
> collision in the JobManagerMetricGroup, which leads to situation where even 
> though job is running the metrics around checkpoints and uptime are not 
> available
> *Steps to reproduce*
>  # Start up Flink in HA mode using ZooKeeper, single node is fine
>  # Submit a job to the cluster
>  # Stop and restart ZooKeeper
>  # In Job Manager logs you will see the following errors:
>  # 
> {noformat}
> 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported
> 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported
> 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}
> *Proposed Solution*
> I suspect that there may be other related issues than just the metrics, but a 
> code change that seems to fix the issue is that, during recovery, to remove 
> the existing registered Job