http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java new file mode 100644 index 0000000..cb402a3 --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -0,0 +1,269 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.yarn;

import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;

import org.apache.log4j.Level;

import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;

import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;


/**
 * This test starts a MiniYARNCluster with a FIFO scheduler.
 * There are no queues for that scheduler.
 */
public class YARNSessionFIFOITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

    /**
     * Overrides the default setup of {@link YarnTestBase}: configures the FIFO scheduler,
     * the NodeManager memory, the minimum allocation and the name of the test cluster
     * before the MiniYARNCluster is started.
     */
    @BeforeClass
    public static void setup() {
        yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
        yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
        yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
        yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
        startYARNWithConfig(yarnConfiguration);
    }

    /**
     * After every test, scans the log files written by the test cluster for prohibited strings.
     */
    @After
    public void checkForProhibitedLogContents() {
        ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
    }

    /**
     * Test regular operation, including command line parameter parsing.
     */
    @Test(timeout=60000) // timeout after a minute.
    public void testDetachedMode() {
        LOG.info("Starting testDetachedMode()");
        addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
        Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                        "-t", flinkLibFolder.getAbsolutePath(),
                        "-n", "1",
                        "-jm", "768",
                        "-tm", "1024",
                        "--name", "MyCustomName", // test setting a custom name
                        "--detached"},
                "Flink JobManager is now running on", RunTypes.YARN_SESSION);

        checkForLogString("The Flink YARN client has been started in detached mode");

        Assert.assertFalse("The runner should detach.", runner.isAlive());

        LOG.info("Waiting until two containers are running");
        // wait until two containers are running (bounded by the test timeout above)
        while (getRunningContainers() < 2) {
            sleep(500);
        }
        LOG.info("Two containers are running. Killing the application");

        // kill application "externally".
        YarnClient yc = null;
        try {
            yc = YarnClient.createYarnClient();
            yc.init(yarnConfiguration);
            yc.start();
            List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
            Assert.assertEquals(1, apps.size()); // Only one running
            ApplicationReport app = apps.get(0);

            Assert.assertEquals("MyCustomName", app.getName());
            ApplicationId id = app.getApplicationId();
            yc.killApplication(id);

            while (yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
                sleep(500);
            }
        } catch (Throwable t) {
            LOG.warn("Killing failed", t);
            // carry the cause into the failure message; a bare fail() hides what went wrong
            Assert.fail("Killing failed: " + t);
        } finally {
            // the client is a started service; stop it so it does not outlive the test
            if (yc != null) {
                yc.stop();
            }
        }

        LOG.info("Finished testDetachedMode()");
    }

    /**
     * Test querying the YARN cluster.
     *
     * This test validates through 666*2 cores in the "cluster".
     */
    @Test
    public void testQueryCluster() {
        LOG.info("Starting testQueryCluster()");
        runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
        LOG.info("Finished testQueryCluster()");
    }

    /**
     * Test deployment to non-existing queue. (user-reported error)
     * Deployment to the queue is possible because there are no queues, so we don't check.
     */
    @Test
    public void testNonexistingQueue() {
        LOG.info("Starting testNonexistingQueue()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                "-t", flinkLibFolder.getAbsolutePath(),
                "-n", "1",
                "-jm", "768",
                "-tm", "1024",
                "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testNonexistingQueue()");
    }

    /**
     * The test cluster has the following resources:
     * - 2 Nodes with 4096 MB each.
     * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
     *
     * We allocate:
     * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
     * 5 TaskManagers with 1585 MB
     *
     * user sees a total request of: 8181 MB (fits)
     * system sees a total request of: 8437 (doesn't fit due to min alloc mb)
     */
    @Ignore("The test is too resource consuming (8.5 GB of memory)")
    @Test
    public void testResourceComputation() {
        addTestAppender(FlinkYarnClient.class, Level.WARN);
        LOG.info("Starting testResourceComputation()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
                "-n", "5",
                "-jm", "256",
                "-tm", "1585"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testResourceComputation()");
        checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
    }

    /**
     * The test cluster has the following resources:
     * - 2 Nodes with 4096 MB each.
     * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
     *
     * We allocate:
     * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
     * 2 TaskManagers with 3840 MB
     *
     * the user sees a total request of: 7936 MB (fits)
     * the system sees a request of: 8192 MB (fits)
     * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
     *
     * --> check if the system properly rejects allocating this session.
     */
    @Ignore("The test is too resource consuming (8 GB of memory)")
    @Test
    public void testfullAlloc() {
        addTestAppender(FlinkYarnClient.class, Level.WARN);
        LOG.info("Starting testfullAlloc()");
        runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
                "-n", "2",
                "-jm", "256",
                "-tm", "3840"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
        LOG.info("Finished testfullAlloc()");
        checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
                "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
    }

    /**
     * Test the YARN Java API: deploys a session programmatically, waits until the cluster
     * reports the expected status, checks the JobManager address and the web interface URL,
     * and shuts the session down again.
     */
    @Test
    public void testJavaAPI() {
        final int WAIT_TIME = 15;
        LOG.info("Starting testJavaAPI()");

        AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
        Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
        flinkYarnClient.setTaskManagerCount(1);
        flinkYarnClient.setJobManagerMemory(768);
        flinkYarnClient.setTaskManagerMemory(1024);
        flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
        flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
        String confDirPath = System.getenv("FLINK_CONF_DIR");
        // without this check a missing variable would silently produce the path "null/flink-conf.yaml"
        Assert.assertNotNull("FLINK_CONF_DIR is not set", confDirPath);
        flinkYarnClient.setConfigurationDirectory(confDirPath);
        flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
        flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));

        // deploy
        AbstractFlinkYarnCluster yarnCluster = null;
        try {
            yarnCluster = flinkYarnClient.deploy();
            yarnCluster.connectToCluster();
        } catch (Exception e) {
            LOG.warn("Failing test", e);
            Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
        }

        try {
            GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
            for (int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted", e);
                }
                GetClusterStatusResponse status = yarnCluster.getClusterStatus();
                if (status != null && status.equals(expectedStatus)) {
                    LOG.info("Cluster reached status " + status);
                    break; // all good, cluster started
                }
                if (second > WAIT_TIME) {
                    // we waited for 15 seconds. cluster didn't come up correctly
                    Assert.fail("The cluster didn't start after " + WAIT_TIME + " seconds");
                }
            }

            // use the cluster
            Assert.assertNotNull(yarnCluster.getJobManagerAddress());
            Assert.assertNotNull(yarnCluster.getWebInterfaceURL());

            LOG.info("Shutting down cluster. All tests passed");
        } finally {
            // always shut the session down: a session left running after a failed assertion
            // would make the "cluster is empty" check of every following test fail as well
            yarnCluster.shutdown(false);
        }
        LOG.info("Finished testJavaAPI()");
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java new file mode 100644 index 0000000..fc1e5bc --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -0,0 +1,619 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.yarn;

