Repository: zeppelin Updated Branches: refs/heads/master df1c654e0 -> 152147122
ZEPPELIN-2151. Add integration test for livy cancel api ### What is this PR for? This PR adds an integration test for the Livy cancel API. The test only runs against Livy 0.3 or newer, because the cancel API is only available starting with Livy 0.3. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2151 ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2056 from zjffdu/ZEPPELIN-2151 and squashes the following commits: b7ca7b3 [Jeff Zhang] ZEPPELIN-2151. Add integration test for livy cancel api Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/15214712 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/15214712 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/15214712 Branch: refs/heads/master Commit: 152147122b9797baef20a382eb880eadcf7cdc0f Parents: df1c654 Author: Jeff Zhang <zjf...@apache.org> Authored: Wed Feb 22 19:59:47 2017 +0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Sun Mar 12 11:06:58 2017 -0700 ---------------------------------------------------------------------- .../zeppelin/livy/BaseLivyInterprereter.java | 2 +- .../apache/zeppelin/livy/LivyInterpreterIT.java | 85 ++++++++++++++++++-- 2 files changed, 80 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/15214712/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 7f92127..ecb5d77 100644 --- 
a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -55,7 +55,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { private int pullStatusInterval; protected boolean displayAppInfo; private AtomicBoolean sessionExpired = new AtomicBoolean(false); - private LivyVersion livyVersion; + protected LivyVersion livyVersion; // keep tracking the mapping between paragraphId and statementId, so that we can cancel the // statement after we execute it. http://git-wip-us.apache.org/repos/asf/zeppelin/blob/15214712/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java ---------------------------------------------------------------------- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index c8f355c..1a8e8df 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -82,13 +82,13 @@ public class LivyInterpreterIT { } InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); interpreterGroup.put("session_1", new ArrayList<Interpreter>()); - LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); + final LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); sparkInterpreter.setInterpreterGroup(interpreterGroup); interpreterGroup.get("session_1").add(sparkInterpreter); AuthenticationInfo authInfo = new AuthenticationInfo("user1"); MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); - InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", + final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", "title", "text", authInfo, null, null, 
null, null, null, output); sparkInterpreter.open(); @@ -158,6 +158,31 @@ public class LivyInterpreterIT { assertEquals(InterpreterResult.Code.ERROR, result.code()); assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); assertTrue(result.message().get(0).getData().contains("incomplete statement")); + + // cancel + if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) { + Thread cancelThread = new Thread() { + @Override + public void run() { + // invoke cancel after 3 seconds to wait job starting + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + sparkInterpreter.cancel(context); + } + }; + cancelThread.start(); + result = sparkInterpreter + .interpret("sc.parallelize(1 to 10).foreach(e=>Thread.sleep(10*1000))", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + String message = result.message().get(0).getData(); + // 2 possibilities, sometimes livy doesn't return the real cancel exception + assertTrue(message.contains("cancelled part of cancelled job group") || + message.contains("Job is cancelled")); + } + } finally { sparkInterpreter.close(); } @@ -289,11 +314,11 @@ public class LivyInterpreterIT { return; } - LivyPySparkInterpreter pysparkInterpreter = new LivyPySparkInterpreter(properties); + final LivyPySparkInterpreter pysparkInterpreter = new LivyPySparkInterpreter(properties); AuthenticationInfo authInfo = new AuthenticationInfo("user1"); MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); - InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.pyspark", + final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.pyspark", "title", "text", authInfo, null, null, null, null, null, output); pysparkInterpreter.open(); @@ -341,6 +366,31 @@ public class LivyInterpreterIT { 
assertEquals(InterpreterResult.Code.ERROR, result.code()); assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); assertTrue(result.message().get(0).getData().contains("name 'a' is not defined")); + + // cancel + if (pysparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) { + Thread cancelThread = new Thread() { + @Override + public void run() { + // invoke cancel after 3 seconds to wait job starting + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + pysparkInterpreter.cancel(context); + } + }; + cancelThread.start(); + result = pysparkInterpreter + .interpret("import time\n" + + "sc.range(1, 10).foreach(lambda a: time.sleep(10))", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + String message = result.message().get(0).getData(); + // 2 possibilities, sometimes livy doesn't return the real cancel exception + assertTrue(message.contains("cancelled part of cancelled job group") || + message.contains("Job is cancelled")); + } } finally { pysparkInterpreter.close(); } @@ -384,7 +434,7 @@ public class LivyInterpreterIT { return; } - LivySparkRInterpreter sparkRInterpreter = new LivySparkRInterpreter(properties); + final LivySparkRInterpreter sparkRInterpreter = new LivySparkRInterpreter(properties); try { sparkRInterpreter.getLivyVersion(); } catch (APINotFoundException e) { @@ -394,7 +444,7 @@ public class LivyInterpreterIT { AuthenticationInfo authInfo = new AuthenticationInfo("user1"); MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); - InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sparkr", + final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sparkr", "title", "text", authInfo, null, null, null, null, null, output); sparkRInterpreter.open(); @@ -408,6 +458,29 @@ public class LivyInterpreterIT 
{ assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); assertTrue(result.message().get(0).getData().contains("eruptions waiting")); + + // cancel + Thread cancelThread = new Thread() { + @Override + public void run() { + // invoke cancel after 3 seconds to wait job starting + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + sparkRInterpreter.cancel(context); + } + }; + cancelThread.start(); + result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\n" + + "df1 <- dapplyCollect(df, function(x) " + + "{ Sys.sleep(10); x <- cbind(x, x$waiting * 60) })", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + String message = result.message().get(0).getData(); + // 2 possibilities, sometimes livy doesn't return the real cancel exception + assertTrue(message.contains("cancelled part of cancelled job group") || + message.contains("Job is cancelled")); } else { result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)" + "\nhead(df)", context);