This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 275b621  ZEPPELIN-3617. Allow to specify saving resourceName as paragraph property
275b621 is described below

commit 275b62159062c877057fe452905fd52f484c099e
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Jul 12 14:28:02 2018 +0800

    ZEPPELIN-3617. Allow to specify saving resourceName as paragraph property
    
    ### What is this PR for?
    This allows users to specify a resourceName when saving a paragraph result into the ResourcePool. Before this PR, users had no control over the name of the saved resource: it was derived from the noteId and paragraphId, which is not a good solution, because cloning a note changes the noteId and forces you to update the noteId in your code as well. This PR lets users set the resource name for the saved paragraph result via the saveAs paragraph property.
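
    A minimal usage sketch, mirroring the integration tests in this PR (the names table_result and test_table come from those tests): the saveAs local property names the saved resource, and z.getAsDataFrame reads it back from the ResourcePool in a later paragraph.

    ```
    %spark.sql(saveAs=table_result)
    select * from test_table

    %spark
    // read the saved table result back as a DataFrame
    val df = z.getAsDataFrame("table_result")
    z.show(df)
    ```

    Because the resource name is user-chosen rather than derived from noteId/paragraphId, cloning the note no longer breaks paragraphs that read the saved result.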
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3617
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    ![screen shot 2018-07-12 at 2 45 54 pm](https://user-images.githubusercontent.com/164491/42616882-5ee2fa3a-85e2-11e8-80d5-3ba45c84114b.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3065 from zjffdu/ZEPPELIN-3617 and squashes the following commits:
    
    024965e64 [Jeff Zhang] ZEPPELIN-3617. Allow to specify saving resourceName as paragraph property
---
 .../apache/zeppelin/python/PythonInterpreter.java  |  2 +
 .../src/main/resources/python/zeppelin_context.py  | 18 +++++-
 .../apache/zeppelin/spark/PySparkInterpreter.java  |  1 +
 .../src/main/resources/R/zeppelin_sparkr.R         |  6 ++
 .../org/apache/zeppelin/spark/SparkShimsTest.java  |  9 ++-
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala |  8 ++-
 .../zeppelin/spark/SparkZeppelinContext.scala      |  5 ++
 .../org/apache/zeppelin/spark/SparkShims.java      | 18 ++++--
 .../org/apache/zeppelin/spark/Spark1Shims.java     | 29 ++++++++-
 .../org/apache/zeppelin/spark/Spark2Shims.java     | 27 +++++++-
 .../zeppelin/integration/JdbcIntegrationTest.java  | 23 +++++++
 .../integration/ZeppelinSparkClusterTest.java      | 74 ++++++++++++++++++++--
 .../zeppelin/interpreter/BaseZeppelinContext.java  | 19 ++++++
 .../remote/RemoteInterpreterServer.java            | 29 ++++++---
 .../zeppelin/resource/DistributedResourcePool.java |  2 +
 .../org/apache/zeppelin/resource/Resource.java     |  6 +-
 .../org/apache/zeppelin/resource/ResourceId.java   |  1 +
 17 files changed, 249 insertions(+), 28 deletions(-)

diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index bfa8348..cb958ab 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -559,6 +559,8 @@ public class PythonInterpreter extends Interpreter {
           InterpreterContext.get());
       if (result.code() != Code.SUCCESS) {
        throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
+      } else {
+        LOGGER.debug("Bootstrap python successfully.");
       }
     } catch (InterpreterException e) {
       throw new IOException(e);
diff --git a/python/src/main/resources/python/zeppelin_context.py b/python/src/main/resources/python/zeppelin_context.py
index e2c0b8c..4046c69 100644
--- a/python/src/main/resources/python/zeppelin_context.py
+++ b/python/src/main/resources/python/zeppelin_context.py
@@ -61,6 +61,14 @@ class PyZeppelinContext(object):
     def get(self, key):
         return self.__getitem__(key)
 
+    def getAsDataFrame(self, key):
+        value = self.get(key)
+        try:
+            import pandas as pd
+        except ImportError:
+            print("fail to call getAsDataFrame as pandas is not installed")
+        return pd.read_csv(StringIO(value), sep="\t")
+
     def angular(self, key, noteId = None, paragraphId = None):
         return self.z.angular(key, noteId, paragraphId)
 
@@ -158,6 +166,7 @@ class PyZeppelinContext(object):
 
         body_buf = StringIO("")
         rows = df.head(self.max_result).values if exceed_limit else df.values
+        rowNumber = len(rows)
         index = df.index.values
         for idx, row in zip(index, rows):
             if show_index:
@@ -167,13 +176,16 @@ class PyZeppelinContext(object):
             for cell in row[1:]:
                 body_buf.write("\t")
                 body_buf.write(str(cell))
-            body_buf.write("\n")
+            # don't print '\n' after the last row
+            if idx != (rowNumber - 1):
+                body_buf.write("\n")
         body_buf.seek(0)
         header_buf.seek(0)
         print("%table " + header_buf.read() + body_buf.read())
-        body_buf.close(); header_buf.close()
+        body_buf.close()
+        header_buf.close()
         if exceed_limit:
-            print("%html <font color=red>Results are limited by 
{}.</font>".format(self.max_result))
+            print("\n%html <font color=red>Results are limited by 
{}.</font>".format(self.max_result))
 
     def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
                         **kwargs):
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 960227a..546f74a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -98,6 +98,7 @@ public class PySparkInterpreter extends PythonInterpreter {
       try {
         bootstrapInterpreter("python/zeppelin_pyspark.py");
       } catch (IOException e) {
+        LOGGER.error("Fail to bootstrap pyspark", e);
         throw new InterpreterException("Fail to bootstrap pyspark", e);
       }
     }
diff --git a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
index f720d56..8d0a8d1 100644
--- a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
@@ -66,6 +66,12 @@ z.put <- function(name, object) {
 z.get <- function(name) {
   SparkR:::callJMethod(.zeppelinContext, "get", name)
 }
+
+z.getAsDataFrame <- function(name) {
+  stringValue <- z.get(name)
+  read.table(text=stringValue, header=TRUE, sep="\t")
+}
+
 z.angular <- function(name, noteId=NULL, paragraphId=NULL) {
   SparkR:::callJMethod(.zeppelinContext, "angular", name, noteId, paragraphId)
 }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index 1b4dd99..1143883 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -99,6 +99,11 @@ public class SparkShimsTest {
             public String showDataFrame(Object obj, int maxResult) {
               return null;
             }
+
+            @Override
+            public Object getAsDataFrame(String value) {
+              return null;
+            }
           };
       assertEquals(expected, sparkShims.supportYarn6615(version));
     }
@@ -121,9 +126,9 @@ public class SparkShimsTest {
      when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient);
      doNothing().when(mockIntpEventClient).onParaInfosReceived(argumentCaptor.capture());
      try {
-        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString(), new Properties());
+        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString(), new Properties(), null);
      } catch (Throwable ignore) {
-        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString(), new Properties());
+        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString(), new Properties(), null);
      }
       }
     }
 
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index ced1c1f..bed2ee4 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -303,7 +303,13 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
   }
 
   protected def createZeppelinContext(): Unit = {
-    val sparkShims = SparkShims.getInstance(sc.version, properties)
+
+    var sparkShims: SparkShims = null
+    if (isSparkSessionPresent()) {
+      sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
+    } else {
+      sparkShims = SparkShims.getInstance(sc.version, properties, sc)
+    }
     var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
     if (StringUtils.isBlank(webUiUrl)) {
       webUiUrl = sparkUrl;
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index 83594d0..2de37d6 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -20,6 +20,7 @@ package org.apache.zeppelin.spark
 import java.util
 
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.DataFrame
 import org.apache.zeppelin.annotation.ZeppelinApi
 import org.apache.zeppelin.display.AngularObjectWatcher
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption
@@ -146,4 +147,8 @@ class SparkZeppelinContext(val sc: SparkContext,
     }
     angularWatch(name, noteId, w)
   }
+
+  def getAsDataFrame(name: String): Object = {
+    sparkShims.getAsDataFrame(get(name).toString)
+  }
 }
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index 1482e38..e281b25 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -54,7 +54,7 @@ public abstract class SparkShims {
     this.properties = properties;
   }
 
-  private static SparkShims loadShims(String sparkVersion, Properties properties)
+  private static SparkShims loadShims(String sparkVersion, Properties properties, Object entryPoint)
       throws ReflectiveOperationException {
     Class<?> sparkShimsClass;
     if ("2".equals(sparkVersion)) {
@@ -65,15 +65,22 @@ public abstract class SparkShims {
       sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims");
     }
 
-    Constructor c = sparkShimsClass.getConstructor(Properties.class);
-    return (SparkShims) c.newInstance(properties);
+    Constructor c = sparkShimsClass.getConstructor(Properties.class, Object.class);
+    return (SparkShims) c.newInstance(properties, entryPoint);
   }
 
-  public static SparkShims getInstance(String sparkVersion, Properties properties) {
+  /**
+   * @param sparkVersion
+   * @param properties
+   * @param entryPoint SparkContext for Spark 1.x, SparkSession for Spark 2.x
+   * @return
+   */
+  public static SparkShims getInstance(String sparkVersion, Properties properties, Object entryPoint) {
     if (sparkShims == null) {
       String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
       try {
-        sparkShims = loadShims(sparkMajorVersion, properties);
+        sparkShims = loadShims(sparkMajorVersion, properties, entryPoint);
       } catch (ReflectiveOperationException e) {
         throw new RuntimeException(e);
       }
@@ -95,6 +102,7 @@ public abstract class SparkShims {
 
   public abstract String showDataFrame(Object obj, int maxResult);
 
+  public abstract Object getAsDataFrame(String value);
 
   protected void buildSparkJobUrl(String master,
                                   String sparkWebUrl,
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index 6c86925..8e60ed0 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -23,17 +23,24 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.ResultMessages;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
 public class Spark1Shims extends SparkShims {
 
-  public Spark1Shims(Properties properties) {
+  private SparkContext sc;
+
+  public Spark1Shims(Properties properties, Object entryPoint) {
     super(properties);
+    this.sc = (SparkContext) entryPoint;
   }
 
   public void setupSparkListener(final String master,
@@ -91,4 +98,24 @@ public class Spark1Shims extends SparkShims {
       return obj.toString();
     }
   }
+
+  @Override
+  public DataFrame getAsDataFrame(String value) {
+    String[] lines = value.split("\\n");
+    String head = lines[0];
+    String[] columns = head.split("\t");
+    StructType schema = new StructType();
+    for (String column : columns) {
+      schema = schema.add(column, "String");
+    }
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 1; i < lines.length; ++i) {
+      String[] tokens = lines[i].split("\t");
+      Row row = new GenericRow(tokens);
+      rows.add(row);
+    }
+    return SQLContext.getOrCreate(sc)
+            .createDataFrame(rows, schema);
+  }
 }
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index 041ed01..a7304c5 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -24,16 +24,23 @@ import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.StructType;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.ResultMessages;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
 public class Spark2Shims extends SparkShims {
 
-  public Spark2Shims(Properties properties) {
+  private SparkSession sparkSession;
+
+  public Spark2Shims(Properties properties, Object entryPoint) {
     super(properties);
+    this.sparkSession = (SparkSession) entryPoint;
   }
 
   public void setupSparkListener(final String master,
@@ -93,4 +100,22 @@ public class Spark2Shims extends SparkShims {
     }
   }
 
+  @Override
+  public Dataset<Row> getAsDataFrame(String value) {
+    String[] lines = value.split("\\n");
+    String head = lines[0];
+    String[] columns = head.split("\t");
+    StructType schema = new StructType();
+    for (String column : columns) {
+      schema = schema.add(column, "String");
+    }
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 1; i < lines.length; ++i) {
+      String[] tokens = lines[i].split("\t");
+      Row row = new GenericRow(tokens);
+      rows.add(row);
+    }
+    return sparkSession.createDataFrame(rows, schema);
+  }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java
index 81d2812..01c0acf 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java
@@ -64,6 +64,7 @@ public class JdbcIntegrationTest {
     interpreterSetting.setProperty("default.driver", "com.mysql.jdbc.Driver");
     interpreterSetting.setProperty("default.url", 
"jdbc:mysql://localhost:3306/");
     interpreterSetting.setProperty("default.user", "root");
+
    Dependency dependency = new Dependency("mysql:mysql-connector-java:5.1.46");
     interpreterSetting.setDependencies(Lists.newArrayList(dependency));
     interpreterSettingManager.restart(interpreterSetting.getId());
@@ -78,5 +79,27 @@ public class JdbcIntegrationTest {
             .build();
    InterpreterResult interpreterResult = jdbcInterpreter.interpret("show databases;", context);
    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
+    context.getLocalProperties().put("saveAs", "table_1");
+    interpreterResult = jdbcInterpreter.interpret("SELECT 1 as c1, 2 as c2;", context);
+    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertEquals(1, interpreterResult.message().size());
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("c1\tc2\n1\t2\n", interpreterResult.message().get(0).getData());
+
+    // read table_1 from python interpreter
+    Interpreter pythonInterpreter = interpreterFactory.getInterpreter("user1", "note1", "python", "test");
+    assertNotNull("PythonInterpreter is null", pythonInterpreter);
+
+    context = new InterpreterContext.Builder()
+            .setNoteId("note1")
+            .setParagraphId("paragraph_1")
+            .setAuthenticationInfo(AuthenticationInfo.ANONYMOUS)
+            .build();
+    interpreterResult = pythonInterpreter.interpret("df=z.getAsDataFrame('table_1')\nz.show(df)", context);
+    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertEquals(1, interpreterResult.message().size());
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("c1\tc2\n1\t2\n", interpreterResult.message().get(0).getData());
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 9c301b1..b645309 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -279,8 +279,15 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
      note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
       // test basic dataframe api
       Paragraph p = note.addNewParagraph(anonymous);
-      p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
-              "df.collect()");
+      if (isSpark2()) {
+        p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
+                ".toDF(\"name\", \"age\")\n" +
+                "df.collect()");
+      } else {
+        p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))" +
+                ".toDF(\"name\", \"age\")\n" +
+                "df.collect()");
+      }
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
       assertTrue(p.getReturn().message().get(0).getData().contains(
@@ -288,12 +295,62 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
       // test display DataFrame
       p = note.addNewParagraph(anonymous);
-      p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
-          "z.show(df)");
+      if (isSpark2()) {
+        p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
+                ".toDF(\"name\", \"age\")\n" +
+                "df.createOrReplaceTempView(\"test_table\")\n" +
+                "z.show(df)");
+      } else {
+        p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))" +
+                ".toDF(\"name\", \"age\")\n" +
+                "df.registerTempTable(\"test_table\")\n" +
+                "z.show(df)");
+      }
+      note.run(p.getId(), true);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
+      assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
+
+      // run sql and save it into resource pool
+      p = note.addNewParagraph(anonymous);
+      p.setText("%spark.sql(saveAs=table_result) select * from test_table");
+      note.run(p.getId(), true);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
+      assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
+
+      // get resource from spark
+      p = note.addNewParagraph(anonymous);
+      p.setText("%spark val 
df=z.getAsDataFrame(\"table_result\")\nz.show(df)");
+      note.run(p.getId(), true);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
+      assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
+
+      // get resource from pyspark
+      p = note.addNewParagraph(anonymous);
+      p.setText("%spark.pyspark 
df=z.getAsDataFrame('table_result')\nz.show(df)");
+      note.run(p.getId(), true);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
+      assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
+
+      // get resource from ipyspark
+      p = note.addNewParagraph(anonymous);
+      p.setText("%spark.ipyspark 
df=z.getAsDataFrame('table_result')\nz.show(df)");
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
      assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
-      assertEquals("_1\t_2\nhello\t20\n", p.getReturn().message().get(0).getData());
+      assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
+
+      // get resource from sparkr
+      p = note.addNewParagraph(anonymous);
+      p.setText("%spark.r df=z.getAsDataFrame('table_result')\ndf");
+      note.run(p.getId(), true);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(InterpreterResult.Type.TEXT, p.getReturn().message().get(0).getType());
+      assertTrue(p.getReturn().toString(),
+              p.getReturn().message().get(0).getData().contains("name age\n1 hello  20"));
 
       // test display DataSet
       if (isSpark2()) {
@@ -592,6 +649,13 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       waitForFinish(p);
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals(sparkVersion, p.getReturn().message().get(0).getData());
+
+      p.setText("%spark.pyspark sc.version");
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertTrue(p.getReturn().toString(),
+              p.getReturn().message().get(0).getData().contains(sparkVersion));
     } finally {
       if (null != note) {
        TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous);
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index 1b5ead4..aefb647 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -853,6 +853,25 @@ public abstract class BaseZeppelinContext {
   }
 
   /**
+   * Get object from resource pool
+   * Search local process first and then the other processes
+   *
+   * @param name
+   * @param clazz  The class of the returned value
+   * @return null if resource not found
+   */
+  @ZeppelinApi
+  public <T> T get(String name, Class<T> clazz) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    Resource resource = resourcePool.get(name);
+    if (resource != null) {
+      return resource.get(clazz);
+    } else {
+      return null;
+    }
+  }
+
+  /**
    * Remove object from resourcePool
    *
    * @param name
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index b2fc061..b15d3de 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -69,7 +69,6 @@ import org.apache.zeppelin.resource.DistributedResourcePool;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.ResourceSet;
-import org.apache.zeppelin.resource.WellKnownResourceName;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -88,6 +87,7 @@ import java.lang.reflect.Method;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
@@ -679,24 +679,35 @@ public class RemoteInterpreterServer extends Thread
        // data from context.out is prepended to InterpreterResult if both defined
        context.out.flush();
        List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-        resultMessages.addAll(result.message());
 
+        for (InterpreterResultMessage resultMessage : result.message()) {
+          // only add non-empty InterpreterResultMessage
+          if (!StringUtils.isBlank(resultMessage.getData())) {
+            resultMessages.add(resultMessage);
+          }
+        }
+
+        List<String> stringResult = new ArrayList<>();
         for (InterpreterResultMessage msg : resultMessages) {
           if (msg.getType() == InterpreterResult.Type.IMG) {
             logger.debug("InterpreterResultMessage: IMAGE_DATA");
           } else {
             logger.debug("InterpreterResultMessage: " + msg.toString());
           }
+          stringResult.add(msg.getData());
         }
         // put result into resource pool
-        if (resultMessages.size() > 0) {
-          int lastMessageIndex = resultMessages.size() - 1;
-          if (resultMessages.get(lastMessageIndex).getType() == InterpreterResult.Type.TABLE) {
+        if (context.getLocalProperties().containsKey("saveAs")) {
+          if (stringResult.size() == 1) {
+            logger.info("Saving result into ResourcePool as single string: " +
+                    context.getLocalProperties().get("saveAs"));
+            context.getResourcePool().put(
+                    context.getLocalProperties().get("saveAs"), 
stringResult.get(0));
+          } else {
+            logger.info("Saving result into ResourcePool as string list: " +
+                    context.getLocalProperties().get("saveAs"));
             context.getResourcePool().put(
-                    context.getNoteId(),
-                    context.getParagraphId(),
-                    WellKnownResourceName.ZeppelinTableResult.toString(),
-                    resultMessages.get(lastMessageIndex));
+                    context.getLocalProperties().get("saveAs"), stringResult);
           }
         }
         return new InterpreterResult(result.code(), resultMessages);
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
index ba31f01..04eefda 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
@@ -56,6 +56,8 @@ public class DistributedResourcePool extends LocalResourcePool {
       if (resources.isEmpty()) {
         return null;
       } else {
+        // TODO(zjffdu) just assume there are no duplicated resources with the same name,
+        // but this assumption is false
         return resources.get(0);
       }
     } else {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
index 32eaeb2..610cd28 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
@@ -17,10 +17,10 @@
 package org.apache.zeppelin.resource;
 
 import com.google.gson.Gson;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import com.google.gson.internal.Primitives;
 import org.apache.zeppelin.common.JsonSerializable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,6 +94,10 @@ public class Resource implements JsonSerializable, Serializable {
     }
   }
 
+  public <T> T get(Class<T> clazz) {
+    return Primitives.wrap(clazz).cast(r);
+  }
+
   public boolean isSerializable() {
     return serializable;
   }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
index ce06b73..4144d91 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
@@ -26,6 +26,7 @@ import org.apache.zeppelin.common.JsonSerializable;
 public class ResourceId implements JsonSerializable, Serializable {
   private static final Gson gson = new Gson();
 
+  // resourcePoolId is the interpreterGroupId, which is unique within one Zeppelin instance
   private final String resourcePoolId;
   private final String name;
   private final String noteId;
