[FLINK-6183] [metrics] Prevent some cases of TaskMetricGroup not being closed
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dabb0bac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dabb0bac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dabb0bac Branch: refs/heads/table-retraction Commit: dabb0bac0f724d50dcab5b3b767f38dc5feeb407 Parents: dc13500 Author: zentol <ches...@apache.org> Authored: Fri Mar 24 19:39:31 2017 +0100 Committer: zentol <ches...@apache.org> Committed: Wed Apr 5 23:18:08 2017 +0200 ---------------------------------------------------------------------- .../groups/TaskManagerJobMetricGroup.java | 25 ++++++++++++-------- .../apache/flink/runtime/taskmanager/Task.java | 9 +++++++ 2 files changed, 24 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dabb0bac/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java index 1ac8140..79a87d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java @@ -72,16 +72,21 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG synchronized (this) { if (!isClosed()) { - TaskMetricGroup task = new TaskMetricGroup( - registry, - this, - jobVertexId, - executionAttemptID, - taskName, - subtaskIndex, - attemptNumber); - tasks.put(executionAttemptID, task); - return task; + TaskMetricGroup prior = tasks.get(executionAttemptID); + if (prior != null) { + return prior; + } else { + TaskMetricGroup task = new 
TaskMetricGroup( + registry, + this, + jobVertexId, + executionAttemptID, + taskName, + subtaskIndex, + attemptNumber); + tasks.put(executionAttemptID, task); + return task; + } } else { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/dabb0bac/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b0f0eb8..ef934de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -526,16 +526,25 @@ public class Task implements Runnable, TaskActions { else if (current == ExecutionState.FAILED) { // we were immediately failed. tell the TaskManager that we reached our final state notifyFinalState(); + if (metrics != null) { + metrics.close(); + } return; } else if (current == ExecutionState.CANCELING) { if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) { // we were immediately canceled. tell the TaskManager that we reached our final state notifyFinalState(); + if (metrics != null) { + metrics.close(); + } return; } } else { + if (metrics != null) { + metrics.close(); + } throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.'); } }