Repository: flink
Updated Branches:
  refs/heads/release-1.1 3cf494a3e -> f7ec1efb3


[FLINK-4488][yarn] only automatically shutdown clusters for detached jobs

- add check to yarn tests to verify cluster hasn't been shutdown prematurely

This closes #2419


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28da9954
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28da9954
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28da9954

Branch: refs/heads/release-1.1
Commit: 28da995494ad21223f6911f27eb46187294f311a
Parents: 3cf494a
Author: Maximilian Michels <m...@apache.org>
Authored: Wed Aug 17 15:41:12 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Aug 29 18:18:56 2016 +0200

----------------------------------------------------------------------
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     | 92 ++++++++++++++++----
 .../apache/flink/yarn/YarnClusterClient.java    | 10 +--
 3 files changed, 79 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28da9954/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 510a048..d03d9eb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -150,7 +150,7 @@ public class FlinkYarnSessionCliTest {
  FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
  AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
- System.out.println(descriptor.getZookeeperNamespace());
+ Assert.assertEquals(zkNamespaceCliInput, 
descriptor.getZookeeperNamespace()); } http://git-wip-us.apache.org/repos/asf/flink/blob/28da9954/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 76b5d31..0243012 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -18,9 +18,13 @@ package org.apache.flink.yarn; +import akka.actor.Identify; import org.apache.commons.io.FileUtils; import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; @@ -43,10 +47,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.verification.VerificationMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; +import scala.concurrent.Await; +import scala.concurrent.duration.FiniteDuration; import java.io.ByteArrayOutputStream; import java.io.File; @@ -61,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Scanner; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -562,28 +571,49 @@ public abstract class YarnTestBase extends TestLogger { @Override public void run() { - switch(type) { - case YARN_SESSION: - yCli = new FlinkYarnSessionCli("", "", false); - returnValue 
= yCli.run(args); - break; - case CLI_FRONTEND: - try { - CliFrontend cli = new CliFrontend(); - returnValue = cli.parseParameters(args); - } catch (Exception e) { - throw new RuntimeException(e); - } - break; - default: - throw new RuntimeException("Unknown type " + type); - } + try { + switch (type) { + case YARN_SESSION: + yCli = new FlinkYarnSessionCli("", "", false); + returnValue = yCli.run(args); + break; + case CLI_FRONTEND: + TestingCLI cli; + try { + cli = new TestingCLI(); + returnValue = cli.parseParameters(args); + } catch (Exception e) { + throw new RuntimeException("Failed to execute the following args with CliFrontend: " + + Arrays.toString(args), e); + } - if(returnValue != 0) { - Assert.fail("The YARN session returned with non-null value="+returnValue); + final ClusterClient client = cli.getClusterClient(); + try { + // check if the JobManager is still alive after running the job + final FiniteDuration finiteDuration = new FiniteDuration(10, TimeUnit.SECONDS); + ActorGateway jobManagerGateway = client.getJobManagerGateway(); + Await.ready(jobManagerGateway.ask(new Identify(true), finiteDuration), finiteDuration); + } catch (Exception e) { + throw new RuntimeException("It seems like the JobManager died although it should still be alive"); + } + // verify we would have shut down anyways and then shutdown + Mockito.verify(cli.getSpiedClusterClient()).shutdown(); + client.shutdown(); + + break; + default: + throw new RuntimeException("Unknown type " + type); + } + + if (returnValue != 0) { + Assert.fail("The YARN session returned with non-null value=" + returnValue); + } + } catch (Throwable t) { + Assert.fail(t.getMessage()); } } + /** Stops the Yarn session */ public void sendStop() { if(yCli != null) { yCli.stop(); @@ -623,4 +653,30 @@ public abstract class YarnTestBase extends TestLogger { return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true"); } + private static class TestingCLI extends CliFrontend { + + private 
ClusterClient originalClusterClient; + private ClusterClient spiedClusterClient; + + public TestingCLI() throws Exception {} + + @Override + protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception { + // mock the returned ClusterClient to disable shutdown and verify shutdown behavior later on + originalClusterClient = super.createClient(options, programName); + spiedClusterClient = Mockito.spy(originalClusterClient); + Mockito.doNothing().when(spiedClusterClient).shutdown(); + return spiedClusterClient; + } + + public ClusterClient getClusterClient() { + return originalClusterClient; + } + + public ClusterClient getSpiedClusterClient() { + return spiedClusterClient; + } + + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/28da9954/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index dfc71e0..e76b7e8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -198,11 +198,10 @@ public class YarnClusterClient extends ClusterClient { @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { - if (perJobCluster) { - stopAfterJob(jobGraph.getJobID()); - } - if (isDetached()) { + if (perJobCluster) { + stopAfterJob(jobGraph.getJobID()); + } return super.runDetached(jobGraph, classLoader); } else { return super.run(jobGraph, classLoader); @@ -248,7 +247,7 @@ public class YarnClusterClient extends ClusterClient { throw new RuntimeException("Unable to get ClusterClient status from Application Client", e); } if(clusterStatus instanceof None$) { - return null; + throw new RuntimeException("Unable to get 
ClusterClient status from Application Client"); } else if(clusterStatus instanceof Some) { return (GetClusterStatusResponse) (((Some) clusterStatus).get()); } else { @@ -572,7 +571,6 @@ public class YarnClusterClient extends ClusterClient { Thread.sleep(250); } catch (InterruptedException e) { LOG.error("Interrupted while waiting for TaskManagers"); - System.err.println("Thread is interrupted"); throw new RuntimeException("Interrupted while waiting for TaskManagers", e); } }