Repository: zeppelin
Updated Branches:
  refs/heads/master df1c654e0 -> 152147122


ZEPPELIN-2151. Add integration test for livy cancel api

### What is this PR for?

Adds an integration test for the Livy cancel API. It only runs against Livy 0.3,
as the cancel API is only available starting with Livy 0.3.

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2151

### Questions:
* Do the license files need updating? No
* Is there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2056 from zjffdu/ZEPPELIN-2151 and squashes the following commits:

b7ca7b3 [Jeff Zhang] ZEPPELIN-2151. Add integration test for livy cancel api


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/15214712
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/15214712
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/15214712

Branch: refs/heads/master
Commit: 152147122b9797baef20a382eb880eadcf7cdc0f
Parents: df1c654
Author: Jeff Zhang <zjf...@apache.org>
Authored: Wed Feb 22 19:59:47 2017 +0800
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sun Mar 12 11:06:58 2017 -0700

----------------------------------------------------------------------
 .../zeppelin/livy/BaseLivyInterprereter.java    |  2 +-
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 85 ++++++++++++++++++--
 2 files changed, 80 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/15214712/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git 
a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java 
b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index 7f92127..ecb5d77 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -55,7 +55,7 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
   private int pullStatusInterval;
   protected boolean displayAppInfo;
   private AtomicBoolean sessionExpired = new AtomicBoolean(false);
-  private LivyVersion livyVersion;
+  protected LivyVersion livyVersion;
 
   // keep tracking the mapping between paragraphId and statementId, so that we 
can cancel the
   // statement after we execute it.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/15214712/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java 
b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index c8f355c..1a8e8df 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -82,13 +82,13 @@ public class LivyInterpreterIT {
     }
     InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
     interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    LivySparkInterpreter sparkInterpreter = new 
LivySparkInterpreter(properties);
+    final LivySparkInterpreter sparkInterpreter = new 
LivySparkInterpreter(properties);
     sparkInterpreter.setInterpreterGroup(interpreterGroup);
     interpreterGroup.get("session_1").add(sparkInterpreter);
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.spark",
+    final InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.spark",
         "title", "text", authInfo, null, null, null, null, null, output);
     sparkInterpreter.open();
 
@@ -158,6 +158,31 @@ public class LivyInterpreterIT {
       assertEquals(InterpreterResult.Code.ERROR, result.code());
       assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
       assertTrue(result.message().get(0).getData().contains("incomplete 
statement"));
+
+      // cancel
+      if 
(sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+        Thread cancelThread = new Thread() {
+          @Override
+          public void run() {
+            // invoke cancel after 3 seconds to wait job starting
+            try {
+              Thread.sleep(3000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            sparkInterpreter.cancel(context);
+          }
+        };
+        cancelThread.start();
+        result = sparkInterpreter
+            .interpret("sc.parallelize(1 to 
10).foreach(e=>Thread.sleep(10*1000))", context);
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        String message = result.message().get(0).getData();
+        // 2 possibilities, sometimes livy doesn't return the real cancel 
exception
+        assertTrue(message.contains("cancelled part of cancelled job group") ||
+            message.contains("Job is cancelled"));
+      }
+
     } finally {
       sparkInterpreter.close();
     }
@@ -289,11 +314,11 @@ public class LivyInterpreterIT {
       return;
     }
 
-    LivyPySparkInterpreter pysparkInterpreter = new 
LivyPySparkInterpreter(properties);
+    final LivyPySparkInterpreter pysparkInterpreter = new 
LivyPySparkInterpreter(properties);
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.pyspark",
+    final InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.pyspark",
         "title", "text", authInfo, null, null, null, null, null, output);
     pysparkInterpreter.open();
 
@@ -341,6 +366,31 @@ public class LivyInterpreterIT {
       assertEquals(InterpreterResult.Code.ERROR, result.code());
       assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
       assertTrue(result.message().get(0).getData().contains("name 'a' is not 
defined"));
+
+      // cancel
+      if 
(pysparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+        Thread cancelThread = new Thread() {
+          @Override
+          public void run() {
+            // invoke cancel after 3 seconds to wait job starting
+            try {
+              Thread.sleep(3000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            pysparkInterpreter.cancel(context);
+          }
+        };
+        cancelThread.start();
+        result = pysparkInterpreter
+            .interpret("import time\n" +
+                "sc.range(1, 10).foreach(lambda a: time.sleep(10))", context);
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        String message = result.message().get(0).getData();
+        // 2 possibilities, sometimes livy doesn't return the real cancel 
exception
+        assertTrue(message.contains("cancelled part of cancelled job group") ||
+            message.contains("Job is cancelled"));
+      }
     } finally {
       pysparkInterpreter.close();
     }
@@ -384,7 +434,7 @@ public class LivyInterpreterIT {
       return;
     }
 
-    LivySparkRInterpreter sparkRInterpreter = new 
LivySparkRInterpreter(properties);
+    final LivySparkRInterpreter sparkRInterpreter = new 
LivySparkRInterpreter(properties);
     try {
       sparkRInterpreter.getLivyVersion();
     } catch (APINotFoundException e) {
@@ -394,7 +444,7 @@ public class LivyInterpreterIT {
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.sparkr",
+    final InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.sparkr",
         "title", "text", authInfo, null, null, null, null, null, output);
     sparkRInterpreter.open();
 
@@ -408,6 +458,29 @@ public class LivyInterpreterIT {
         assertEquals(InterpreterResult.Code.SUCCESS, result.code());
         assertEquals(1, result.message().size());
         assertTrue(result.message().get(0).getData().contains("eruptions 
waiting"));
+
+        // cancel
+        Thread cancelThread = new Thread() {
+          @Override
+          public void run() {
+            // invoke cancel after 3 seconds to wait job starting
+            try {
+              Thread.sleep(3000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            sparkRInterpreter.cancel(context);
+          }
+        };
+        cancelThread.start();
+        result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\n" +
+            "df1 <- dapplyCollect(df, function(x) " +
+            "{ Sys.sleep(10); x <- cbind(x, x$waiting * 60) })", context);
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        String message = result.message().get(0).getData();
+        // 2 possibilities, sometimes livy doesn't return the real cancel 
exception
+        assertTrue(message.contains("cancelled part of cancelled job group") ||
+            message.contains("Job is cancelled"));
       } else {
         result = sparkRInterpreter.interpret("df <- 
createDataFrame(sqlContext, faithful)" +
             "\nhead(df)", context);

Reply via email to