Repository: flink
Updated Branches:
  refs/heads/master 0a286d0ff -> 9829ca00d
[FLINK-7704] [flip6] Add JobPlanHandler for new RestServerEndpoint This closes #4768. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9829ca00 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9829ca00 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9829ca00 Branch: refs/heads/master Commit: 9829ca00dff201879724847b498fe0432219cb53 Parents: 0a286d0 Author: yew1eb <[email protected]> Authored: Wed Oct 4 01:07:49 2017 +0800 Committer: Till Rohrmann <[email protected]> Committed: Tue Oct 10 18:44:06 2017 +0200 ---------------------------------------------------------------------- .../dispatcher/DispatcherRestEndpoint.java | 11 ++ .../rest/handler/job/JobPlanHandler.java | 61 ++++++++++ .../runtime/rest/messages/JobPlanHeaders.java | 71 ++++++++++++ .../runtime/rest/messages/JobPlanInfo.java | 113 +++++++++++++++++++ .../legacy/messages/JobPlanInfoTest.java | 41 +++++++ 5 files changed, 297 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 2a2d9be..6297b41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler; import 
org.apache.flink.runtime.rest.handler.job.JobConfigHandler; +import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; @@ -51,6 +52,7 @@ import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.JobConfigHeaders; +import org.apache.flink.runtime.rest.messages.JobPlanHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; @@ -186,6 +188,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { executor, checkpointStatsCache); + JobPlanHandler jobPlanHandler = new JobPlanHandler( + restAddressFuture, + leaderRetriever, + timeout, + JobPlanHeaders.getInstance(), + executionGraphCache, + executor); + final File tmpDir = restConfiguration.getTmpDir(); Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent; @@ -210,6 +220,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler)); handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler)); + handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler)); BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); 
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java new file mode 100644 index 0000000..c8e6f8b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java @@ -0,0 +1,61 @@ +package org.apache.flink.runtime.rest.handler.job; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Handler serving the job execution plan. + */ +public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> { + + public JobPlanHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + Time timeout, + MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + + super( + localRestAddress, + leaderRetriever, + timeout, + messageHeaders, + executionGraphCache, + executor); + } + + @Override + protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { + return new JobPlanInfo(executionGraph.getJsonPlan()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java new file mode 100644 index 0000000..17204bb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link JobPlanHandler}. + */ +public class JobPlanHeaders implements MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> { + + private static final JobPlanHeaders INSTANCE = new JobPlanHeaders(); + + public static final String URL = "/jobs/:jobid/plan"; + + private JobPlanHeaders() { + } + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<JobPlanInfo> getResponseClass() { + return JobPlanInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static JobPlanHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java new file mode 100644 index 0000000..3987723 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response type of the {@link JobPlanHandler}. 
+ */ +@JsonSerialize(using = JobPlanInfo.Serializer.class) +@JsonDeserialize(using = JobPlanInfo.Deserializer.class) +public class JobPlanInfo implements ResponseBody { + + private final String jsonPlan; + + public JobPlanInfo(String jsonPlan) { + this.jsonPlan = Preconditions.checkNotNull(jsonPlan); + } + + public String getJsonPlan() { + return jsonPlan; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobPlanInfo that = (JobPlanInfo) o; + return Objects.equals(jsonPlan, that.jsonPlan); + } + + @Override + public int hashCode() { + return Objects.hash(jsonPlan); + } + + //--------------------------------------------------------------------------------- + // Static helper classes + //--------------------------------------------------------------------------------- + + /** + * Json serializer for the {@link JobPlanInfo}. + */ + public static final class Serializer extends StdSerializer<JobPlanInfo> { + + private static final long serialVersionUID = -1551666039618928811L; + + public Serializer() { + super(JobPlanInfo.class); + } + + @Override + public void serialize( + JobPlanInfo jobPlanInfo, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeString(jobPlanInfo.getJsonPlan()); + } + } + + /** + * Json deserializer for the {@link JobPlanInfo}. 
+ */ + public static final class Deserializer extends StdDeserializer<JobPlanInfo> { + + private static final long serialVersionUID = -3580088509877177213L; + + public Deserializer() { + super(JobPlanInfo.class); + } + + @Override + public JobPlanInfo deserialize( + JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException { + final String jsonPlan = jsonParser.getText(); + return new JobPlanInfo(jsonPlan); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java new file mode 100644 index 0000000..1fe51d0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; + +/** + * Tests that the {@link JobPlanInfo} can be marshalled and unmarshalled. + */ +public class JobPlanInfoTest extends RestResponseMarshallingTestBase<JobPlanInfo> { + + @Override + protected Class<JobPlanInfo> getTestResponseClass() { + return JobPlanInfo.class; + } + + @Override + protected JobPlanInfo getTestResponseInstance() { + JobID jobID = new JobID(); + String jobName = "job_007"; + String jsonPlan = "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}"; + return new JobPlanInfo(jsonPlan); + } +}
