rkhachatryan commented on a change in pull request #17387:
URL: https://github.com/apache/flink/pull/17387#discussion_r744562540
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
##########
@@ -78,46 +80,35 @@ public TaskMetricGroup addTask(
checkNotNull(executionAttemptID);
checkNotNull(taskName);
- synchronized (this) {
- if (!isClosed()) {
- TaskMetricGroup prior = tasks.get(executionAttemptID);
- if (prior != null) {
- return prior;
- } else {
- TaskMetricGroup task =
- new TaskMetricGroup(
- registry,
- this,
- jobVertexId,
- executionAttemptID,
- taskName,
- subtaskIndex,
- attemptNumber);
- tasks.put(executionAttemptID, task);
- return task;
- }
+ if (!isClosed()) {
+ TaskMetricGroup prior = tasks.get(executionAttemptID);
+ if (prior != null) {
+ return prior;
} else {
- return null;
+ TaskMetricGroup task =
+ new TaskMetricGroup(
+ registry,
+ this,
+ jobVertexId,
+ executionAttemptID,
+ taskName,
+ subtaskIndex,
+ attemptNumber);
+ tasks.put(executionAttemptID, task);
+ return task;
}
+ } else {
+ return null;
}
}
public void removeTaskMetricGroup(ExecutionAttemptID executionId) {
checkNotNull(executionId);
- boolean removeFromParent = false;
- synchronized (this) {
- if (!isClosed() && tasks.remove(executionId) != null &&
tasks.isEmpty()) {
- // this call removed the last task. close this group.
- removeFromParent = true;
- close();
- }
- }
-
- // IMPORTANT: removing from the parent must not happen while holding
the this group's lock,
- // because it would violate the "first parent then subgroup" lock
acquisition order
- if (removeFromParent) {
- parent.removeJobMetricsGroup(jobId, this);
+ if (!isClosed()) {
Review comment:
Thanks, I'll remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]