[FLINK-9212][REST] Port SubtasksAllAccumulatorsHandler to new REST endpoint
This closes #5893. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/210abeeb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/210abeeb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/210abeeb Branch: refs/heads/release-1.5 Commit: 210abeebe8dd915f38b51a68884e9f42321af8cf Parents: 300dc4c Author: zhouhai02 <zhouha...@meituan.com> Authored: Sun Apr 22 18:59:11 2018 +0800 Committer: zentol <ches...@apache.org> Committed: Wed Apr 25 09:33:21 2018 +0200 ---------------------------------------------------------------------- .../job/SubtasksAllAccumulatorsHandler.java | 82 +++++++++++ .../SubtasksAllAccumulatorsHandlers.java | 75 ++++++++++ .../job/SubtasksAllAccumulatorsInfo.java | 144 +++++++++++++++++++ .../runtime/webmonitor/WebMonitorEndpoint.java | 12 ++ .../job/SubtasksAllAccumulatorsInfoTest.java | 58 ++++++++ 5 files changed, 371 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java new file mode 100644 index 0000000..51efba2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo; +import org.apache.flink.runtime.rest.messages.job.UserAccumulator; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the subtasks all accumulators. + */ +public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexHandler<SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> { + + public SubtasksAllAccumulatorsHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); + } + + @Override + protected SubtasksAllAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException { + JobVertexID jobVertexId = jobVertex.getJobVertexId(); + int parallelism = jobVertex.getParallelism(); + + final List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>(); + + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + String locationString = location == null ? "(unassigned)" : location.getHostname(); + + StringifiedAccumulatorResult[] accs = vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified(); + List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length); + for (StringifiedAccumulatorResult acc : accs) { + userAccumulators.add(new UserAccumulator(acc.getName(), acc.getType(), acc.getValue())); + } + + subtaskAccumulatorsInfos.add( + new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo( + vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(), + vertex.getCurrentExecutionAttempt().getAttemptNumber(), + locationString, + userAccumulators + )); + } + + return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java new file mode 100644 index 0000000..e178c93 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler; +import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link SubtasksAllAccumulatorsHandler}. + */ +public class SubtasksAllAccumulatorsHandlers implements MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> { + + private static final SubtasksAllAccumulatorsHandlers INSTANCE = new SubtasksAllAccumulatorsHandlers(); + + public static final String URL = "/jobs" + + "/:" + JobIDPathParameter.KEY + + "/vertices" + + "/:" + JobVertexIdPathParameter.KEY + + "/subtasks/accumulators"; + + private SubtasksAllAccumulatorsHandlers() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<SubtasksAllAccumulatorsInfo> getResponseClass() { + return SubtasksAllAccumulatorsInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobVertexMessageParameters getUnresolvedMessageParameters() { + return new JobVertexMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static SubtasksAllAccumulatorsHandlers getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java new file mode 100644 index 0000000..ee2535f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.util.Collection; +import java.util.Objects; + +/** + * Response type of the {@link SubtasksAllAccumulatorsHandler}. + */ +public class SubtasksAllAccumulatorsInfo implements ResponseBody { + + public static final String FIELD_NAME_JOB_VERTEX_ID = "id"; + public static final String FIELD_NAME_PARALLELISM = "parallelism"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) + @JsonSerialize(using = JobVertexIDSerializer.class) + private final JobVertexID jobVertexId; + + @JsonProperty(FIELD_NAME_PARALLELISM) + private final int parallelism; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos; + + @JsonCreator + public SubtasksAllAccumulatorsInfo( + @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexId, + @JsonProperty(FIELD_NAME_PARALLELISM) int parallelism, + @JsonProperty(FIELD_NAME_SUBTASKS) Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos) { + this.jobVertexId = Preconditions.checkNotNull(jobVertexId); + this.parallelism = parallelism; + this.subtaskAccumulatorsInfos = Preconditions.checkNotNull(subtaskAccumulatorsInfos); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubtasksAllAccumulatorsInfo that = (SubtasksAllAccumulatorsInfo) o; + return Objects.equals(jobVertexId, that.jobVertexId) && + parallelism == that.parallelism && + Objects.equals(subtaskAccumulatorsInfos, that.subtaskAccumulatorsInfos); + } + + @Override + public int hashCode() { + return Objects.hash(jobVertexId, parallelism, subtaskAccumulatorsInfos); + } + + // --------------------------------------------------- + // Static inner classes + // --------------------------------------------------- + + /** + * Detailed information about subtask accumulators. + */ + public static class SubtaskAccumulatorsInfo { + public static final String FIELD_NAME_SUBTASK_INDEX = "subtask"; + public static final String FIELD_NAME_ATTEMPT_NUM = "attempt"; + public static final String FIELD_NAME_HOST = "host"; + public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators"; + + + @JsonProperty(FIELD_NAME_SUBTASK_INDEX) + private final int subtaskIndex; + + @JsonProperty(FIELD_NAME_ATTEMPT_NUM) + private final int attemptNum; + + @JsonProperty(FIELD_NAME_HOST) + private final String host; + + @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) + private final Collection<UserAccumulator> userAccumulators; + + @JsonCreator + public SubtaskAccumulatorsInfo( + @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex, + @JsonProperty(FIELD_NAME_ATTEMPT_NUM) int attemptNum, + @JsonProperty(FIELD_NAME_HOST) String host, + @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulators) { + + this.subtaskIndex = subtaskIndex; + this.attemptNum = attemptNum; + this.host = Preconditions.checkNotNull(host); + this.userAccumulators = Preconditions.checkNotNull(userAccumulators); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubtaskAccumulatorsInfo that = (SubtaskAccumulatorsInfo) o; + return subtaskIndex == that.subtaskIndex && + attemptNum == that.attemptNum && + Objects.equals(host, that.host) && + Objects.equals(userAccumulators, that.userAccumulators); + } + + @Override + public int hashCode() { + return Objects.hash(subtaskIndex, attemptNum, host, userAccumulators); + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 0ea7550..1a67d92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler; import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler; import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; @@ -94,6 +95,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders; import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders; import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.rest.messages.SubtasksAllAccumulatorsHandlers; import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders; import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders; @@ -318,6 +320,15 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp executionGraphCache, executor); + SubtasksAllAccumulatorsHandler subtasksAllAccumulatorsHandler = new SubtasksAllAccumulatorsHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + SubtasksAllAccumulatorsHandlers.getInstance(), + executionGraphCache, + executor); + TaskManagersHandler taskManagersHandler = new TaskManagersHandler( restAddressFuture, leaderRetriever, @@ -575,6 +586,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler)); handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler)); handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler)); + handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), subtasksAllAccumulatorsHandler)); handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler)); handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler)); handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler)); http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java new file mode 100644 index 0000000..2a71239 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests (un)marshalling of the {@link SubtasksAllAccumulatorsInfo}. + */ +public class SubtasksAllAccumulatorsInfoTest extends RestResponseMarshallingTestBase<SubtasksAllAccumulatorsInfo> { + @Override + protected Class<SubtasksAllAccumulatorsInfo> getTestResponseClass() { + return SubtasksAllAccumulatorsInfo.class; + } + + @Override + protected SubtasksAllAccumulatorsInfo getTestResponseInstance() throws Exception { + List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos = new ArrayList<>(3); + + List<UserAccumulator> userAccumulators = new ArrayList<>(2); + userAccumulators.add(new UserAccumulator("test name1", "test type1", "test value1")); + userAccumulators.add(new UserAccumulator("test name2", "test type2", "test value2")); + + for (int i = 0; i < 3; ++i) { + subtaskAccumulatorsInfos.add( + new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo( + i, + i, + "host-" + String.valueOf(i), + userAccumulators + )); + + } + return new SubtasksAllAccumulatorsInfo(new JobVertexID(), + 4, + subtaskAccumulatorsInfos); + } +}