rkhachatryan commented on a change in pull request #17387:
URL: https://github.com/apache/flink/pull/17387#discussion_r744578502



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
##########
@@ -93,49 +96,24 @@ public TaskMetricGroup addTaskForJob(
 
         String resolvedJobName = jobName == null || jobName.isEmpty() ? jobId.toString() : jobName;
 
-        // we cannot strictly lock both our map modification and the job group modification
-        // because it might lead to a deadlock
-        while (true) {
-            // get or create a jobs metric group
-            TaskManagerJobMetricGroup currentJobGroup;
-            synchronized (this) {
-                currentJobGroup = jobs.get(jobId);
-
-                if (currentJobGroup == null || currentJobGroup.isClosed()) {
-                    currentJobGroup =
-                            new TaskManagerJobMetricGroup(registry, this, jobId, resolvedJobName);
-                    jobs.put(jobId, currentJobGroup);
-                }
-            }
-
-            // try to add another task. this may fail if we found a pre-existing job metrics
-            // group and it is closed concurrently
-            TaskMetricGroup taskGroup =
-                    currentJobGroup.addTask(
-                            jobVertexId, executionAttemptId, taskName, subtaskIndex, attemptNumber);
-
-            if (taskGroup != null) {
-                // successfully added the next task
-                return taskGroup;
-            }
+        TaskManagerJobMetricGroup jobGroup = jobs.get(jobId);
 
-            // else fall through the loop
-        }
-    }
-
-    public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) {
-        if (jobId == null || group == null || !group.isClosed()) {
-            return;
+        if (jobGroup == null) {
+            jobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, resolvedJobName);
+            jobs.put(jobId, jobGroup);
         }
 
-        synchronized (this) {
-            // optimistically remove the currently contained group, and check later if it was
-            // correct
-            TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId);
+        // note that a pre-existing job group can NOT be closed concurrently - this is done by the
+        // same TM thread in removeJobMetricsGroup
+        return jobGroup.addTask(
+                jobVertexId, executionAttemptId, taskName, subtaskIndex, attemptNumber);
+    }
 
-            // check if another group was actually contained, and restore that one
-            if (containedGroup != null && containedGroup != group) {
-                jobs.put(jobId, containedGroup);
+    public void removeJobMetricsGroup(JobID jobId) {

Review comment:
   The method is called from `TaskExecutor.releaseJobResources`, which is covered by `TaskExecutorTest.testReleasingJobResources` (new test).
   IMO, the method is too simple to have its own unit test. WDYT?
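
   For illustration only, here is a minimal, self-contained sketch of the simplified get-or-create/remove pattern from the diff. It assumes all calls come from a single thread, as the new code comment states. `JobGroupSketch` and `TaskManagerGroupSketch` are hypothetical stand-ins, not Flink classes, and the body of `removeJobMetricsGroup` is an assumption because the diff above only shows its new signature.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TaskManagerJobMetricGroup; not the Flink class.
final class JobGroupSketch {
    private boolean closed;
    private int tasks;

    // Returns the number of tasks registered so far in this job group.
    int addTask() {
        if (closed) {
            throw new IllegalStateException("job group is already closed");
        }
        return ++tasks;
    }

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical stand-in for TaskManagerMetricGroup.
// Assumption: every method is called from the same thread, so no locking is used.
final class TaskManagerGroupSketch {
    private final Map<String, JobGroupSketch> jobs = new HashMap<>();

    // Mirrors the new addTaskForJob: plain get-or-create, no retry loop.
    int addTaskForJob(String jobId) {
        JobGroupSketch jobGroup = jobs.get(jobId);
        if (jobGroup == null) {
            jobGroup = new JobGroupSketch();
            jobs.put(jobId, jobGroup);
        }
        // A pre-existing group cannot be closed concurrently under the
        // single-thread assumption, so addTask cannot observe a closed group.
        return jobGroup.addTask();
    }

    // Assumed body: remove the group from the map and close it, if present.
    void removeJobMetricsGroup(String jobId) {
        JobGroupSketch removed = jobs.remove(jobId);
        if (removed != null) {
            removed.close();
        }
    }

    int numRegisteredJobs() {
        return jobs.size();
    }
}
```

   Under that assumption, a direct test would amount to adding a task, removing the job group, and checking that the map is empty again, which is roughly what an indirect test through the caller already exercises.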




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


