This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69028bdbba733c0a3308a303b864285453b5e31e Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Jan 23 15:33:06 2020 +0100 [hotfix][tests] Make JarHandlers re-usable --- .../runtime/webmonitor/handlers/JarHandlers.java | 155 +++++++++++++++++++++ .../webmonitor/handlers/JarSubmissionITCase.java | 132 +----------------- 2 files changed, 160 insertions(+), 127 deletions(-) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java new file mode 100644 index 0000000..f5405af --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * TODO: Add javadoc. + */ +public class JarHandlers { + + final JarUploadHandler uploadHandler; + final JarListHandler listHandler; + final JarPlanHandler planHandler; + final JarRunHandler runHandler; + final JarDeleteHandler deleteHandler; + + JarHandlers(final Path jarDir, final TestingDispatcherGateway restfulGateway) { + final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway); + final Time timeout = Time.seconds(10); + final Map<String, String> responseHeaders = Collections.emptyMap(); + final Executor executor = TestingUtils.defaultExecutor(); + + uploadHandler = new JarUploadHandler( + gatewayRetriever, + timeout, + responseHeaders, + JarUploadHeaders.getInstance(), + jarDir, + executor); + + listHandler = new JarListHandler( + gatewayRetriever, + timeout, + responseHeaders, + JarListHeaders.getInstance(), + CompletableFuture.completedFuture("shazam://localhost:12345"), + jarDir.toFile(), + new Configuration(), + executor); + + planHandler = new JarPlanHandler( + gatewayRetriever, + timeout, + responseHeaders, + JarPlanGetHeaders.getInstance(), + jarDir, + new Configuration(), + executor); + + runHandler = new JarRunHandler( + gatewayRetriever, + timeout, + responseHeaders, + JarRunHeaders.getInstance(), + jarDir, + new Configuration(), + executor); + + deleteHandler = new JarDeleteHandler( + gatewayRetriever, + timeout, + responseHeaders, + JarDeleteHeaders.getInstance(), + jarDir, + executor); + } + + public static String uploadJar(JarUploadHandler handler, Path jar, RestfulGateway restfulGateway) throws Exception { + HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + EmptyMessageParameters.getInstance(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.singletonList(jar.toFile())); + final JarUploadResponseBody uploadResponse = handler.handleRequest(uploadRequest, restfulGateway) + .get(); + return uploadResponse.getFilename(); + } + + public static JarListInfo listJars(JarListHandler handler, RestfulGateway restfulGateway) throws Exception { + HandlerRequest<EmptyRequestBody, EmptyMessageParameters> listRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + EmptyMessageParameters.getInstance()); + return handler.handleRequest(listRequest, restfulGateway) + .get(); + } + + public static JobPlanInfo showPlan(JarPlanHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception { + JarPlanMessageParameters planParameters = JarPlanGetHeaders.getInstance().getUnresolvedMessageParameters(); + HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>( + new JarPlanRequestBody(), + planParameters, + Collections.singletonMap(planParameters.jarIdPathParameter.getKey(), jarName), + Collections.emptyMap(), + Collections.emptyList()); + return handler.handleRequest(planRequest, restfulGateway) + .get(); + } + + public static JarRunResponseBody runJar(JarRunHandler handler, String jarName, DispatcherGateway restfulGateway) throws Exception { + final JarRunMessageParameters runParameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters(); + HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>( + new JarRunRequestBody(), + runParameters, + Collections.singletonMap(runParameters.jarIdPathParameter.getKey(), jarName), + Collections.emptyMap(), + Collections.emptyList()); + return handler.handleRequest(runRequest, restfulGateway) + .get(); + } + + public static void deleteJar(JarDeleteHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception { + JarDeleteMessageParameters deleteParameters = JarDeleteHeaders.getInstance().getUnresolvedMessageParameters(); + HandlerRequest<EmptyRequestBody, JarDeleteMessageParameters> deleteRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + deleteParameters, + Collections.singletonMap(deleteParameters.jarIdPathParameter.getKey(), jarName), + Collections.emptyMap(), + Collections.emptyList()); + handler.handleRequest(deleteRequest, restfulGateway) + .get(); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java index 972f6d2..dc920db 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java @@ -18,19 +18,10 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rest.handler.HandlerRequest; -import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; -import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobPlanInfo; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.BlobServerResource; -import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.TestLogger; @@ -44,11 +35,13 @@ import org.junit.rules.TemporaryFolder; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Collections; -import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; +import static org.apache.flink.runtime.webmonitor.handlers.JarHandlers.deleteJar; +import static org.apache.flink.runtime.webmonitor.handlers.JarHandlers.listJars; +import static org.apache.flink.runtime.webmonitor.handlers.JarHandlers.runJar; +import static org.apache.flink.runtime.webmonitor.handlers.JarHandlers.showPlan; +import static org.apache.flink.runtime.webmonitor.handlers.JarHandlers.uploadJar; import static org.hamcrest.Matchers.containsString; /** @@ -104,119 +97,4 @@ public class JarSubmissionITCase extends TestLogger { final JarListInfo postDeleteListResponse = listJars(listHandler, restfulGateway); Assert.assertEquals(0, postDeleteListResponse.jarFileList.size()); } - - private static String uploadJar(JarUploadHandler handler, Path jar, RestfulGateway restfulGateway) throws Exception { - HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest = new HandlerRequest<>( - EmptyRequestBody.getInstance(), - EmptyMessageParameters.getInstance(), - Collections.emptyMap(), - Collections.emptyMap(), - Collections.singletonList(jar.toFile())); - final JarUploadResponseBody uploadResponse = handler.handleRequest(uploadRequest, restfulGateway) - .get(); - return uploadResponse.getFilename(); - } - - private static JarListInfo listJars(JarListHandler handler, RestfulGateway restfulGateway) throws Exception { - HandlerRequest<EmptyRequestBody, EmptyMessageParameters> listRequest = new HandlerRequest<>( - EmptyRequestBody.getInstance(), - EmptyMessageParameters.getInstance()); - return handler.handleRequest(listRequest, restfulGateway) - .get(); - } - - private static JobPlanInfo showPlan(JarPlanHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception { - JarPlanMessageParameters planParameters = JarPlanGetHeaders.getInstance().getUnresolvedMessageParameters(); - HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>( - new JarPlanRequestBody(), - planParameters, - Collections.singletonMap(planParameters.jarIdPathParameter.getKey(), jarName), - Collections.emptyMap(), - Collections.emptyList()); - return handler.handleRequest(planRequest, restfulGateway) - .get(); - } - - private static JarRunResponseBody runJar(JarRunHandler handler, String jarName, DispatcherGateway restfulGateway) throws Exception { - final JarRunMessageParameters runParameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters(); - HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>( - new JarRunRequestBody(), - runParameters, - Collections.singletonMap(runParameters.jarIdPathParameter.getKey(), jarName), - Collections.emptyMap(), - Collections.emptyList()); - return handler.handleRequest(runRequest, restfulGateway) - .get(); - } - - private static void deleteJar(JarDeleteHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception { - JarDeleteMessageParameters deleteParameters = JarDeleteHeaders.getInstance().getUnresolvedMessageParameters(); - HandlerRequest<EmptyRequestBody, JarDeleteMessageParameters> deleteRequest = new HandlerRequest<>( - EmptyRequestBody.getInstance(), - deleteParameters, - Collections.singletonMap(deleteParameters.jarIdPathParameter.getKey(), jarName), - Collections.emptyMap(), - Collections.emptyList()); - handler.handleRequest(deleteRequest, restfulGateway) - .get(); - } - - private static class JarHandlers { - final JarUploadHandler uploadHandler; - final JarListHandler listHandler; - final JarPlanHandler planHandler; - final JarRunHandler runHandler; - final JarDeleteHandler deleteHandler; - - JarHandlers(final Path jarDir, final TestingDispatcherGateway restfulGateway) { - final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway); - final Time timeout = Time.seconds(10); - final Map<String, String> responseHeaders = Collections.emptyMap(); - final Executor executor = TestingUtils.defaultExecutor(); - - uploadHandler = new JarUploadHandler( - gatewayRetriever, - timeout, - responseHeaders, - JarUploadHeaders.getInstance(), - jarDir, - executor); - - listHandler = new JarListHandler( - gatewayRetriever, - timeout, - responseHeaders, - JarListHeaders.getInstance(), - CompletableFuture.completedFuture("shazam://localhost:12345"), - jarDir.toFile(), - new Configuration(), - executor); - - planHandler = new JarPlanHandler( - gatewayRetriever, - timeout, - responseHeaders, - JarPlanGetHeaders.getInstance(), - jarDir, - new Configuration(), - executor); - - runHandler = new JarRunHandler( - gatewayRetriever, - timeout, - responseHeaders, - JarRunHeaders.getInstance(), - jarDir, - new Configuration(), - executor); - - deleteHandler = new JarDeleteHandler( - gatewayRetriever, - timeout, - responseHeaders, - JarDeleteHeaders.getInstance(), - jarDir, - executor); - } - } }