This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0709bf1039078867c9165806a70be7e8429cd2c1 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Jan 23 15:33:25 2020 +0100 [FLINK-15651][tests] Refactor JarHandlerTest --- flink-runtime-web/pom.xml | 19 +++ .../webmonitor/handlers/JarHandlerTest.java | 133 +++++---------------- .../runtime/webmonitor/handlers/JarHandlers.java | 2 +- .../handlers/utils/OutputTestProgram.java | 29 +++++ 4 files changed, 82 insertions(+), 101 deletions(-) diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index b7fadbe..f152abd 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -214,6 +214,25 @@ under the License. <finalName>${test.ParameterProgramNoManifest.name}</finalName> </configuration> </execution> + <execution> + <!-- Used for JarHandler tests --> + <id>test-output-program-jar</id> + <phase>process-test-classes</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <includes> + <include>org/apache/flink/runtime/webmonitor/handlers/utils/OutputTestProgram.java</include> + </includes> + <archive> + <manifest> + <mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.OutputTestProgram</mainClass> + </manifest> + </archive> + <finalName>output-test-program</finalName> + </configuration> + </execution> </executions> </plugin> diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerTest.java index c565322..33ada39 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerTest.java @@ -18,33 +18,21 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.WebOptions; -import org.apache.flink.runtime.rest.RestClient; -import org.apache.flink.runtime.rest.RestClientConfiguration; -import org.apache.flink.runtime.rest.messages.MessageHeaders; -import org.apache.flink.runtime.rest.util.RestClientException; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.MiniClusterResource; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; -import java.io.IOException; -import java.net.URI; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; -import java.util.HashMap; +import java.nio.file.Paths; import java.util.Optional; import static org.hamcrest.CoreMatchers.containsString; @@ -56,6 +44,8 @@ import static org.hamcrest.MatcherAssert.assertThat; @Category(AlsoRunWithLegacyScheduler.class) public class JarHandlerTest extends TestLogger { + private static final String JAR_NAME = "output-test-program.jar"; + @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder(); @@ -75,96 +65,39 @@ public class JarHandlerTest extends TestLogger { } private static void runTest(Type type, String expectedCapturedStdOut, String expectedCapturedStdErr) throws Exception { - Path uploadDir = TMP.newFolder().toPath(); - - Path actualUploadDir = uploadDir.resolve("flink-web-upload"); - Files.createDirectory(actualUploadDir); + final TestingDispatcherGateway restfulGateway = new TestingDispatcherGateway.Builder().build(); - Path emptyJar = actualUploadDir.resolve("empty.jar"); - createJarFile(emptyJar); + final JarHandlers handlers = new JarHandlers(TMP.newFolder().toPath(), restfulGateway); - Configuration config = new Configuration(); - config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString()); + final Path originalJar = Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME); + final Path jar = Files.copy(originalJar, TMP.newFolder().toPath().resolve(JAR_NAME)); - MiniClusterResource clusterResource = new MiniClusterResource( - new MiniClusterResourceConfiguration.Builder() - .setConfiguration(config) - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(1) - .build()); - clusterResource.before(); + final String storedJarPath = JarHandlers.uploadJar(handlers.uploadHandler, jar, restfulGateway); + final String storedJarName = Paths.get(storedJarPath).getFileName().toString(); try { - Configuration clientConfig = clusterResource.getClientConfiguration(); - RestClient client = new RestClient(RestClientConfiguration.fromConfiguration(clientConfig), TestingUtils.defaultExecutor()); - - try { - final MessageHeaders headers; - final JarMessageParameters parameters; - if (type == Type.RUN) { - headers = JarRunHeaders.getInstance(); - parameters = ((JarRunHeaders) headers).getUnresolvedMessageParameters(); - } else if (type == Type.PLAN) { - headers = JarPlanGetHeaders.getInstance(); - parameters = ((JarPlanGetHeaders) headers).getUnresolvedMessageParameters(); - } else { - throw new RuntimeException("Invalid type: " + type); - } - parameters.jarIdPathParameter.resolve(emptyJar.getFileName().toString()); - - String host = clientConfig.getString(RestOptions.ADDRESS); - int port = clientConfig.getInteger(RestOptions.PORT); - - try { - client.sendRequest(host, port, headers, parameters, new JarPlanRequestBody()).get(); - } catch (Exception e) { - Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class); - if (expected.isPresent()) { - String message = expected.get().getMessage(); - // implies the job was actually submitted - assertThat(message, containsString("ProgramInvocationException")); - // original cause is preserved in stack trace - assertThat(message, containsString("The program plan could not be fetched - the program aborted pre-maturely")); - // implies the jar was registered for the job graph (otherwise the jar name would not occur in the exception) - // implies the jar was uploaded (otherwise the file would not be found at all) - assertThat(message, containsString("empty.jar")); - // ensure that no stdout/stderr has been captured - assertThat(message, containsString("System.out: " + expectedCapturedStdOut)); - assertThat(message, containsString("System.err: " + expectedCapturedStdErr)); - } else { - throw e; - } - } - } finally { - client.shutdown(Time.milliseconds(10)); + switch (type) { + case RUN: + JarHandlers.runJar(handlers.runHandler, storedJarName, restfulGateway); + break; + case PLAN: + JarHandlers.showPlan(handlers.planHandler, storedJarName, restfulGateway); + } + Assert.fail("Should have failed with an exception."); + } catch (Exception e) { + Optional<ProgramInvocationException> expected = ExceptionUtils.findThrowable(e, ProgramInvocationException.class); + if (expected.isPresent()) { + String message = expected.get().getMessage(); + // original cause is preserved in stack trace + assertThat(message, containsString("The program plan could not be fetched - the program aborted pre-maturely")); + // implies the jar was registered for the job graph (otherwise the jar name would not occur in the exception) + assertThat(message, containsString(JAR_NAME)); + // ensure that no stdout/stderr has been captured + assertThat(message, containsString("System.out: " + expectedCapturedStdOut)); + assertThat(message, containsString("System.err: " + expectedCapturedStdErr)); + } else { + throw e; } - } finally { - clusterResource.after(); - } - } - - private static void createJarFile(Path zipFile) throws IOException { - URI uri = URI.create("jar:file:" + zipFile.toString()); - HashMap<String, Object> env = new HashMap<>(); - // We need this to ensure the file will be created if it does not exist - env.put("create", "true"); - try (FileSystem zipfs = FileSystems.newFileSystem(uri, env)) { - Files.createDirectory(zipfs.getPath("META-INF")); - Path manifest = zipfs.getPath("META-INF/MANIFEST.MF"); - Files.write(manifest, "Manifest-Version: 1.0\nCreated-By: Apache Flink\nMain-Class: HelloWorld\n".getBytes()); - - Path content = zipfs.getPath("HelloWorld.class"); - Files.write(content, new byte[] { - /* // This byte array is the byte code of the following program: - * public class HelloWorld { - * public static void main(String[] args) { - * System.out.println("hello out!"); - * System.err.println("hello err!"); - * } - * } - */ - -54, -2, -70, -66, 0, 0, 0, 52, 0, 39, 10, 0, 8, 0, 22, 9, 0, 23, 0, 24, 8, 0, 25, 10, 0, 26, 0, 27, 9, 0, 23, 0, 28, 8, 0, 29, 7, 0, 30, 7, 0, 31, 1, 0, 6, 60, 105, 110, 105, 116, 62, 1, 0, 3, 40, 41, 86, 1, 0, 4, 67, 111, 100, 101, 1, 0, 15, 76, 105, 110, 101, 78, 117, 109, 98, 101, 114, 84, 97, 98, 108, 101, 1, 0, 18, 76, 111, 99, 97, 108, 86, 97, 114, 105, 97, 98, 108, 101, 84, 97, 98, 108, 101, 1, 0, 4, 116, 104, 105, 115, 1, 0, 12, 76, 72, 101, 108, 108, 111, 87, 111, 114, 108, [...] - }); } } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java index f5405af..8308a9c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java @@ -36,7 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** - * TODO: Add javadoc. + * Test setup for all jar-submission related handlers. */ public class JarHandlers { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/OutputTestProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/OutputTestProgram.java new file mode 100644 index 0000000..a64e694 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/OutputTestProgram.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.utils; + +/** + * Simple test program that prints to stdout/stderr. + */ +public class OutputTestProgram { + public static void main(String[] args) throws Exception { + System.out.println("hello out!"); + System.err.println("hello err!"); + } +}