[FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dabb0bac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dabb0bac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dabb0bac

Branch: refs/heads/table-retraction
Commit: dabb0bac0f724d50dcab5b3b767f38dc5feeb407
Parents: dc13500
Author: zentol <ches...@apache.org>
Authored: Fri Mar 24 19:39:31 2017 +0100
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 5 23:18:08 2017 +0200

----------------------------------------------------------------------
 .../groups/TaskManagerJobMetricGroup.java       | 25 ++++++++++++--------
 .../apache/flink/runtime/taskmanager/Task.java  |  9 +++++++
 2 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dabb0bac/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 1ac8140..79a87d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -72,16 +72,21 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG
 
                synchronized (this) {
                        if (!isClosed()) {
-                               TaskMetricGroup task = new TaskMetricGroup(
-                                       registry,
-                                       this,
-                                       jobVertexId,
-                                       executionAttemptID,
-                                       taskName,
-                                       subtaskIndex,
-                                       attemptNumber);
-                               tasks.put(executionAttemptID, task);
-                               return task;
+                               TaskMetricGroup prior = tasks.get(executionAttemptID);
+                               if (prior != null) {
+                                       return prior;
+                               } else {
+                                       TaskMetricGroup task = new TaskMetricGroup(
+                                               registry,
+                                               this,
+                                               jobVertexId,
+                                               executionAttemptID,
+                                               taskName,
+                                               subtaskIndex,
+                                               attemptNumber);
+                                       tasks.put(executionAttemptID, task);
+                                       return task;
+                               }
                        } else {
                                return null;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/dabb0bac/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b0f0eb8..ef934de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,16 +526,25 @@ public class Task implements Runnable, TaskActions {
                        else if (current == ExecutionState.FAILED) {
                                // we were immediately failed. tell the TaskManager that we reached our final state
                                notifyFinalState();
+                               if (metrics != null) {
+                                       metrics.close();
+                               }
                                return;
                        }
                        else if (current == ExecutionState.CANCELING) {
                                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                                        // we were immediately canceled. tell the TaskManager that we reached our final state
                                        notifyFinalState();
+                                       if (metrics != null) {
+                                               metrics.close();
+                                       }
                                        return;
                                }
                        }
                        else {
+                               if (metrics != null) {
+                                       metrics.close();
+                               }
                                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
                        }
                }

Reply via email to