import org.apache.commons.io.FileUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentMap;


/**
 * This base class allows to use the MiniYARNCluster.
 * The cluster is re-used for all tests.
 *
 * This class is located in a different package which is built after flink-dist. This way,
 * we can use the YARN uberjar of flink to start a Flink YARN session.
 *
 * The test is not thread-safe. Parallel execution of tests is not possible!
 */
public abstract class YarnTestBase extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);

    protected final static PrintStream originalStdout = System.out;
    protected final static PrintStream originalStderr = System.err;

    protected static String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";

    protected final static int NUM_NODEMANAGERS = 2;

    /** The tests are scanning for these strings in the final output. */
    protected final static String[] PROHIBITED_STRINGS = {
            "Exception", // we don't want any exceptions to happen
            "Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode.
    };

    /** These strings are white-listed, overriding the prohibited strings */
    protected final static String[] WHITELISTED_STRINGS = {
            "akka.remote.RemoteTransportExceptionNoStackTrace",
            // workaround for annoying InterruptedException logging:
            // https://issues.apache.org/jira/browse/YARN-1022
            "java.lang.InterruptedException"
    };

    // Temp directory which is deleted after the unit test.
    @ClassRule
    public static TemporaryFolder tmp = new TemporaryFolder();

    protected static MiniYARNCluster yarnCluster = null;

    /**
     * Uberjar (fat jar) file of Flink
     */
    protected static File flinkUberjar;

    protected static final Configuration yarnConfiguration;

    /**
     * lib/ folder of the flink distribution.
     */
    protected static File flinkLibFolder;

    static {
        yarnConfiguration = new YarnConfiguration();
        yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
        yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways
        yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
        yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
        yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
        yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
        yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
        yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
        yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
        yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
        // so we have to change the number of cores for testing.
        yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
    }



    /**
     * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
     */
    @After
    public void sleep() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Assert.fail("Should not happen");
        }
    }

    /**
     * Before every test, verifies that no application on the shared cluster is still active,
     * i.e. every known application is FINISHED, KILLED or FAILED.
     *
     * @throws IOException if the application list cannot be retrieved
     * @throws YarnException if the application list cannot be retrieved
     */
    @Before
    public void checkClusterEmpty() throws IOException, YarnException {
        // JUnit creates a new test instance per test method, so a client cached in an instance
        // field was never reused; create it locally and stop it again instead of leaking it.
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(yarnConfiguration);
            yarnClient.start();

            List<ApplicationReport> apps = yarnClient.getApplications();
            for (ApplicationReport app : apps) {
                if (app.getYarnApplicationState() != YarnApplicationState.FINISHED
                        && app.getYarnApplicationState() != YarnApplicationState.KILLED
                        && app.getYarnApplicationState() != YarnApplicationState.FAILED) {
                    Assert.fail("There is at least one application on the cluster is not finished." +
                            "App "+app.getApplicationId()+" is in state "+app.getYarnApplicationState());
                }
            }
        } finally {
            yarnClient.stop();
        }
    }

    /**
     * Locate a file or directory.
     *
     * Walks the directory tree below {@code startAt} depth-first and returns the first
     * non-directory entry accepted by the filter.
     *
     * @param startAt directory to start the search in
     * @param fnf filter which is asked for every non-directory entry
     * @return the first accepted file, or null if nothing was accepted or {@code startAt} cannot be listed
     */
    public static File findFile(String startAt, FilenameFilter fnf) {
        File root = new File(startAt);
        String[] files = root.list();
        if (files == null) {
            return null;
        }
        for (String file : files) {
            File f = new File(startAt + File.separator + file);
            if (f.isDirectory()) {
                File r = findFile(f.getAbsolutePath(), fnf);
                if (r != null) {
                    return r;
                }
            } else if (fnf.accept(f.getParentFile(), f.getName())) {
                return f;
            }
        }
        return null;
    }

    /**
     * Filter to find root dir of the flink-yarn dist.
     * Accepts jar files whose name starts with "flink-dist" and whose directory path contains "/lib".
     */
    public static class RootDirFilenameFilter implements FilenameFilter {
        @Override
        public boolean accept(File dir, String name) {
            return name.startsWith("flink-dist") && name.endsWith(".jar") && dir.toString().contains("/lib");
        }
    }

    /**
     * Filter accepting files whose name contains all of the given strings, optionally
     * rejecting files located in a directory whose path contains a given string.
     */
    public static class ContainsName implements FilenameFilter {
        private String[] names;
        private String excludeInPath = null;

        /**
         * @param names which have to be included in the filename.
         */
        public ContainsName(String[] names) {
            this.names = names;
        }

        /**
         * @param names which have to be included in the filename.
         * @param excludeInPath files in a directory whose path contains this string are rejected.
         */
        public ContainsName(String[] names, String excludeInPath) {
            this.names = names;
            this.excludeInPath = excludeInPath;
        }

        @Override
        public boolean accept(File dir, String name) {
            // every requested fragment has to show up in the file name
            for (String n : names) {
                if (!name.contains(n)) {
                    return false;
                }
            }
            // the path exclusion only applies when one was configured
            return excludeInPath == null || !dir.toString().contains(excludeInPath);
        }
    }

    /**
     * Writes the given configuration as "yarn-site.xml" into a new temporary folder.
     *
     * @param yarnConf configuration to serialize
     * @return the written yarn-site.xml file
     * @throws IOException if the folder or the file cannot be written
     */
    public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
        tmp.create();
        File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");

        FileWriter writer = new FileWriter(yarnSiteXML);
        try {
            yarnConf.writeXml(writer);
            writer.flush();
        } finally {
            // close the file handle even if serializing the configuration fails
            writer.close();
        }
        return yarnSiteXML;
    }

    /**
     * This method checks the written TaskManager and JobManager log files
     * for exceptions.
     *
     * WARN: Please make sure the tool doesn't find old logfiles from previous test runs.
     * So always run "mvn clean" before running the tests here.
     *
     * @param prohibited strings which must not appear in any log line
     * @param whitelisted a line containing one of these strings is tolerated even if it contains a prohibited string
     */
    public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
        File cwd = new File("target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
        Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
        Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());

        File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                // scan each file for prohibited strings.
                File f = new File(dir.getAbsolutePath()+ "/" + name);
                Scanner scanner = null;
                try {
                    scanner = new Scanner(f);
                    while (scanner.hasNextLine()) {
                        final String lineFromFile = scanner.nextLine();
                        for (String aProhibited : prohibited) {
                            if (lineFromFile.contains(aProhibited)) {

                                boolean whitelistedFound = false;
                                for (String white : whitelisted) {
                                    if (lineFromFile.contains(white)) {
                                        whitelistedFound = true;
                                        break;
                                    }
                                }

                                if (!whitelistedFound) {
                                    // logging in FATAL to see the actual message in TRAVIS tests.
                                    Marker fatal = MarkerFactory.getMarker("FATAL");
                                    LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
                                    return true;
                                }
                            }
                        }

                    }
                } catch (FileNotFoundException e) {
                    LOG.warn("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
                } finally {
                    // one scanner is opened per log file; release the file handle on every exit path
                    if (scanner != null) {
                        scanner.close();
                    }
                }

                return false;
            }
        });
        if (foundFile != null) {
            Scanner scanner = null;
            try {
                scanner = new Scanner(foundFile);
                LOG.warn("Found a file with a prohibited string. Printing contents:");
                while (scanner.hasNextLine()) {
                    LOG.warn("LINE: "+scanner.nextLine());
                }
            } catch (FileNotFoundException e) {
                Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
            } finally {
                if (scanner != null) {
                    scanner.close();
                }
            }
            Assert.fail("Found a file "+foundFile+" with a prohibited string: "+Arrays.toString(prohibited));
        }
    }

    /**
     * Sleeps for the given time; an interruption is logged and otherwise ignored.
     *
     * @param time time to sleep in milliseconds
     */
    public static void sleep(int time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            LOG.warn("Interruped",e);
        }
    }

    /**
     * @return the number of containers currently known to the NodeManagers of the test cluster
     */
    public static int getRunningContainers() {
        int count = 0;
        for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
            NodeManager nm = yarnCluster.getNodeManager(nmId);
            ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
            count += containers.size();
        }
        return count;
    }

    /**
     * Locates the Flink uberjar and lib folder, starts the MiniYARNCluster with the given
     * configuration (only if it is not running yet) and sets the environment variables
     * (FLINK_CONF_DIR, YARN_CONF_DIR, IN_TESTS) used by the tests.
     *
     * @param conf configuration for the MiniYARNCluster
     */
    public static void startYARNWithConfig(Configuration conf) {
        // set the home directory to a tmp directory. Flink on YARN is using the home dir to distribute the file
        File homeDir = null;
        try {
            homeDir = tmp.newFolder();
        } catch (IOException e) {
            LOG.error("Unable to create the temporary home directory", e);
            Assert.fail(e.getMessage());
        }
        System.setProperty("user.home", homeDir.getAbsolutePath());
        String uberjarStartLoc = "..";
        LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc));
        flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter());
        Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
        String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
        flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
        Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
        Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
        Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());

        if (!flinkUberjar.exists()) {
            Assert.fail("Unable to locate yarn-uberjar.jar");
        }

        try {
            LOG.info("Starting up MiniYARNCluster");
            if (yarnCluster == null) {
                yarnCluster = new MiniYARNCluster(conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY), NUM_NODEMANAGERS, 1, 1);

                yarnCluster.init(conf);
                yarnCluster.start();
            }

            Map<String, String> map = new HashMap<String, String>(System.getenv());

            File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
            Assert.assertNotNull(flinkConfDirPath);

            map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());

            File yarnConfFile = writeYarnSiteConfigXML(conf);
            map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
            map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
            TestBaseUtils.setEnv(map);

            Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);

            // wait for the nodeManagers to connect
            while (!yarnCluster.waitForNodeManagersToConnect(500)) {
                LOG.info("Waiting for Nodemanagers to connect");
            }
        } catch (Exception ex) {
            LOG.error("setup failure", ex);
            Assert.fail("setup failure: " + ex);
        }
    }

    /**
     * Default @BeforeClass impl. Overwrite this for passing a different configuration
     */
    @BeforeClass
    public static void setup() {
        startYARNWithConfig(yarnConfiguration);
    }

    // -------------------------- Runner -------------------------- //

    protected static ByteArrayOutputStream outContent;
    protected static ByteArrayOutputStream errContent;
    enum RunTypes {
        YARN_SESSION, CLI_FRONTEND
    }

    /**
     * This method returns once the "startedAfterString" has been seen.
     *
     * @param args Command line arguments for the runner
     * @param startedAfterString string to look for in the redirected stdout and stderr
     * @param type Set the type of the runner
     * @return the started runner thread
     */
    protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) {
        LOG.info("Running with args {}", Arrays.toString(args));

        outContent = new ByteArrayOutputStream();
        errContent = new ByteArrayOutputStream();
        System.setOut(new PrintStream(outContent));
        System.setErr(new PrintStream(errContent));


        final int START_TIMEOUT_SECONDS = 60;

        Runner runner = new Runner(args, type);
        runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
        runner.start();

        for (int second = 0; second < START_TIMEOUT_SECONDS; second++) {
            sleep(1000);
            // check output for correct TaskManager startup.
            if (outContent.toString().contains(startedAfterString)
                    || errContent.toString().contains(startedAfterString) ) {
                LOG.info("Found expected output in redirected streams");
                return runner;
            }
            // check if thread died
            if (!runner.isAlive()) {
                sendOutput();
                Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
            }
        }

        sendOutput();
        Assert.fail("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
                "expected string did not show up");
        return null;
    }

    /**
     * Same as {@link #runWithArgs(String[], String, String[], RunTypes, int, boolean)}
     * without checking the log4j logger for the terminate string.
     */
    protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) {
        runWithArgs(args,terminateAfterString, failOnStrings, type, returnCode, false);
    }

    /**
     * The test has been passed once the "terminateAfterString" has been seen.
     * @param args Command line arguments for the runner
     * @param terminateAfterString the runner is searching the stdout and stderr for this string. as soon as it appears, the test has passed
     * @param failOnStrings The runner is searching stdout and stderr for the strings specified here. If one appears, the test has failed
     * @param type Set the type of the runner
     * @param returnCode Expected return code from the runner.
     * @param checkLogForTerminateString If true, the runner checks also the log4j logger for the terminate string
     */
    protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode, boolean checkLogForTerminateString) {
        LOG.info("Running with args {}", Arrays.toString(args));

        outContent = new ByteArrayOutputStream();
        errContent = new ByteArrayOutputStream();
        System.setOut(new PrintStream(outContent));
        System.setErr(new PrintStream(errContent));


        // we wait for at most three minutes
        final int START_TIMEOUT_SECONDS = 180;
        final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000);

        Runner runner = new Runner(args, type);
        runner.start();

        boolean expectedStringSeen = false;
        boolean testPassedFromLog4j = false;
        do {
            sleep(1000);
            String outContentString = outContent.toString();
            String errContentString = errContent.toString();
            if (failOnStrings != null) {
                for (String failOnString : failOnStrings) {
                    if (outContentString.contains(failOnString)
                            || errContentString.contains(failOnString)) {
                        LOG.warn("Failing test. Output contained illegal string '" + failOnString + "'");
                        sendOutput();
                        // stopping runner.
                        runner.sendStop();
                        Assert.fail("Output contained illegal string '" + failOnString + "'");
                    }
                }
            }
            // check output for the expected terminateAfterString.
            if (checkLogForTerminateString) {
                LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString);
                if (matchedEvent != null) {
                    testPassedFromLog4j = true;
                    LOG.info("Found expected output in logging event {}", matchedEvent);
                }

            }

            if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) || testPassedFromLog4j ) {
                expectedStringSeen = true;
                LOG.info("Found expected output in redirected streams");
                // send "stop" command to command line interface
                LOG.info("RunWithArgs: request runner to stop");
                runner.sendStop();
                // wait for the thread to stop
                try {
                    runner.join(30000);
                }
                catch (InterruptedException e) {
                    LOG.warn("Interrupted while stopping runner", e);
                }
                LOG.warn("RunWithArgs runner stopped.");
            }
            else {
                // check if thread died
                if (!runner.isAlive()) {
                    if (runner.getReturnValue() != 0) {
                        Assert.fail("Runner thread died before the test was finished. Return value = "
                                + runner.getReturnValue());
                    } else {
                        LOG.info("Runner stopped earlier than expected with return value = 0");
                    }
                    // leave loop: the runner died, so we can not expect new strings to show up.
                    break;
                }
            }
        }
        while (!expectedStringSeen && System.currentTimeMillis() < deadline);

        sendOutput();
        Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
                "expected string did not show up", expectedStringSeen);

        // check for 0 return code
        Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
        LOG.info("Test was successful");
    }

    /**
     * Restores the original stdout and stderr and writes the captured content of both
     * streams to the logger.
     */
    protected static void sendOutput() {
        System.setOut(originalStdout);
        System.setErr(originalStderr);

        LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
        LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
    }

    /**
     * Thread which runs either the YARN session CLI or the CLI frontend with the given arguments.
     */
    public static class Runner extends Thread {
        private final String[] args;
        // written by the runner thread and read by the test thread, hence volatile
        private volatile int returnValue;
        private RunTypes type;
        // written by the runner thread and read by the test thread (sendStop), hence volatile
        private volatile FlinkYarnSessionCli yCli;

        public Runner(String[] args, RunTypes type) {
            this.args = args;
            this.type = type;
        }

        /**
         * @return the value returned by the executed frontend (0 until the frontend has returned)
         */
        public int getReturnValue() {
            return returnValue;
        }

        @Override
        public void run() {
            switch (type) {
                case YARN_SESSION:
                    yCli = new FlinkYarnSessionCli("", "");
                    returnValue = yCli.run(args);
                    break;
                case CLI_FRONTEND:
                    try {
                        CliFrontend cli = new CliFrontend();
                        returnValue = cli.parseParameters(args);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    break;
                default:
                    throw new RuntimeException("Unknown type " + type);
            }

            if (returnValue != 0) {
                Assert.fail("The YARN session returned with non-null value="+returnValue);
            }
        }

        /**
         * Asks the YARN session CLI to stop. Does nothing if no YARN session CLI has been created.
         */
        public void sendStop() {
            if (yCli != null) {
                yCli.stop();
            }
        }
    }

    // -------------------------- Tear down -------------------------- //

    @AfterClass
    public static void copyOnTravis() {
        // When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
        // to <flinkRoot>/target/flink-yarn-tests-*.
        // The files from there are picked up by the ./tools/travis_watchdog.sh script
        // to upload them to Amazon S3.
        if (isOnTravis()) {
            // the separator is required: without it the directory would be "../targetflink-yarn-tests-*"
            File target = new File("../target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
            // mkdirs() also returns false when the directory already exists, which is not an error
            if (!target.exists() && !target.mkdirs()) {
                LOG.warn("Error creating dirs to {}", target);
            }
            File src = tmp.getRoot();
            LOG.info("copying the final files from {} to {}", src.getAbsolutePath(), target.getAbsolutePath());
            try {
                FileUtils.copyDirectoryToDirectory(src, target);
            } catch (IOException e) {
                LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
            }
        }
    }

    /**
     * @return true if the environment variable TRAVIS is set to "true"
     */
    public static boolean isOnTravis() {
        return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true");
    }

}
 http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..ebe0d37 --- /dev/null +++ b/flink-yarn-tests/src/test/resources/log4j-test.properties @@ -0,0 +1,35 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +# Log all infos to the console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console + +# log what's going on between the tests +log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO +log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO +log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO +log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO +log4j.logger.org.apache.flink.runtime.leaderelection=INFO +log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala new file mode 100644 index 0000000..2f93785 --- /dev/null +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn + +import java.util.concurrent.ExecutorService + +import akka.actor.ActorRef +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.testingUtils.TestingJobManagerLike + +import scala.concurrent.duration.FiniteDuration + +/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin. + * + * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition + * instead of an anonymous class with the respective mixin to obtain a more readable logger name. 
+ * + * @param flinkConfiguration Configuration object for the actor + * @param executorService Execution context which is used to execute concurrent tasks in the + * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param instanceManager Instance manager to manage the registered + * [[org.apache.flink.runtime.taskmanager.TaskManager]] + * @param scheduler Scheduler to schedule Flink jobs + * @param libraryCacheManager Manager to manage uploaded jar files + * @param archive Archive for finished Flink jobs + * @param restartStrategyFactory Default restart strategy for job restarts + * @param timeout Timeout for futures + * @param leaderElectionService LeaderElectionService to participate in the leader election + * @param submittedJobGraphs [[SubmittedJobGraphStore]] passed on to the [[YarnJobManager]] constructor + * @param checkpointRecoveryFactory [[CheckpointRecoveryFactory]] passed on to the [[YarnJobManager]] + * constructor + * @param savepointStore [[SavepointStore]] passed on to the [[YarnJobManager]] constructor + * @param jobRecoveryTimeout [[FiniteDuration]] passed on to the [[YarnJobManager]] constructor + */ +class TestingYarnJobManager( + flinkConfiguration: Configuration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: Scheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration) + extends YarnJobManager( + flinkConfiguration, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout) + with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala new file mode
100644 index 0000000..73ab7eb --- /dev/null +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn + +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.instance.InstanceConnectionInfo +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration +import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike + +/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin. + * + * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition + * instead of an anonymous class with the respective mixin to obtain a more readable logger name. 
+ * + * @param config Configuration object for the actor + * @param resourceID The Yarn container id + * @param connectionInfo Connection information of this actor + * @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation + * @param ioManager IOManager responsible for I/O + * @param network NetworkEnvironment for this actor + * @param numberOfSlots Number of slots for this TaskManager + * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading + * JobManager + */ +class TestingYarnTaskManager( + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: InstanceConnectionInfo, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + leaderRetrievalService: LeaderRetrievalService) + extends YarnTaskManager( + config, + resourceID, + connectionInfo, + memoryManager, + ioManager, + network, + numberOfSlots, + leaderRetrievalService) + with TestingTaskManagerLike { + + object YarnTaskManager { + + /** Entry point (main method) to run the TaskManager on YARN. + * NOTE(review): this object is declared inside the class body rather than at top level - confirm + * that this `main` is actually reachable as an entry point. + * @param args The command line arguments. + */ + def main(args: Array[String]): Unit = { + YarnTaskManagerRunner.runYarnTaskManager(args, classOf[TestingYarnTaskManager]) + } + + } +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index 63869ee..12db578 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -60,6 +60,7 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_2.10</artifactId> <version>${project.version}</version> + <type>test-jar</type> <scope>test</scope> </dependency>