Repository: zeppelin Updated Branches: refs/heads/master 621c5be2d -> e3315b2d2
ZEPPELIN-2251. Support getProgress for livy interpreter ### What is this PR for? Livy 0.4 will support getting progress from a statement; this PR integrates that feature into the Livy interpreter. ### What type of PR is it? [Improvement | Feature] ### Todos * [ ] - Task ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-2251 ### How should this be tested? Tested with the latest Livy 0.4-SNAPSHOT ### Screenshots (if appropriate) ![zeppelin_livy_progress](https://cloud.githubusercontent.com/assets/164491/23883496/4ae31dee-08a2-11e7-8ffd-86b989ad1a6a.gif) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2130 from zjffdu/ZEPPELIN-2251 and squashes the following commits: 509432a [Jeff Zhang] ZEPPELIN-2251. Support getProgress for livy interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e3315b2d Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e3315b2d Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e3315b2d Branch: refs/heads/master Commit: e3315b2d201d07d6f573237261e58bf89a51a391 Parents: 621c5be Author: Jeff Zhang <zjf...@apache.org> Authored: Tue Mar 14 10:31:12 2017 +0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Wed Mar 22 23:53:18 2017 -0700 ---------------------------------------------------------------------- .../zeppelin/livy/BaseLivyInterprereter.java | 20 +++++++++++++++----- .../org/apache/zeppelin/livy/LivyVersion.java | 5 +++++ 2 files changed, 20 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e3315b2d/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java ---------------------------------------------------------------------- diff --git 
a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index ecb5d77..c580901 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -59,7 +59,9 @@ public abstract class BaseLivyInterprereter extends Interpreter { // keep tracking the mapping between paragraphId and statementId, so that we can cancel the // statement after we execute it. - private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMapping = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap = + new ConcurrentHashMap<>(); public BaseLivyInterprereter(Properties property) { super(property); @@ -151,7 +153,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { public void cancel(InterpreterContext context) { if (livyVersion.isCancelSupported()) { String paraId = context.getParagraphId(); - Integer stmtId = paragraphId2StmtIdMapping.get(paraId); + Integer stmtId = paragraphId2StmtIdMap.get(paraId); try { if (stmtId != null) { cancelStatement(stmtId); @@ -159,7 +161,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { } catch (LivyException e) { LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " + paraId, e); } finally { - paragraphId2StmtIdMapping.remove(paraId); + paragraphId2StmtIdMap.remove(paraId); } } else { LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion); @@ -173,6 +175,11 @@ public abstract class BaseLivyInterprereter extends Interpreter { @Override public int getProgress(InterpreterContext context) { + if (livyVersion.isGetProgressSupported()) { + String paraId = context.getParagraphId(); + Integer progress = paragraphId2StmtProgressMap.get(paraId); + return progress == 
null ? 0 : progress; + } return 0; } @@ -243,7 +250,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { stmtInfo = executeStatement(new ExecuteRequest(code)); } if (paragraphId != null) { - paragraphId2StmtIdMapping.put(paragraphId, stmtInfo.id); + paragraphId2StmtIdMap.put(paragraphId, stmtInfo.id); } // pull the statement status while (!stmtInfo.isAvailable()) { @@ -254,6 +261,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { throw new LivyException(e); } stmtInfo = getStatementInfo(stmtInfo.id); + paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100)); } if (appendSessionExpired) { return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo), @@ -263,7 +271,8 @@ public abstract class BaseLivyInterprereter extends Interpreter { } } finally { if (paragraphId != null) { - paragraphId2StmtIdMapping.remove(paragraphId); + paragraphId2StmtIdMap.remove(paragraphId); + paragraphId2StmtProgressMap.remove(paragraphId); } } } @@ -537,6 +546,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { private static class StatementInfo { public Integer id; public String state; + public double progress; public StatementOutput output; public StatementInfo() { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e3315b2d/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java index f56100f..7cfecfb 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java @@ -28,6 +28,7 @@ public class LivyVersion { protected static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0"); protected static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0"); + protected static final 
LivyVersion LIVY_0_4_0 = LivyVersion.fromVersionString("0.4.0"); private int version; private String versionString; @@ -74,6 +75,10 @@ public class LivyVersion { return this.newerThanEquals(LIVY_0_3_0); } + public boolean isGetProgressSupported() { + return this.newerThanEquals(LIVY_0_4_0); + } + public boolean equals(Object versionToCompare) { return version == ((LivyVersion) versionToCompare).version; }