Repository: zeppelin
Updated Branches:
  refs/heads/master 621c5be2d -> e3315b2d2


ZEPPELIN-2251. Support getProgress for livy interpreter

### What is this PR for?
Livy 0.4 will support getting the progress of a statement; this PR integrates 
that feature into the Livy interpreter.

### What type of PR is it?
[Improvement | Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2251

### How should this be tested?
Tested with latest livy 0.4-SNAPSHOT

### Screenshots (if appropriate)
![zeppelin_livy_progress](https://cloud.githubusercontent.com/assets/164491/23883496/4ae31dee-08a2-11e7-8ffd-86b989ad1a6a.gif)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2130 from zjffdu/ZEPPELIN-2251 and squashes the following commits:

509432a [Jeff Zhang] ZEPPELIN-2251. Support getProgress for livy interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e3315b2d
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e3315b2d
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e3315b2d

Branch: refs/heads/master
Commit: e3315b2d201d07d6f573237261e58bf89a51a391
Parents: 621c5be
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Mar 14 10:31:12 2017 +0800
Committer: Felix Cheung <felixche...@apache.org>
Committed: Wed Mar 22 23:53:18 2017 -0700

----------------------------------------------------------------------
 .../zeppelin/livy/BaseLivyInterprereter.java    | 20 +++++++++++++++-----
 .../org/apache/zeppelin/livy/LivyVersion.java   |  5 +++++
 2 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e3315b2d/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git 
a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java 
b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index ecb5d77..c580901 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -59,7 +59,9 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
 
   // keep tracking the mapping between paragraphId and statementId, so that we 
can cancel the
   // statement after we execute it.
-  private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMapping = new 
ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMap = new 
ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
+      new ConcurrentHashMap<>();
 
   public BaseLivyInterprereter(Properties property) {
     super(property);
@@ -151,7 +153,7 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
   public void cancel(InterpreterContext context) {
     if (livyVersion.isCancelSupported()) {
       String paraId = context.getParagraphId();
-      Integer stmtId = paragraphId2StmtIdMapping.get(paraId);
+      Integer stmtId = paragraphId2StmtIdMap.get(paraId);
       try {
         if (stmtId != null) {
           cancelStatement(stmtId);
@@ -159,7 +161,7 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
       } catch (LivyException e) {
         LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " 
+ paraId, e);
       } finally {
-        paragraphId2StmtIdMapping.remove(paraId);
+        paragraphId2StmtIdMap.remove(paraId);
       }
     } else {
       LOGGER.warn("cancel is not supported for this version of livy: " + 
livyVersion);
@@ -173,6 +175,11 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
 
   @Override
   public int getProgress(InterpreterContext context) {
+    if (livyVersion.isGetProgressSupported()) {
+      String paraId = context.getParagraphId();
+      Integer progress = paragraphId2StmtProgressMap.get(paraId);
+      return progress == null ? 0 : progress;
+    }
     return 0;
   }
 
@@ -243,7 +250,7 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
         stmtInfo = executeStatement(new ExecuteRequest(code));
       }
       if (paragraphId != null) {
-        paragraphId2StmtIdMapping.put(paragraphId, stmtInfo.id);
+        paragraphId2StmtIdMap.put(paragraphId, stmtInfo.id);
       }
       // pull the statement status
       while (!stmtInfo.isAvailable()) {
@@ -254,6 +261,7 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
           throw new LivyException(e);
         }
         stmtInfo = getStatementInfo(stmtInfo.id);
+        paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress 
* 100));
       }
       if (appendSessionExpired) {
         return appendSessionExpire(getResultFromStatementInfo(stmtInfo, 
displayAppInfo),
@@ -263,7 +271,8 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
       }
     } finally {
       if (paragraphId != null) {
-        paragraphId2StmtIdMapping.remove(paragraphId);
+        paragraphId2StmtIdMap.remove(paragraphId);
+        paragraphId2StmtProgressMap.remove(paragraphId);
       }
     }
   }
@@ -537,6 +546,7 @@ public abstract class BaseLivyInterprereter extends 
Interpreter {
   private static class StatementInfo {
     public Integer id;
     public String state;
+    public double progress;
     public StatementOutput output;
 
     public StatementInfo() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e3315b2d/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java 
b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
index f56100f..7cfecfb 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
@@ -28,6 +28,7 @@ public class LivyVersion {
 
   protected static final LivyVersion LIVY_0_2_0 = 
LivyVersion.fromVersionString("0.2.0");
   protected static final LivyVersion LIVY_0_3_0 = 
LivyVersion.fromVersionString("0.3.0");
+  protected static final LivyVersion LIVY_0_4_0 = 
LivyVersion.fromVersionString("0.4.0");
 
   private int version;
   private String versionString;
@@ -74,6 +75,10 @@ public class LivyVersion {
     return this.newerThanEquals(LIVY_0_3_0);
   }
 
+  public boolean isGetProgressSupported() {
+    return this.newerThanEquals(LIVY_0_4_0);
+  }
+
   public boolean equals(Object versionToCompare) {
     return version == ((LivyVersion) versionToCompare).version;
   }

Reply via email to