zeppelin git commit: ZEPPELIN-2151. Add integration test for livy cancel api
Repository: zeppelin Updated Branches: refs/heads/branch-0.7 b9bc2b810 -> 59c1e1b0a ZEPPELIN-2151. Add integration test for livy cancel api ### What is this PR for? Just add integration test for livy cancel api. Only do it for livy 0.3 as the cancel api is only available in livy 0.3 ### What type of PR is it? [ Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2151 ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang Closes #2056 from zjffdu/ZEPPELIN-2151 and squashes the following commits: b7ca7b3 [Jeff Zhang] ZEPPELIN-2151. Add integration test for livy cancel api (cherry picked from commit 152147122b9797baef20a382eb880eadcf7cdc0f) Signed-off-by: Jeff Zhang Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/59c1e1b0 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/59c1e1b0 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/59c1e1b0 Branch: refs/heads/branch-0.7 Commit: 59c1e1b0abc68ac978fc2c4a0284e852a93f4fc7 Parents: b9bc2b8 Author: Jeff Zhang Authored: Wed Feb 22 19:59:47 2017 +0800 Committer: Jeff Zhang Committed: Fri Apr 28 08:48:51 2017 +0800 -- .../zeppelin/livy/BaseLivyInterprereter.java| 2 +- .../apache/zeppelin/livy/LivyInterpreterIT.java | 85 ++-- 2 files changed, 80 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/59c1e1b0/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java -- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 7f92127..ecb5d77 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -55,7 +55,7 @@ public 
abstract class BaseLivyInterprereter extends Interpreter { private int pullStatusInterval; protected boolean displayAppInfo; private AtomicBoolean sessionExpired = new AtomicBoolean(false); - private LivyVersion livyVersion; + protected LivyVersion livyVersion; // keep tracking the mapping between paragraphId and statementId, so that we can cancel the // statement after we execute it. http://git-wip-us.apache.org/repos/asf/zeppelin/blob/59c1e1b0/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java -- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index aec2742..7df9b20 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -82,13 +82,13 @@ public class LivyInterpreterIT { } InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); interpreterGroup.put("session_1", new ArrayList()); -LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); +final LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); sparkInterpreter.setInterpreterGroup(interpreterGroup); interpreterGroup.get("session_1").add(sparkInterpreter); AuthenticationInfo authInfo = new AuthenticationInfo("user1"); MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); -InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", +final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", "title", "text", authInfo, null, null, null, null, null, output); sparkInterpreter.open(); @@ -158,6 +158,31 @@ public class LivyInterpreterIT { assertEquals(InterpreterResult.Code.ERROR, result.code()); assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); 
assertTrue(result.message().get(0).getData().contains("incomplete statement")); + + // cancel + if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) { +Thread cancelThread = new Thread() { + @Override + public void run() { +// invoke cancel after 3 seconds to wait job starting +try { + Thread.sleep(3000); +} catch (InterruptedException e) { + e.printStackTrace(); +
zeppelin git commit: ZEPPELIN-2151. Add integration test for livy cancel api
Repository: zeppelin Updated Branches: refs/heads/master df1c654e0 -> 152147122 ZEPPELIN-2151. Add integration test for livy cancel api ### What is this PR for? Just add integration test for livy cancel api. Only do it for livy 0.3 as the cancel api is only available in livy 0.3 ### What type of PR is it? [ Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2151 ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang Closes #2056 from zjffdu/ZEPPELIN-2151 and squashes the following commits: b7ca7b3 [Jeff Zhang] ZEPPELIN-2151. Add integration test for livy cancel api Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/15214712 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/15214712 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/15214712 Branch: refs/heads/master Commit: 152147122b9797baef20a382eb880eadcf7cdc0f Parents: df1c654 Author: Jeff Zhang Authored: Wed Feb 22 19:59:47 2017 +0800 Committer: Felix Cheung Committed: Sun Mar 12 11:06:58 2017 -0700 -- .../zeppelin/livy/BaseLivyInterprereter.java| 2 +- .../apache/zeppelin/livy/LivyInterpreterIT.java | 85 ++-- 2 files changed, 80 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/15214712/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java -- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 7f92127..ecb5d77 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -55,7 +55,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { private int pullStatusInterval; protected 
boolean displayAppInfo; private AtomicBoolean sessionExpired = new AtomicBoolean(false); - private LivyVersion livyVersion; + protected LivyVersion livyVersion; // keep tracking the mapping between paragraphId and statementId, so that we can cancel the // statement after we execute it. http://git-wip-us.apache.org/repos/asf/zeppelin/blob/15214712/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java -- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index c8f355c..1a8e8df 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -82,13 +82,13 @@ public class LivyInterpreterIT { } InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); interpreterGroup.put("session_1", new ArrayList()); -LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); +final LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); sparkInterpreter.setInterpreterGroup(interpreterGroup); interpreterGroup.get("session_1").add(sparkInterpreter); AuthenticationInfo authInfo = new AuthenticationInfo("user1"); MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); -InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", +final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", "title", "text", authInfo, null, null, null, null, null, output); sparkInterpreter.open(); @@ -158,6 +158,31 @@ public class LivyInterpreterIT { assertEquals(InterpreterResult.Code.ERROR, result.code()); assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); assertTrue(result.message().get(0).getData().contains("incomplete statement")); + + // cancel + if 
(sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) { +Thread cancelThread = new Thread() { + @Override + public void run() { +// invoke cancel after 3 seconds to wait job starting +try { + Thread.sleep(3000); +} catch (InterruptedException e) { + e.printStackTrace(); +} +sparkInterpreter.cancel(context); + } +}; +cancelThread.start(); +