Repository: zeppelin Updated Branches: refs/heads/master e5922b6bb -> 861f1d88f
[ZEPPELIN-2355] Livy cancel enhancements ### What is this PR for? The Cancel functionality for the Livy interpreter has a few issues. One issue is that a variable is not published correctly. The second issue is observed when there is a delay in launching the application. Any cancel before application launch is ignored. The third issue is that Cancel is not correctly implemented for SparkSQLInterpreter. ### What type of PR is it? Bug Fix ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-2355 ### How should this be tested? The test cases are modified to test the changes. ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Benoy Antony <be...@apache.org> Closes #2223 from benoyantony/livy-cancel-enhancement and squashes the following commits: 244e6d3 [Benoy Antony] clear the cancel requests if livy doesn't support cancellation and modified testcase 75fe574 [Benoy Antony] added testcase for cancellation support on LivySparkSQLInterpreter and moved the removal to finally block 9fc6dbf [Benoy Antony] remove unrelated changes in imports 8673acf [Benoy Antony] ZEPPELIN-2355 Fix race conditions while cancelling a paragraph Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/861f1d88 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/861f1d88 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/861f1d88 Branch: refs/heads/master Commit: 861f1d88fe2ea105df6892abef14142327c49f6f Parents: e5922b6 Author: Benoy Antony <be...@apache.org> Authored: Tue Apr 11 20:55:36 2017 -0700 Committer: Jeff Zhang <zjf...@apache.org> Committed: Mon Apr 17 09:16:37 2017 +0800 ---------------------------------------------------------------------- .../zeppelin/livy/BaseLivyInterprereter.java | 52 ++++++----- .../zeppelin/livy/LivySparkSQLInterpreter.java | 5 
++ .../apache/zeppelin/livy/LivyInterpreterIT.java | 94 ++++++++++++++++++-- 3 files changed, 123 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 8fd0648..43cd507 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -43,13 +43,17 @@ import javax.net.ssl.SSLContext; import java.io.FileInputStream; import java.io.IOException; import java.security.KeyStore; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentHashMap; + + /** * Base class for livy interpreters. */ @@ -68,9 +72,8 @@ public abstract class BaseLivyInterprereter extends Interpreter { protected LivyVersion livyVersion; private RestTemplate restTemplate; - // keep tracking the mapping between paragraphId and statementId, so that we can cancel the - // statement after we execute it. 
- private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMap = new ConcurrentHashMap<>(); + Set<Object> paragraphsToCancel = Collections.newSetFromMap( + new ConcurrentHashMap<Object, Boolean>()); private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap = new ConcurrentHashMap<>(); @@ -163,21 +166,8 @@ public abstract class BaseLivyInterprereter extends Interpreter { @Override public void cancel(InterpreterContext context) { - if (livyVersion.isCancelSupported()) { - String paraId = context.getParagraphId(); - Integer stmtId = paragraphId2StmtIdMap.get(paraId); - try { - if (stmtId != null) { - cancelStatement(stmtId); - } - } catch (LivyException e) { - LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " + paraId, e); - } finally { - paragraphId2StmtIdMap.remove(paraId); - } - } else { - LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion); - } + paragraphsToCancel.add(context.getParagraphId()); + LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation."); } @Override @@ -261,11 +251,12 @@ public abstract class BaseLivyInterprereter extends Interpreter { } stmtInfo = executeStatement(new ExecuteRequest(code)); } - if (paragraphId != null) { - paragraphId2StmtIdMap.put(paragraphId, stmtInfo.id); - } // pull the statement status while (!stmtInfo.isAvailable()) { + if (paragraphId != null && paragraphsToCancel.contains(paragraphId)) { + cancel(stmtInfo.id, paragraphId); + return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled"); + } try { Thread.sleep(pullStatusInterval); } catch (InterruptedException e) { @@ -285,9 +276,26 @@ public abstract class BaseLivyInterprereter extends Interpreter { } } finally { if (paragraphId != null) { - paragraphId2StmtIdMap.remove(paragraphId); paragraphId2StmtProgressMap.remove(paragraphId); + paragraphsToCancel.remove(paragraphId); + } + } + } + + private void cancel(int id, String paragraphId) { + if 
(livyVersion.isCancelSupported()) { + try { + LOGGER.info("Cancelling statement " + id); + cancelStatement(id); + } catch (LivyException e) { + LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e); } + finally { + paragraphsToCancel.remove(paragraphId); + } + } else { + LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion); + paragraphsToCancel.clear(); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index cdd4eac..9c0d359 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -230,6 +230,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { } @Override + public void cancel(InterpreterContext context) { + sparkInterpreter.cancel(context); + } + + @Override public void close() { this.sparkInterpreter.close(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/861f1d88/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java ---------------------------------------------------------------------- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index 3da908c..6537125 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -162,9 +162,9 @@ public class LivyInterpreterIT { Thread cancelThread = new Thread() { @Override public void run() { - // invoke cancel after 3 seconds to wait job starting + // invoke cancel after 1 millisecond to wait job starting try 
{ - Thread.sleep(3000); + Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } @@ -306,6 +306,88 @@ public class LivyInterpreterIT { } } + + @Test + public void testSparkSQLCancellation() { + if (!checkPreCondition()) { + return; + } + InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); + interpreterGroup.put("session_1", new ArrayList<Interpreter>()); + LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); + sparkInterpreter.setInterpreterGroup(interpreterGroup); + interpreterGroup.get("session_1").add(sparkInterpreter); + AuthenticationInfo authInfo = new AuthenticationInfo("user1"); + MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); + InterpreterOutput output = new InterpreterOutput(outputListener); + final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", + "title", "text", authInfo, null, null, null, null, null, output); + sparkInterpreter.open(); + + final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties); + interpreterGroup.get("session_1").add(sqlInterpreter); + sqlInterpreter.setInterpreterGroup(interpreterGroup); + sqlInterpreter.open(); + + try { + // detect spark version + InterpreterResult result = sparkInterpreter.interpret("sc.version", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + + boolean isSpark2 = isSpark2(sparkInterpreter, context); + + // test DataFrame api + if (!isSpark2) { + result = sparkInterpreter.interpret( + "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + } else { + result = sparkInterpreter.interpret( + "val 
df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + } + sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); + + // cancel + if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) { + Thread cancelThread = new Thread() { + @Override + public void run() { + sqlInterpreter.cancel(context); + } + }; + cancelThread.start(); + //sleep so that cancelThread performs a cancel. + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + result = sqlInterpreter + .interpret("select count(1) from df", context); + if (result.code().equals(InterpreterResult.Code.ERROR)) { + String message = result.message().get(0).getData(); + // 2 possibilities, sometimes livy doesn't return the real cancel exception + assertTrue(message.contains("cancelled part of cancelled job group") || + message.contains("Job is cancelled")); + } + } + } catch (LivyException e) { + } finally { + sparkInterpreter.close(); + sqlInterpreter.close(); + } + } + @Test public void testStringWithTruncation() { if (!checkPreCondition()) { @@ -495,9 +577,9 @@ public class LivyInterpreterIT { Thread cancelThread = new Thread() { @Override public void run() { - // invoke cancel after 3 seconds to wait job starting + // invoke cancel after 1 millisecond to wait job starting try { - Thread.sleep(3000); + Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } @@ -586,9 +668,9 @@ public class LivyInterpreterIT { Thread cancelThread = new Thread() { @Override public void run() { - // invoke cancel after 3 seconds to wait job starting + // invoke cancel after 1 millisecond to wait job starting try { - Thread.sleep(3000); + Thread.sleep(1); } catch 
(InterruptedException e) { e.printStackTrace(); }