This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f436b20429b55ada2a9e8936a5e80fc672a397de Author: Gen Luo <luogen...@gmail.com> AuthorDate: Sun Jul 31 22:11:46 2022 +0800 [FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable This closes #20296. --- .../src/test/resources/rest_api_v1.snapshot | 31 +++ .../handler/job/AbstractSubtaskAttemptHandler.java | 15 +- .../rest/handler/job/JobDetailsHandler.java | 2 + .../rest/handler/job/JobExceptionsHandler.java | 33 ++-- .../handler/job/JobVertexBackPressureHandler.java | 87 +++++++-- .../rest/handler/job/JobVertexDetailsHandler.java | 17 +- .../handler/job/JobVertexTaskManagersHandler.java | 73 +++++--- .../job/SubtaskCurrentAttemptDetailsHandler.java | 19 +- ...SubtaskExecutionAttemptAccumulatorsHandler.java | 38 ++-- .../job/SubtaskExecutionAttemptDetailsHandler.java | 52 +++--- .../job/SubtasksAllAccumulatorsHandler.java | 32 ++-- .../rest/handler/job/SubtasksTimesHandler.java | 3 +- .../rest/messages/JobVertexBackPressureInfo.java | 45 ++++- .../job/SubtaskExecutionAttemptDetailsInfo.java | 105 +++++++---- .../threadinfo/JobVertexThreadInfoTracker.java | 24 +-- .../job/JobVertexBackPressureHandlerTest.java | 207 +++++++++++++++++++++ .../SubtaskCurrentAttemptDetailsHandlerTest.java | 3 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 3 +- .../messages/AggregatedTaskDetailsInfoTest.java | 3 +- .../messages/JobVertexBackPressureInfoTest.java | 24 ++- .../rest/messages/JobVertexDetailsInfoTest.java | 24 ++- .../SubtaskExecutionAttemptDetailsInfoTest.java | 3 +- 22 files changed, 651 insertions(+), 192 deletions(-) diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 873e5062d7d..85337fa8795 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -2403,6 +2403,13 @@ "type" : "integer" } }, + "other-concurrent-attempts" : { + "type" : "array", + "items" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo" + } + }, "start_time" : { "type" : "integer" } @@ -2522,6 +2529,9 @@ "subtask" : { "type" : "integer" }, + "attempt-number" : { + "type" : "integer" + }, "backpressure-level" : { "type" : "string", "enum" : [ "ok", "low", "high" ] @@ -2534,6 +2544,13 @@ }, "busyRatio" : { "type" : "number" + }, + "other-concurrent-attempts" : { + "type" : "array", + "items" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo:SubtaskBackPressureInfo" + } } } } @@ -2803,6 +2820,13 @@ "type" : "integer" } }, + "other-concurrent-attempts" : { + "type" : "array", + "items" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo" + } + }, "start_time" : { "type" : "integer" } @@ -2904,6 +2928,13 @@ "type" : "integer" } }, + "other-concurrent-attempts" : { + "type" : "array", + "items" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo" + } + }, "start_time" : { "type" : "integer" } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java index 15d128e43e2..04b066860ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; @@ -88,11 +89,17 @@ public abstract class AbstractSubtaskAttemptHandler< throws RestHandlerException { final Integer attemptNumber = request.getPathParameter(SubtaskAttemptPathParameter.class); - final AccessExecution currentAttempt = executionVertex.getCurrentExecutionAttempt(); + final Collection<AccessExecution> currentExecutions = + executionVertex.getCurrentExecutions(); final ExecutionHistory executionHistory = executionVertex.getExecutionHistory(); - if (attemptNumber == currentAttempt.getAttemptNumber()) { - return handleRequest(request, currentAttempt); - } else if (executionHistory.isValidAttemptNumber(attemptNumber)) { + + for (AccessExecution currentExecution : currentExecutions) { + if (attemptNumber == currentExecution.getAttemptNumber()) { + return handleRequest(request, currentExecution); + } + } + + if (executionHistory.isValidAttemptNumber(attemptNumber)) { final Optional<? extends AccessExecution> execution = executionHistory.getHistoricalExecution(attemptNumber); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 4f850bc0712..f38a7688ab9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -202,6 +202,8 @@ public class JobDetailsHandler MutableIOMetrics counts = new MutableIOMetrics(); for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { + // Here we use the metrics of one of the current attempts to represent the subtask, + // rather than the aggregation of all attempts. counts.addIOMetrics( vertex.getCurrentExecutionAttempt(), metricFetcher, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index c500792c6aa..4ae57f6bc28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; @@ -127,22 +128,24 @@ public class JobExceptionsHandler List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>(); boolean truncated = false; for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) { - Optional<ErrorInfo> failure = task.getFailureInfo(); - if (failure.isPresent()) { - if (taskExceptionList.size() >= exceptionToReportMaxSize) { - truncated = true; - break; + for (AccessExecution execution : task.getCurrentExecutions()) { + Optional<ErrorInfo> failure = execution.getFailureInfo(); + if (failure.isPresent()) { + if (taskExceptionList.size() >= exceptionToReportMaxSize) { + truncated = true; + break; + } + + TaskManagerLocation location = execution.getAssignedResourceLocation(); + String locationString = toString(location); + long timestamp = execution.getStateTimestamp(ExecutionState.FAILED); + taskExceptionList.add( + new JobExceptionsInfo.ExecutionExceptionInfo( + failure.get().getExceptionAsString(), + task.getTaskNameWithSubtaskIndex(), + locationString, + timestamp == 0 ? -1 : timestamp)); } - - TaskManagerLocation location = task.getCurrentAssignedResourceLocation(); - String locationString = toString(location); - long timestamp = task.getStateTimestamp(ExecutionState.FAILED); - taskExceptionList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - failure.get().getExceptionAsString(), - task.getTaskNameWithSubtaskIndex(), - locationString, - timestamp == 0 ? -1 : timestamp)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java index 895fe7011ba..898b74ba0d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.ComponentMetricStore; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.SubtaskMetricStore; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.TaskMetricStore; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; @@ -39,6 +40,7 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Comparator; @@ -80,18 +82,23 @@ public class JobVertexBackPressureHandler metricFetcher .getMetricStore() .getTaskMetricStore(jobId.toString(), jobVertexId.toString()); + Map<String, Map<Integer, Integer>> jobCurrentExecutions = + metricFetcher.getMetricStore().getCurrentExecutionAttempts().get(jobId.toString()); + Map<Integer, Integer> currentExecutionAttempts = + jobCurrentExecutions != null + ? jobCurrentExecutions.get(jobVertexId.toString()) + : null; return CompletableFuture.completedFuture( taskMetricStore != null - ? createJobVertexBackPressureInfo( - taskMetricStore.getAllSubtaskMetricStores()) + ? createJobVertexBackPressureInfo(taskMetricStore, currentExecutionAttempts) : JobVertexBackPressureInfo.deprecated()); } private JobVertexBackPressureInfo createJobVertexBackPressureInfo( - Map<Integer, ComponentMetricStore> allSubtaskMetricStores) { + TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) { List<SubtaskBackPressureInfo> subtaskBackPressureInfos = - createSubtaskBackPressureInfo(allSubtaskMetricStores); + createSubtaskBackPressureInfo(taskMetricStore, currentExecutionAttempts); return new JobVertexBackPressureInfo( JobVertexBackPressureInfo.VertexBackPressureStatus.OK, getBackPressureLevel(getMaxBackPressureRatio(subtaskBackPressureInfos)), @@ -100,26 +107,72 @@ public class JobVertexBackPressureHandler } private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo( - Map<Integer, ComponentMetricStore> subtaskMetricStores) { + TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) { + Map<Integer, SubtaskMetricStore> subtaskMetricStores = + taskMetricStore.getAllSubtaskMetricStores(); List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size()); - for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) { + for (Map.Entry<Integer, SubtaskMetricStore> entry : subtaskMetricStores.entrySet()) { int subtaskIndex = entry.getKey(); - ComponentMetricStore subtaskMetricStore = entry.getValue(); - double backPressureRatio = getBackPressureRatio(subtaskMetricStore); - double idleRatio = getIdleRatio(subtaskMetricStore); - double busyRatio = getBusyRatio(subtaskMetricStore); - result.add( - new SubtaskBackPressureInfo( - subtaskIndex, - getBackPressureLevel(backPressureRatio), - backPressureRatio, - idleRatio, - busyRatio)); + SubtaskMetricStore subtaskMetricStore = entry.getValue(); + Map<Integer, ComponentMetricStore> allAttemptsMetricStores = + subtaskMetricStore.getAllAttemptsMetricStores(); + if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) { + result.add( + createSubtaskAttemptBackpressureInfo( + subtaskIndex, null, subtaskMetricStore, null)); + } else { + int currentAttempt = + currentExecutionAttempts == null + ? -1 + : currentExecutionAttempts.getOrDefault(subtaskIndex, -1); + if (!allAttemptsMetricStores.containsKey(currentAttempt)) { + // allAttemptsMetricStores is not empty here + currentAttempt = allAttemptsMetricStores.keySet().iterator().next(); + } + List<SubtaskBackPressureInfo> otherConcurrentAttempts = + new ArrayList<>(allAttemptsMetricStores.size() - 1); + for (Map.Entry<Integer, ComponentMetricStore> attemptStore : + allAttemptsMetricStores.entrySet()) { + if (attemptStore.getKey() == currentAttempt) { + continue; + } + otherConcurrentAttempts.add( + createSubtaskAttemptBackpressureInfo( + subtaskIndex, + attemptStore.getKey(), + attemptStore.getValue(), + null)); + } + result.add( + createSubtaskAttemptBackpressureInfo( + subtaskIndex, + currentAttempt, + allAttemptsMetricStores.get(currentAttempt), + otherConcurrentAttempts)); + } } result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask)); return result; } + private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo( + int subtaskIndex, + @Nullable Integer attemptNumber, + ComponentMetricStore metricStore, + @Nullable List<SubtaskBackPressureInfo> otherConcurrentAttempts) { + double backPressureRatio = getBackPressureRatio(metricStore); + double idleRatio = getIdleRatio(metricStore); + double busyRatio = getBusyRatio(metricStore); + return new SubtaskBackPressureInfo( + subtaskIndex, + attemptNumber, + getBackPressureLevel(backPressureRatio), + backPressureRatio, + idleRatio, + busyRatio, + otherConcurrentAttempts); + } + private double getMaxBackPressureRatio(List<SubtaskBackPressureInfo> subtaskBackPressureInfos) { return subtaskBackPressureInfos.stream() .mapToDouble(backPressureInfo -> backPressureInfo.getBackPressuredRatio()) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java index e4ac7708489..5877567db77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java @@ -120,9 +120,24 @@ public class JobVertexDetailsHandler for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { final AccessExecution execution = vertex.getCurrentExecutionAttempt(); final JobVertexID jobVertexID = jobVertex.getJobVertexId(); + + final Collection<AccessExecution> attempts = vertex.getCurrentExecutions(); + List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null; + + if (attempts.size() > 1) { + otherConcurrentAttempts = new ArrayList<>(); + for (AccessExecution attempt : attempts) { + if (attempt.getAttemptNumber() != execution.getAttemptNumber()) { + otherConcurrentAttempts.add( + SubtaskExecutionAttemptDetailsInfo.create( + attempt, metricFetcher, jobID, jobVertexID, null)); + } + } + } + subtasks.add( SubtaskExecutionAttemptDetailsInfo.create( - execution, metricFetcher, jobID, jobVertexID)); + execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts)); } return new JobVertexDetailsInfo( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java index 3feea441d2f..450535f8c7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; @@ -55,8 +56,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -127,40 +130,48 @@ public class JobVertexTaskManagersHandler AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) { - // Build a map that groups tasks by TaskManager + // Build a map that groups task executions by TaskManager Map<String, String> taskManagerId2Host = new HashMap<>(); - Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); + Map<String, List<AccessExecution>> taskManagerExecutions = new HashMap<>(); + Set<AccessExecution> representativeExecutions = new HashSet<>(); for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - String taskManagerHost = - location == null - ? "(unassigned)" - : location.getHostname() + ':' + location.dataPort(); - String taskmanagerId = - location == null ? "(unassigned)" : location.getResourceID().toString(); - taskManagerId2Host.put(taskmanagerId, taskManagerHost); - List<AccessExecutionVertex> vertices = - taskManagerVertices.computeIfAbsent( - taskmanagerId, ignored -> new ArrayList<>(4)); - vertices.add(vertex); + AccessExecution representativeAttempt = vertex.getCurrentExecutionAttempt(); + representativeExecutions.add(representativeAttempt); + + for (AccessExecution execution : vertex.getCurrentExecutions()) { + TaskManagerLocation location = execution.getAssignedResourceLocation(); + String taskManagerHost = + location == null + ? "(unassigned)" + : location.getHostname() + ':' + location.dataPort(); + String taskmanagerId = + location == null ? "(unassigned)" : location.getResourceID().toString(); + taskManagerId2Host.put(taskmanagerId, taskManagerHost); + List<AccessExecution> executions = + taskManagerExecutions.computeIfAbsent( + taskmanagerId, ignored -> new ArrayList<>()); + executions.add(execution); + } } final long now = System.currentTimeMillis(); List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4); - for (Map.Entry<String, List<AccessExecutionVertex>> entry : - taskManagerVertices.entrySet()) { + for (Map.Entry<String, List<AccessExecution>> entry : taskManagerExecutions.entrySet()) { String taskmanagerId = entry.getKey(); String host = taskManagerId2Host.get(taskmanagerId); - List<AccessExecutionVertex> taskVertices = entry.getValue(); + List<AccessExecution> executions = entry.getValue(); List<IOMetricsInfo> ioMetricsInfos = new ArrayList<>(); List<Map<ExecutionState, Long>> status = - taskVertices.stream() - .map(AccessExecutionVertex::getCurrentExecutionAttempt) + executions.stream() .map(StatusDurationUtils::getExecutionStateDuration) .collect(Collectors.toList()); + // executionsPerState counts attempts of a subtask separately + int[] executionsPerState = new int[ExecutionState.values().length]; + // tasksPerState counts only the representative attempts, and is used to aggregate the + // task manager state int[] tasksPerState = new int[ExecutionState.values().length]; long startTime = Long.MAX_VALUE; @@ -169,27 +180,32 @@ public class JobVertexTaskManagersHandler MutableIOMetrics counts = new MutableIOMetrics(); - for (AccessExecutionVertex vertex : taskVertices) { - final ExecutionState state = vertex.getExecutionState(); - tasksPerState[state.ordinal()]++; + int representativeAttemptsCount = 0; + for (AccessExecution execution : executions) { + final ExecutionState state = execution.getState(); + executionsPerState[state.ordinal()]++; + if (representativeExecutions.contains(execution)) { + tasksPerState[state.ordinal()]++; + representativeAttemptsCount++; + } // take the earliest start time - long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING); + long started = execution.getStateTimestamp(ExecutionState.DEPLOYING); if (started > 0) { startTime = Math.min(startTime, started); } allFinished &= state.isTerminal(); - endTime = Math.max(endTime, vertex.getStateTimestamp(state)); + endTime = Math.max(endTime, execution.getStateTimestamp(state)); counts.addIOMetrics( - vertex.getCurrentExecutionAttempt(), + execution, metricFetcher, jobID.toString(), jobVertex.getJobVertexId().toString()); MutableIOMetrics current = new MutableIOMetrics(); current.addIOMetrics( - vertex.getCurrentExecutionAttempt(), + execution, metricFetcher, jobID.toString(), jobVertex.getJobVertexId().toString()); @@ -222,9 +238,10 @@ public class JobVertexTaskManagersHandler duration = -1L; } + // Safe when tasksPerState are all zero and representativeAttemptsCount is zero ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState( - tasksPerState, taskVertices.size()); + tasksPerState, representativeAttemptsCount); final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo( @@ -243,7 +260,7 @@ public class JobVertexTaskManagersHandler Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length); for (ExecutionState state : ExecutionState.values()) { - statusCounts.put(state, tasksPerState[state.ordinal()]); + statusCounts.put(state, executionsPerState[state.ordinal()]); } taskManagersInfoList.add( new JobVertexTaskManagersInfo.TaskManagersInfo( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java index ba25ac5c24f..0e52d088158 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java @@ -37,6 +37,9 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -81,7 +84,21 @@ public class SubtaskCurrentAttemptDetailsHandler final JobID jobID = request.getPathParameter(JobIDPathParameter.class); final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); + final Collection<AccessExecution> attempts = executionVertex.getCurrentExecutions(); + List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null; + + if (attempts.size() > 1) { + otherConcurrentAttempts = new ArrayList<>(); + for (AccessExecution attempt : attempts) { + if (attempt.getAttemptNumber() != execution.getAttemptNumber()) { + otherConcurrentAttempts.add( + SubtaskExecutionAttemptDetailsInfo.create( + attempt, metricFetcher, jobID, jobVertexID, null)); + } + } + } + return SubtaskExecutionAttemptDetailsInfo.create( - execution, metricFetcher, jobID, jobVertexID); + execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java index 7093308bf3b..a188a19cb5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -99,25 +99,25 @@ public class SubtaskExecutionAttemptAccumulatorsHandler List<ArchivedJson> archive = new ArrayList<>(16); for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { for (AccessExecutionVertex subtask : task.getTaskVertices()) { - ResponseBody curAttemptJson = - createAccumulatorInfo(subtask.getCurrentExecutionAttempt()); - String curAttemptPath = - getMessageHeaders() - .getTargetRestEndpointURL() - .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) - .replace( - ':' + JobVertexIdPathParameter.KEY, - task.getJobVertexId().toString()) - .replace( - ':' + SubtaskIndexPathParameter.KEY, - String.valueOf(subtask.getParallelSubtaskIndex())) - .replace( - ':' + SubtaskAttemptPathParameter.KEY, - String.valueOf( - subtask.getCurrentExecutionAttempt() - .getAttemptNumber())); - - archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + for (AccessExecution attempt : subtask.getCurrentExecutions()) { + ResponseBody curAttemptJson = createAccumulatorInfo(attempt); + String curAttemptPath = + getMessageHeaders() + .getTargetRestEndpointURL() + .replace( + ':' + JobIDPathParameter.KEY, + graph.getJobID().toString()) + .replace( + ':' + JobVertexIdPathParameter.KEY, + task.getJobVertexId().toString()) + .replace( + ':' + SubtaskIndexPathParameter.KEY, + String.valueOf(subtask.getParallelSubtaskIndex())) + .replace( + ':' + SubtaskAttemptPathParameter.KEY, + String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + } for (AccessExecution attempt : subtask.getExecutionHistory().getHistoricalExecutions()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java index 398a10c57b4..d4eaa00906e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java @@ -102,7 +102,7 @@ public class SubtaskExecutionAttemptDetailsHandler final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); return SubtaskExecutionAttemptDetailsInfo.create( - execution, metricFetcher, jobID, jobVertexID); + execution, metricFetcher, jobID, jobVertexID, null); } @Override @@ -111,36 +111,38 @@ public class SubtaskExecutionAttemptDetailsHandler List<ArchivedJson> archive = new ArrayList<>(16); for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { for (AccessExecutionVertex subtask : task.getTaskVertices()) { - ResponseBody curAttemptJson = - SubtaskExecutionAttemptDetailsInfo.create( - subtask.getCurrentExecutionAttempt(), - null, - graph.getJobID(), - task.getJobVertexId()); - String curAttemptPath = - getMessageHeaders() - .getTargetRestEndpointURL() - .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) - .replace( - ':' + JobVertexIdPathParameter.KEY, - task.getJobVertexId().toString()) - .replace( - ':' + SubtaskIndexPathParameter.KEY, - String.valueOf(subtask.getParallelSubtaskIndex())) - .replace( - ':' + SubtaskAttemptPathParameter.KEY, - String.valueOf( - subtask.getCurrentExecutionAttempt() - .getAttemptNumber())); - - archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + for (AccessExecution attempt : subtask.getCurrentExecutions()) { + ResponseBody curAttemptJson = + SubtaskExecutionAttemptDetailsInfo.create( + attempt, null, graph.getJobID(), task.getJobVertexId(), null); + String curAttemptPath = + getMessageHeaders() + .getTargetRestEndpointURL() + .replace( + ':' + JobIDPathParameter.KEY, + graph.getJobID().toString()) + .replace( + ':' + JobVertexIdPathParameter.KEY, + task.getJobVertexId().toString()) + .replace( + ':' + SubtaskIndexPathParameter.KEY, + String.valueOf(subtask.getParallelSubtaskIndex())) + .replace( + ':' + SubtaskAttemptPathParameter.KEY, + String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + } for (AccessExecution attempt : subtask.getExecutionHistory().getHistoricalExecutions()) { if (attempt != null) { ResponseBody json = SubtaskExecutionAttemptDetailsInfo.create( - attempt, null, graph.getJobID(), task.getJobVertexId()); + attempt, + null, + graph.getJobID(), + task.getJobVertexId(), + null); String path = getMessageHeaders() .getTargetRestEndpointURL() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java index 88c3cc851ad..a47ad6a46be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -75,23 +76,24 @@ public class SubtasksAllAccumulatorsHandler new ArrayList<>(); for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - String locationString = location == null ? "(unassigned)" : location.getHostname(); + for (AccessExecution execution : vertex.getCurrentExecutions()) { + TaskManagerLocation location = execution.getAssignedResourceLocation(); + String locationString = location == null ? "(unassigned)" : location.getHostname(); - StringifiedAccumulatorResult[] accs = - vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified(); - List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length); - for (StringifiedAccumulatorResult acc : accs) { - userAccumulators.add( - new UserAccumulator(acc.getName(), acc.getType(), acc.getValue())); - } + StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified(); + List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length); + for (StringifiedAccumulatorResult acc : accs) { + userAccumulators.add( + new UserAccumulator(acc.getName(), acc.getType(), acc.getValue())); + } - subtaskAccumulatorsInfos.add( - new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo( - vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(), - vertex.getCurrentExecutionAttempt().getAttemptNumber(), - locationString, - userAccumulators)); + subtaskAccumulatorsInfos.add( + new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo( + execution.getParallelSubtaskIndex(), + execution.getAttemptNumber(), + locationString, + userAccumulators)); + } } return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java index 32f04899810..ac9c8322c9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java @@ -101,7 +101,8 @@ public class SubtasksTimesHandler int num = 0; for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { - + // Use one of the current execution attempts to represent the subtask, rather than + // adding times info of all attempts. long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps(); ExecutionState status = vertex.getExecutionState(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java index 5415c297cbf..c475b6d7cd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue; @@ -121,14 +122,22 @@ public class JobVertexBackPressureInfo implements ResponseBody { public static final class SubtaskBackPressureInfo { public static final String FIELD_NAME_SUBTASK = "subtask"; + public static final String FIELD_NAME_ATTEMPT_NUMBER = "attempt-number"; public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; public static final String FIELD_NAME_BACK_PRESSURED_RATIO = "ratio"; public static final String FIELD_NAME_IDLE_RATIO = "idleRatio"; public static final String FIELD_NAME_BUSY_RATIO = "busyRatio"; + public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS = + "other-concurrent-attempts"; @JsonProperty(FIELD_NAME_SUBTASK) private final int subtask; + @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER) + @JsonInclude(Include.NON_NULL) + @Nullable + private final Integer attemptNumber; + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) private final VertexBackPressureLevel backpressureLevel; @@ -141,18 +150,30 @@ public class JobVertexBackPressureInfo implements ResponseBody { @JsonProperty(FIELD_NAME_BUSY_RATIO) private final double busyRatio; + @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) + @JsonInclude(Include.NON_EMPTY) + @Nullable + private final List<SubtaskBackPressureInfo> otherConcurrentAttempts; + + // otherConcurrentAttempts and attemptNumber are Nullable since Jackson will assign null if + // the fields are absent while parsing public SubtaskBackPressureInfo( @JsonProperty(FIELD_NAME_SUBTASK) int subtask, + @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER) @Nullable Integer attemptNumber, @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel, @JsonProperty(FIELD_NAME_BACK_PRESSURED_RATIO) double backPressuredRatio, @JsonProperty(FIELD_NAME_IDLE_RATIO) double idleRatio, - @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio) { + @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio, + @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable + List<SubtaskBackPressureInfo> otherConcurrentAttempts) { this.subtask = subtask; + this.attemptNumber = attemptNumber; this.backpressureLevel = checkNotNull(backpressureLevel); this.backPressuredRatio = backPressuredRatio; this.idleRatio = idleRatio; this.busyRatio = busyRatio; + this.otherConcurrentAttempts = otherConcurrentAttempts; } @Override @@ -165,16 +186,24 @@ public class JobVertexBackPressureInfo implements ResponseBody { } SubtaskBackPressureInfo that = (SubtaskBackPressureInfo) o; return subtask == that.subtask + && Objects.equals(attemptNumber, that.attemptNumber) && backPressuredRatio == that.backPressuredRatio && idleRatio == that.idleRatio && busyRatio == that.busyRatio - && Objects.equals(backpressureLevel, that.backpressureLevel); + && Objects.equals(backpressureLevel, that.backpressureLevel) + && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts); } @Override public int hashCode() { return Objects.hash( - subtask, backpressureLevel, backPressuredRatio, idleRatio, busyRatio); + subtask, + attemptNumber, + backpressureLevel, + backPressuredRatio, + idleRatio, + busyRatio, + otherConcurrentAttempts); } public int getSubtask() { @@ -196,6 +225,16 @@ public class JobVertexBackPressureInfo implements ResponseBody { public double getBusyRatio() { return busyRatio; } + + @Nullable + public Integer getAttemptNumber() { + return attemptNumber; + } + + @Nullable + public List<SubtaskBackPressureInfo> getOtherConcurrentAttempts() { + return otherConcurrentAttempts; + } } /** Status of vertex back-pressure. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java index 56577fb260b..1a8463ab559 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java @@ -30,12 +30,16 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.v3.oas.annotations.Hidden; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -66,6 +70,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { public static final String FIELD_NAME_STATUS_DURATION = "status-duration"; + public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS = "other-concurrent-attempts"; + @JsonProperty(FIELD_NAME_SUBTASK_INDEX) private final int subtaskIndex; @@ -100,7 +106,13 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_STATUS_DURATION) private final Map<ExecutionState, Long> statusDuration; + @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) + @JsonInclude(Include.NON_EMPTY) + @Nullable + private final List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts; + @JsonCreator + // blocked is Nullable since Jackson will assign null if the field is absent while parsing public SubtaskExecutionAttemptDetailsInfo( @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex, @JsonProperty(FIELD_NAME_STATUS) ExecutionState status, @@ -111,7 +123,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_DURATION) long duration, @JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo, @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId, - @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) { + @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration, + @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable + List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) { this.subtaskIndex = subtaskIndex; this.status = Preconditions.checkNotNull(status); @@ -124,6 +138,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo); this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId); this.statusDuration = Preconditions.checkNotNull(statusDuration); + this.otherConcurrentAttempts = otherConcurrentAttempts; } public int getSubtaskIndex() { @@ -174,51 +189,16 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { return taskmanagerId; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o; - - return subtaskIndex == that.subtaskIndex - && status == that.status - && attempt == that.attempt - && Objects.equals(host, that.host) - && startTime == that.startTime - && startTimeCompatible == that.startTimeCompatible - && endTime == that.endTime - && duration == that.duration - && Objects.equals(ioMetricsInfo, that.ioMetricsInfo) - && Objects.equals(taskmanagerId, that.taskmanagerId) - && Objects.equals(statusDuration, that.statusDuration); - } - - @Override - public int hashCode() { - return Objects.hash( - subtaskIndex, - status, - attempt, - host, - startTime, - startTimeCompatible, - endTime, - duration, - ioMetricsInfo, - taskmanagerId, - statusDuration); + public List<SubtaskExecutionAttemptDetailsInfo> getOtherConcurrentAttempts() { + return otherConcurrentAttempts == null ? new ArrayList<>() : otherConcurrentAttempts; } public static SubtaskExecutionAttemptDetailsInfo create( AccessExecution execution, @Nullable MetricFetcher metricFetcher, JobID jobID, - JobVertexID jobVertexID) { + JobVertexID jobVertexID, + @Nullable List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) { final ExecutionState status = execution.getState(); final long now = System.currentTimeMillis(); @@ -261,6 +241,49 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { duration, ioMetricsInfo, taskmanagerId, - getExecutionStateDuration(execution)); + getExecutionStateDuration(execution), + otherConcurrentAttempts); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o; + + return subtaskIndex == that.subtaskIndex + && status == that.status + && attempt == that.attempt + && Objects.equals(host, that.host) + && startTime == that.startTime + && startTimeCompatible == that.startTimeCompatible + && endTime == that.endTime + && duration == that.duration + && Objects.equals(ioMetricsInfo, that.ioMetricsInfo) + && Objects.equals(taskmanagerId, that.taskmanagerId) + && Objects.equals(statusDuration, that.statusDuration) + && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts); + } + + @Override + public int hashCode() { + return Objects.hash( + subtaskIndex, + status, + attempt, + host, + startTime, + startTimeCompatible, + endTime, + duration, + ioMetricsInfo, + taskmanagerId, + statusDuration, + otherConcurrentAttempts); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java index 76ec84509ec..45b469632b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -257,18 +258,19 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert executionVertex.getExecutionState()); continue; } - TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation(); - if (tmLocation == null) { - LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex); - continue; - } - Set<ExecutionAttemptID> groupedAttemptIds = - executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>()); + for (AccessExecution execution : executionVertex.getCurrentExecutions()) { + TaskManagerLocation tmLocation = execution.getAssignedResourceLocation(); + if (tmLocation == null) { + LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex); + continue; + } + Set<ExecutionAttemptID> groupedAttemptIds = + executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>()); - ExecutionAttemptID attemptId = - executionVertex.getCurrentExecutionAttempt().getAttemptId(); - groupedAttemptIds.add(attemptId); - executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds)); + ExecutionAttemptID attemptId = execution.getAttemptId(); + groupedAttemptIds.add(attemptId); + executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds)); + } } return executionAttemptsByLocation.entrySet().stream() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java index 85bf3a44cd5..93714132681 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java @@ -46,6 +46,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -141,6 +142,63 @@ class JobVertexBackPressureHandlerTest { }); } + private static Collection<MetricDump> getMultipleAttemptsMetricDumps() { + Collection<MetricDump> dumps = new ArrayList<>(); + TaskQueryScopeInfo task0 = + new TaskQueryScopeInfo( + TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), + TEST_JOB_VERTEX_ID.toString(), + 0, + 0); + dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000")); + dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0")); + dumps.add(new GaugeDump(task0, MetricNames.TASK_BUSY_TIME, "0")); + + TaskQueryScopeInfo speculativeTask0 = + new TaskQueryScopeInfo( + TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), + TEST_JOB_VERTEX_ID.toString(), + 0, + 1); + dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BACK_PRESSURED_TIME, "200")); + dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_IDLE_TIME, "100")); + dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BUSY_TIME, "800")); + + TaskQueryScopeInfo task1 = + new TaskQueryScopeInfo( + TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), + TEST_JOB_VERTEX_ID.toString(), + 1, + 0); + dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500")); + dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100")); + dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900")); + + TaskQueryScopeInfo speculativeTask1 = + new TaskQueryScopeInfo( + TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), + TEST_JOB_VERTEX_ID.toString(), + 1, + 1); + dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BACK_PRESSURED_TIME, "900")); + dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_IDLE_TIME, "0")); + dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BUSY_TIME, "100")); + + // missing task2 + + TaskQueryScopeInfo task3 = + new TaskQueryScopeInfo( + TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), + TEST_JOB_VERTEX_ID.toString(), + 3, + 0); + dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100")); + dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200")); + dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700")); + + return dumps; + } + @Test void testGetBackPressure() throws Exception { final Map<String, String> pathParameters = new HashMap<>(); @@ -220,4 +278,153 @@ class JobVertexBackPressureHandlerTest { assertThat(jobVertexBackPressureInfo.getStatus()) .isEqualTo(VertexBackPressureStatus.DEPRECATED); } + + @Test + void testGetBackPressureFromMultipleCurrentAttempts() throws Exception { + MetricStore multipleAttemptsMetricStore = new MetricStore(); + for (MetricDump metricDump : getMultipleAttemptsMetricDumps()) { + multipleAttemptsMetricStore.add(metricDump); + } + // Update currentExecutionAttempts directly without JobDetails. + Map<Integer, Integer> currentExecutionAttempts = new HashMap<>(); + currentExecutionAttempts.put(0, 1); + currentExecutionAttempts.put(1, 0); + multipleAttemptsMetricStore + .getCurrentExecutionAttempts() + .put( + TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), + Collections.singletonMap( + TEST_JOB_VERTEX_ID.toString(), currentExecutionAttempts)); + + JobVertexBackPressureHandler jobVertexBackPressureHandler = + new JobVertexBackPressureHandler( + () -> CompletableFuture.completedFuture(restfulGateway), + Time.seconds(10), + Collections.emptyMap(), + JobVertexBackPressureHeaders.getInstance(), + new MetricFetcher() { + private long updateCount = 0; + + @Override + public MetricStore getMetricStore() { + return multipleAttemptsMetricStore; + } + + @Override + public void update() { + updateCount++; + } + + @Override + public long getLastUpdateTime() { + return updateCount; + } + }); + + final Map<String, String> pathParameters = new HashMap<>(); + pathParameters.put( + JobIDPathParameter.KEY, TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString()); + pathParameters.put(JobVertexIdPathParameter.KEY, TEST_JOB_VERTEX_ID.toString()); + + final HandlerRequest<EmptyRequestBody> request = + HandlerRequest.resolveParametersAndCreate( + EmptyRequestBody.getInstance(), + new JobVertexMessageParameters(), + pathParameters, + Collections.emptyMap(), + Collections.emptyList()); + + final CompletableFuture<JobVertexBackPressureInfo> + jobVertexBackPressureInfoCompletableFuture = + jobVertexBackPressureHandler.handleRequest(request, restfulGateway); + final JobVertexBackPressureInfo jobVertexBackPressureInfo = + jobVertexBackPressureInfoCompletableFuture.get(); + + assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(VertexBackPressureStatus.OK); + assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(LOW); + + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getAttemptNumber) + .collect(Collectors.toList())) + .containsExactly(1, 0, null); + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .map(SubtaskBackPressureInfo::getAttemptNumber) + .collect(Collectors.toList())) + .containsExactly(0, 1); + + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getBackPressuredRatio) + .collect(Collectors.toList())) + .containsExactly(0.2, 0.5, 0.1); + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .map(SubtaskBackPressureInfo::getBackPressuredRatio) + .collect(Collectors.toList())) + .containsExactly(1.0, 0.9); + + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getIdleRatio) + .collect(Collectors.toList())) + .containsExactly(0.1, 0.1, 0.2); + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .map(SubtaskBackPressureInfo::getIdleRatio) + .collect(Collectors.toList())) + .containsExactly(0.0, 0.0); + + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getBusyRatio) + .collect(Collectors.toList())) + .containsExactly(0.8, 0.9, 0.7); + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .map(SubtaskBackPressureInfo::getBusyRatio) + .collect(Collectors.toList())) + .containsExactly(0.0, 0.1); + + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getBackpressureLevel) + .collect(Collectors.toList())) + .containsExactly(LOW, LOW, OK); + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .map(SubtaskBackPressureInfo::getBackpressureLevel) + .collect(Collectors.toList())) + .containsExactly(HIGH, HIGH); + + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getSubtask) + .collect(Collectors.toList())) + .containsExactly(0, 1, 3); + assertThat( + jobVertexBackPressureInfo.getSubtasks().stream() + .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts) + .filter(Objects::nonNull) + .flatMap(Collection::stream) + .map(SubtaskBackPressureInfo::getSubtask) + .collect(Collectors.toList())) + .containsExactly(0, 1); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java index 4c267329a74..70f46c1d302 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -192,7 +192,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { finishedTs - deployingTs, ioMetricsInfo, assignedResourceLocation.getResourceID().getResourceIdString(), - statusDuration); + statusDuration, + null); assertEquals(expectedDetailsInfo, detailsInfo); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index a044f3f87ff..40f2286d722 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -195,7 +195,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { -1L, ioMetricsInfo, "(unassigned)", - statusDuration); + statusDuration, + null); assertEquals(expectedDetailsInfo, detailsInfo); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java index 3490a05d029..df3e3fef220 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java @@ -77,7 +77,8 @@ public class AggregatedTaskDetailsInfoTest Math.abs(random.nextLong()), ioMetricsInfo, "taskmanagerId", - statusDuration))); + statusDuration, + null))); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java index 0dc78e48a54..566f668837a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java @@ -34,13 +34,31 @@ public class JobVertexBackPressureInfoTest List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> subtaskList = new ArrayList<>(); subtaskList.add( new JobVertexBackPressureInfo.SubtaskBackPressureInfo( - 0, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, 0.1, 0.5, 0.4)); + 0, + 0, + JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, + 0.1, + 0.5, + 0.4, + null)); subtaskList.add( new JobVertexBackPressureInfo.SubtaskBackPressureInfo( - 1, JobVertexBackPressureInfo.VertexBackPressureLevel.OK, 0.4, 0.3, 0.3)); + 1, + 0, + JobVertexBackPressureInfo.VertexBackPressureLevel.OK, + 0.4, + 0.3, + 0.3, + null)); subtaskList.add( new JobVertexBackPressureInfo.SubtaskBackPressureInfo( - 2, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, 0.9, 0.0, 0.1)); + 2, + 0, + JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, + 0.9, + 0.0, + 0.1, + null)); return new JobVertexBackPressureInfo( JobVertexBackPressureInfo.VertexBackPressureStatus.OK, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java index 11682f37ceb..00b12f7c68d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,11 +72,12 @@ public class JobVertexDetailsInfoTest 1L, jobVertexMetrics, "taskmanagerId1", - statusDuration)); + statusDuration, + null)); vertexTaskDetailList.add( new SubtaskExecutionAttemptDetailsInfo( 1, - ExecutionState.FAILED, + ExecutionState.RUNNING, random.nextInt(), "local2", System.currentTimeMillis(), @@ -83,7 +85,20 @@ public class JobVertexDetailsInfoTest 1L, jobVertexMetrics, "taskmanagerId2", - statusDuration)); + statusDuration, + Collections.singletonList( + new SubtaskExecutionAttemptDetailsInfo( + 1, + ExecutionState.FAILED, + random.nextInt(), + "local2", + System.currentTimeMillis(), + System.currentTimeMillis(), + 1L, + jobVertexMetrics, + "taskmanagerId2", + statusDuration, + null)))); vertexTaskDetailList.add( new SubtaskExecutionAttemptDetailsInfo( 2, @@ -95,7 +110,8 @@ public class JobVertexDetailsInfoTest 1L, jobVertexMetrics, "taskmanagerId3", - statusDuration)); + statusDuration, + null)); int parallelism = 1 + (random.nextInt() / 3); return new JobVertexDetailsInfo( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java index 48158c05b56..c4110811e0c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java @@ -70,6 +70,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest Math.abs(random.nextLong()), ioMetricsInfo, "taskmanagerId", - statusDuration); + statusDuration, + null); } }