[FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown Annotate AccessExecutionGraph#getJobVertex(JobVertexID) with @Nullable. Throw NotFoundException in JobVertexTaskManagersHandler if jobvertexId is unknown. Throw NotFoundException in AbstractExecutionGraphHandler if jobId is unknown. Copy Javadoc from legacy JobVertexTaskManagersHandler.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b4e2ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b4e2ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b4e2ce Branch: refs/heads/master Commit: 37b4e2cef687160f2bc7cedb7d2360825089569e Parents: 056c72a Author: gyao <[email protected]> Authored: Wed Jan 24 12:24:35 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Jan 25 15:55:54 2018 +0100 ---------------------------------------------------------------------- .../executiongraph/AccessExecutionGraph.java | 3 +- .../flink/runtime/rest/NotFoundException.java | 4 ++ .../job/AbstractExecutionGraphHandler.java | 15 ++++++- .../job/JobVertexTaskManagersHandler.java | 41 +++++++++++++------- .../messages/JobVertexTaskManagersHeaders.java | 2 + .../messages/JobVertexTaskManagersInfo.java | 12 +++--- 6 files changed, 54 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index 362afa1..8d1fa1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -76,8 +76,9 @@ public interface AccessExecutionGraph { * Returns the job vertex for the given {@link JobVertexID}. * * @param id id of job vertex to be returned - * @return job vertex for the given id, or null + * @return job vertex for the given id, or {@code null} */ + @Nullable AccessExecutionJobVertex getJobVertex(JobVertexID id); /** http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java index 50060b0..f9db334 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java @@ -33,4 +33,8 @@ public class NotFoundException extends RestHandlerException { public NotFoundException(String message) { super(message, HttpResponseStatus.NOT_FOUND); } + + public NotFoundException(String message, Throwable cause) { + super(message, HttpResponseStatus.NOT_FOUND, cause); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java index 7192832..7c42af1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.rest.NotFoundException; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -32,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -79,8 +82,16 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M ex } catch (RestHandlerException rhe) { throw new CompletionException(rhe); } - }, - executor); + }, executor) + .exceptionally(throwable -> { + throwable = ExceptionUtils.stripCompletionException(throwable); + if (throwable instanceof FlinkJobNotFoundException) { + throw new CompletionException( + new NotFoundException(String.format("Job %s not found", jobId), throwable)); + } else { + throw new CompletionException(throwable); + } + }); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java index 9b59e8d..24650a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.NotFoundException; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; import java.util.ArrayList; import java.util.HashMap; @@ -50,7 +52,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** - * Request handler for the job vertex task managers. + * A request handler that provides the details of a job vertex, including id, name, and the + * runtime and metrics of all its subtasks aggregated by TaskManager. */ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> { private MetricFetcher<?> metricFetcher; @@ -65,7 +68,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< Executor executor, MetricFetcher<?> metricFetcher) { super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); - this.metricFetcher = metricFetcher; + this.metricFetcher = Preconditions.checkNotNull(metricFetcher); } @Override @@ -76,23 +79,24 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID); + if (jobVertex == null) { + throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID)); + } + // Build a map that groups tasks by TaskManager Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); - String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); - List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager); - if (vertices == null) { - vertices = new ArrayList<>(); - taskManagerVertices.put(taskManager, vertices); - } - + String taskManager = location == null ? "(unassigned)" : location.getHostname() + ':' + location.dataPort(); + List<AccessExecutionVertex> vertices = taskManagerVertices.computeIfAbsent( + taskManager, + ignored -> new ArrayList<>(4)); vertices.add(vertex); } final long now = System.currentTimeMillis(); - List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(); + List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4); for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) { String host = entry.getKey(); List<AccessExecutionVertex> taskVertices = entry.getValue(); @@ -141,8 +145,10 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< duration = -1L; } - ExecutionState jobVertexState = - ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size()); + ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState( + tasksPerState, + taskVertices.size()); + final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo( counts.getNumBytesInLocal() + counts.getNumBytesInRemote(), counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(), @@ -153,11 +159,18 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< counts.getNumRecordsOut(), counts.isNumRecordsOutComplete()); - Map<ExecutionState, Integer> statusCounts = new HashMap<>(); + Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length); for (ExecutionState state : ExecutionState.values()) { statusCounts.put(state, tasksPerState[state.ordinal()]); } - taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts)); + taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo( + host, + jobVertexState, + startTime, + endTime, + duration, + jobVertexMetrics, + statusCounts)); } return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList); http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java index 311d047..8424095 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java @@ -36,6 +36,8 @@ public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequest "/:" + JobVertexIdPathParameter.KEY + "/taskmanagers"; + private JobVertexTaskManagersHeaders() {} + @Override public Class<EmptyRequestBody> getRequestClass() { return EmptyRequestBody.class; http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java index fc30155..75ff570 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java @@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -56,18 +56,18 @@ public class JobVertexTaskManagersInfo implements ResponseBody { private final long now; @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) - private List<TaskManagersInfo> taskManagers; + private Collection<TaskManagersInfo> taskManagerInfos; @JsonCreator public JobVertexTaskManagersInfo( @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID, @JsonProperty(VERTEX_TASK_FIELD_NAME) String name, @JsonProperty(VERTEX_TASK_FIELD_NOW) long now, - @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) { + @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) Collection<TaskManagersInfo> taskManagerInfos) { this.jobVertexID = checkNotNull(jobVertexID); this.name = checkNotNull(name); this.now = now; - this.taskManagers = checkNotNull(taskManagers); + this.taskManagerInfos = checkNotNull(taskManagerInfos); } @Override @@ -82,12 +82,12 @@ public class JobVertexTaskManagersInfo implements ResponseBody { return Objects.equals(jobVertexID, that.jobVertexID) && Objects.equals(name, that.name) && now == that.now && - Objects.equals(taskManagers, that.taskManagers); + Objects.equals(taskManagerInfos, that.taskManagerInfos); } @Override public int hashCode() { - return Objects.hash(jobVertexID, name, now, taskManagers); + return Objects.hash(jobVertexID, name, now, taskManagerInfos); } // ---------------------------------------------------
