Repository: flink
Updated Branches:
  refs/heads/release-1.1 3cf494a3e -> f7ec1efb3


[FLINK-4488][yarn] only automatically shut down clusters for detached jobs

- add a check to the YARN tests to verify the cluster hasn't been shut down prematurely

This closes #2419


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28da9954
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28da9954
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28da9954

Branch: refs/heads/release-1.1
Commit: 28da995494ad21223f6911f27eb46187294f311a
Parents: 3cf494a
Author: Maximilian Michels <m...@apache.org>
Authored: Wed Aug 17 15:41:12 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Aug 29 18:18:56 2016 +0200

----------------------------------------------------------------------
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     | 92 ++++++++++++++++----
 .../apache/flink/yarn/YarnClusterClient.java    | 10 +--
 3 files changed, 79 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28da9954/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 510a048..d03d9eb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -150,7 +150,7 @@ public class FlinkYarnSessionCliTest {
 
                FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
                AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createDescriptor("", runOptions.getCommandLine());
-               System.out.println(descriptor.getZookeeperNamespace());
+
                Assert.assertEquals(zkNamespaceCliInput, 
descriptor.getZookeeperNamespace());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28da9954/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 76b5d31..0243012 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.yarn;
 
+import akka.actor.Identify;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
@@ -43,10 +47,14 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -61,6 +69,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 
@@ -562,28 +571,49 @@ public abstract class YarnTestBase extends TestLogger {
 
                @Override
                public void run() {
-                       switch(type) {
-                               case YARN_SESSION:
-                                       yCli = new FlinkYarnSessionCli("", "", 
false);
-                                       returnValue = yCli.run(args);
-                                       break;
-                               case CLI_FRONTEND:
-                                       try {
-                                               CliFrontend cli = new 
CliFrontend();
-                                               returnValue = 
cli.parseParameters(args);
-                                       } catch (Exception e) {
-                                               throw new RuntimeException(e);
-                                       }
-                                       break;
-                               default:
-                                       throw new RuntimeException("Unknown 
type " + type);
-                       }
+                       try {
+                               switch (type) {
+                                       case YARN_SESSION:
+                                               yCli = new 
FlinkYarnSessionCli("", "", false);
+                                               returnValue = yCli.run(args);
+                                               break;
+                                       case CLI_FRONTEND:
+                                               TestingCLI cli;
+                                               try {
+                                                       cli = new TestingCLI();
+                                                       returnValue = 
cli.parseParameters(args);
+                                               } catch (Exception e) {
+                                                       throw new 
RuntimeException("Failed to execute the following args with CliFrontend: "
+                                                               + 
Arrays.toString(args), e);
+                                               }
 
-                       if(returnValue != 0) {
-                               Assert.fail("The YARN session returned with 
non-null value="+returnValue);
+                                               final ClusterClient client = 
cli.getClusterClient();
+                                               try {
+                                                       // check if the 
JobManager is still alive after running the job
+                                                       final FiniteDuration 
finiteDuration = new FiniteDuration(10, TimeUnit.SECONDS);
+                                                       ActorGateway 
jobManagerGateway = client.getJobManagerGateway();
+                                                       
Await.ready(jobManagerGateway.ask(new Identify(true), finiteDuration), 
finiteDuration);
+                                               } catch (Exception e) {
+                                                       throw new 
RuntimeException("It seems like the JobManager died although it should still be 
alive");
+                                               }
+                                               // verify we would have shut 
down anyways and then shutdown
+                                               
Mockito.verify(cli.getSpiedClusterClient()).shutdown();
+                                               client.shutdown();
+
+                                               break;
+                                       default:
+                                               throw new 
RuntimeException("Unknown type " + type);
+                               }
+
+                               if (returnValue != 0) {
+                                       Assert.fail("The YARN session returned 
with non-null value=" + returnValue);
+                               }
+                       } catch (Throwable t) {
+                               Assert.fail(t.getMessage());
                        }
                }
 
+               /** Stops the Yarn session */
                public void sendStop() {
                        if(yCli != null) {
                                yCli.stop();
@@ -623,4 +653,30 @@ public abstract class YarnTestBase extends TestLogger {
                return System.getenv("TRAVIS") != null && 
System.getenv("TRAVIS").equals("true");
        }
 
+       private static class TestingCLI extends CliFrontend {
+
+               private ClusterClient originalClusterClient;
+               private ClusterClient spiedClusterClient;
+
+               public TestingCLI() throws Exception {}
+
+               @Override
+               protected ClusterClient createClient(CommandLineOptions 
options, String programName) throws Exception {
+                       // mock the returned ClusterClient to disable shutdown 
and verify shutdown behavior later on
+                       originalClusterClient = super.createClient(options, 
programName);
+                       spiedClusterClient = Mockito.spy(originalClusterClient);
+                       Mockito.doNothing().when(spiedClusterClient).shutdown();
+                       return spiedClusterClient;
+               }
+
+               public ClusterClient getClusterClient() {
+                       return originalClusterClient;
+               }
+
+               public ClusterClient getSpiedClusterClient() {
+                       return spiedClusterClient;
+               }
+
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28da9954/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index dfc71e0..e76b7e8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -198,11 +198,10 @@ public class YarnClusterClient extends ClusterClient {
 
        @Override
        protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
-               if (perJobCluster) {
-                       stopAfterJob(jobGraph.getJobID());
-               }
-
                if (isDetached()) {
+                       if (perJobCluster) {
+                               stopAfterJob(jobGraph.getJobID());
+                       }
                        return super.runDetached(jobGraph, classLoader);
                } else {
                        return super.run(jobGraph, classLoader);
@@ -248,7 +247,7 @@ public class YarnClusterClient extends ClusterClient {
                        throw new RuntimeException("Unable to get ClusterClient 
status from Application Client", e);
                }
                if(clusterStatus instanceof None$) {
-                       return null;
+                       throw new RuntimeException("Unable to get ClusterClient 
status from Application Client");
                } else if(clusterStatus instanceof Some) {
                        return (GetClusterStatusResponse) (((Some) 
clusterStatus).get());
                } else {
@@ -572,7 +571,6 @@ public class YarnClusterClient extends ClusterClient {
                                                        Thread.sleep(250);
                                                } catch (InterruptedException 
e) {
                                                        LOG.error("Interrupted 
while waiting for TaskManagers");
-                                                       
System.err.println("Thread is interrupted");
                                                        throw new 
RuntimeException("Interrupted while waiting for TaskManagers", e);
                                                }
                                        }

Reply via email to