Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.7 ff1a35306 -> c3c6c7a45


ZEPPELIN-2615. Upgrade pig to 0.17.0 to support spark engine

### What is this PR for?

Pig 0.17.0 has just been released. This PR upgrades Pig to 0.17.0 and adds
support for the Spark engine, which is a major milestone of Pig 0.17.0.

Main Changes:

* Upgrade pig to 0.17.0
* Remove the reflection-based code in `PigUtils.java`; Pig 0.17.0 exposes new
APIs that the Pig interpreter can use instead.
* Support spark engine
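
The reflection cleanup listed above can be illustrated with a minimal, self-contained sketch. `DemoStats` is a hypothetical stand-in and not a Pig class; the sketch only contrasts reading a private field reflectively (the style of the removed `PigUtils` code) with calling a public method (the style of `stats.getDisplayString()` used after this change).

```java
import java.lang.reflect.Field;

final class ReflectionVsApi {

  /** Hypothetical stand-in for a stats object; not part of Pig. */
  static final class DemoStats {
    private final String userId;

    DemoStats(String userId) {
      this.userId = userId;
    }

    /** Public accessor, analogous to an API exposed by the library. */
    String getDisplayString() {
      return "UserId: " + userId;
    }
  }

  /** Old style: reach into a private field via reflection. */
  static String viaReflection(DemoStats stats) {
    try {
      Field f = DemoStats.class.getDeclaredField("userId");
      f.setAccessible(true);
      return "UserId: " + f.get(stats);
    } catch (ReflectiveOperationException e) {
      // This path is hit if the field is renamed or removed upstream.
      throw new IllegalStateException("Cannot read userId reflectively", e);
    }
  }

  /** New style: depend on the public method only. */
  static String viaPublicApi(DemoStats stats) {
    return stats.getDisplayString();
  }

  public static void main(String[] args) {
    DemoStats stats = new DemoStats("zeppelin");
    System.out.println(viaReflection(stats));
    System.out.println(viaPublicApi(stats));
  }
}
```

Both calls produce the same string here, but only the reflective one depends on a private field name, which is why it is the more fragile of the two across library upgrades.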

### What type of PR is it?
[Improvement | Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2615

### How should this be tested?
A unit test is added, and Spark yarn-client mode was also tested manually with
the Pig tutorial note.
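
For the manual yarn-client test, the interpreter properties involved can be sketched as follows. This is only an illustration built on `java.util.Properties`, in the same way the new unit test sets `zeppelin.pig.execType`; the class `PigSparkSettings`, its `validate` helper, and the jar location are hypothetical and not part of Zeppelin.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

final class PigSparkSettings {
  // Exec types listed in the interpreter setting description of this change.
  static final List<String> EXEC_TYPES = Arrays.asList(
      "local", "mapreduce", "tez_local", "tez", "spark_local", "spark");

  /** Builds interpreter properties for Spark yarn-client mode. */
  static Properties yarnClient(String sparkJar) {
    Properties p = new Properties();
    p.setProperty("zeppelin.pig.execType", "spark");
    p.setProperty("SPARK_MASTER", "yarn-client");
    p.setProperty("SPARK_JAR", sparkJar);
    return p;
  }

  /** Hypothetical check: rejects exec types outside the documented list. */
  static void validate(Properties p) {
    // "mapreduce" is the documented default for zeppelin.pig.execType.
    String execType = p.getProperty("zeppelin.pig.execType", "mapreduce");
    if (!EXEC_TYPES.contains(execType)) {
      throw new IllegalArgumentException("Unsupported execType: " + execType);
    }
    // The docs ask for SPARK_JAR to be set when running in spark mode.
    if ("spark".equals(execType) && p.getProperty("SPARK_JAR", "").isEmpty()) {
      throw new IllegalArgumentException("SPARK_JAR must be set for spark mode");
    }
  }

  public static void main(String[] args) {
    // The jar location below is a placeholder, not a real path.
    Properties p = yarnClient("hdfs:///path/to/spark-assembly.jar");
    validate(p);
    p.list(System.out);
  }
}
```

In Zeppelin itself these values would be entered in the interpreter settings rather than built in code; the sketch only makes the required combination explicit.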

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2431 from zjffdu/ZEPPELIN-2615 and squashes the following commits:

d4e9a6d [Jeff Zhang] Address comments
4b4e3db [Jeff Zhang] ZEPPELIN-2615. Upgrade pig to 0.17.0 to support spark engine

(cherry picked from commit f04ddfbe71f95ef02938f9508ee84ed01f1992a7)
Signed-off-by: Jeff Zhang <zjf...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c3c6c7a4
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c3c6c7a4
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c3c6c7a4

Branch: refs/heads/branch-0.7
Commit: c3c6c7a450f5d24e1a89ba482ad88ee454af4a61
Parents: ff1a353
Author: Jeff Zhang <zjf...@apache.org>
Authored: Sun Jun 25 10:08:12 2017 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Thu Jun 29 14:20:44 2017 +0800

----------------------------------------------------------------------
 LICENSE                                         |   7 +
 docs/interpreter/pig.md                         |  41 +++-
 pig/pom.xml                                     |  47 +++-
 .../org/apache/zeppelin/pig/PigInterpreter.java |  12 +-
 .../zeppelin/pig/PigQueryInterpreter.java       |  11 +-
 .../java/org/apache/zeppelin/pig/PigUtils.java  | 241 +------------------
 pig/src/main/resources/interpreter-setting.json |  20 +-
 .../zeppelin/pig/PigInterpreterSparkTest.java   | 149 ++++++++++++
 8 files changed, 267 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index b8b84ef..e36d231 100644
--- a/LICENSE
+++ b/LICENSE
@@ -273,3 +273,10 @@ The following components are provided under the BSD 2-Clause license.  See file
 
   (BSD 2 Clause) portions of SQLLine (http://sqlline.sourceforge.net/) - http://sqlline.sourceforge.net/#license
    jdbc/src/main/java/org/apache/zeppelin/jdbc/SqlCompleter.java
+
+========================================================================
+Jython Software License
+========================================================================
+The following components are provided under the Jython Software License.  See file headers and project links for details.
+
+  (Jython Software License) jython-standalone - http://www.jython.org/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/docs/interpreter/pig.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md
index d1f18fa..4dd840b 100644
--- a/docs/interpreter/pig.md
+++ b/docs/interpreter/pig.md
@@ -25,31 +25,39 @@ group: manual
     - No pig alias in the last statement in `%pig.query` (read the examples below).
     - The last statement must be in single line in `%pig.query`
     
-## Supported runtime mode
-  - Local
-  - MapReduce
-  - Tez_Local (Only Tez 0.7 is supported)
-  - Tez  (Only Tez 0.7 is supported)
 
 ## How to use
 
-### How to setup Pig
+### How to setup Pig execution modes.
 
 - Local Mode
 
-    Nothing needs to be done for local mode
+    Set `zeppelin.pig.execType` as `local`.
 
 - MapReduce Mode
 
-    HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
+    Set `zeppelin.pig.execType` as `mapreduce`. HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
 
 - Tez Local Mode
     
-    Nothing needs to be done for tez local mode
+    Only Tez 0.7 is supported. Set `zeppelin.pig.execType` as `tez_local`.
     
 - Tez Mode
 
-    HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
+    Only Tez 0.7 is supported. Set `zeppelin.pig.execType` as `tez`. HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
+
+- Spark Local Mode
+    
+    Only Spark 1.6.x is supported, by default it is Spark 1.6.3. Set `zeppelin.pig.execType` as `spark_local`.
+    
+- Spark Mode
+    
+    Only Spark 1.6.x is supported, by default it is Spark 1.6.3. Set `zeppelin.pig.execType` as `spark`. For now, only yarn-client mode is supported. To enable it, you need to set property `SPARK_MASTER` to yarn-client and set `SPARK_JAR` to the spark assembly jar.
+        
+### How to choose custom Spark Version
+
+By default, Pig Interpreter would use Spark 1.6.3 built with scala 2.10, if you want to use another spark version or scala version, 
+you need to rebuild Zeppelin by specifying the custom Spark version via -Dpig.spark.version=<custom_spark_version> and scala version via -Dpig.scala.version=<scala_version> in the maven build command.
 
 ### How to configure interpreter
 
@@ -66,7 +74,7 @@ Besides, we use paragraph title as job name if it exists, else use the last line
     <tr>
         <td>zeppelin.pig.execType</td>
         <td>mapreduce</td>
-        <td>Execution mode for pig runtime. local | mapreduce | tez_local | tez </td>
+        <td>Execution mode for pig runtime. local | mapreduce | tez_local | tez | spark_local | spark </td>
     </tr>
     <tr>
         <td>zeppelin.pig.includeJobStats</td>
@@ -88,6 +96,17 @@ Besides, we use paragraph title as job name if it exists, else use the last line
         <td>default</td>
         <td>queue name for mapreduce engine</td>
     </tr>
+    <tr>
+        <td>SPARK_MASTER</td>
+        <td>local</td>
+        <td>local | yarn-client</td>
+    </tr>
+    <tr>
+        <td>SPARK_JAR</td>
+        <td></td>
+        <td>The spark assembly jar, both jar in local or hdfs is supported. Put it on hdfs could have
+        performance benefit</td>
+    </tr>
 </table>  
 
 ### Example

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/pig/pom.xml
----------------------------------------------------------------------
diff --git a/pig/pom.xml b/pig/pom.xml
index 266551b..6528a82 100644
--- a/pig/pom.xml
+++ b/pig/pom.xml
@@ -36,9 +36,11 @@
     <url>http://zeppelin.apache.org</url>
 
     <properties>
-        <pig.version>0.16.0</pig.version>
+        <pig.version>0.17.0</pig.version>
         <hadoop.version>2.6.0</hadoop.version>
         <tez.version>0.7.0</tez.version>
+        <pig.spark.version>1.6.3</pig.spark.version>
+        <pig.scala.version>2.10</pig.scala.version>
     </properties>
 
     <dependencies>
@@ -62,11 +64,29 @@
         <dependency>
             <groupId>org.apache.pig</groupId>
             <artifactId>pig</artifactId>
-            <classifier>h2</classifier>
             <version>${pig.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api-2.5</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
+            <groupId>org.python</groupId>
+            <artifactId>jython-standalone</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>
@@ -76,6 +96,12 @@
             <groupId>org.apache.tez</groupId>
             <artifactId>tez-api</artifactId>
             <version>${tez.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -88,6 +114,12 @@
             <groupId>org.apache.tez</groupId>
             <artifactId>tez-dag</artifactId>
             <version>${tez.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -115,6 +147,17 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${pig.scala.version}</artifactId>
+            <version>${pig.spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-yarn_${pig.scala.version}</artifactId>
+            <version>${pig.spark.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
index 973397c..8937416 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.pig.PigServer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.tools.pigscript.parser.ParseException;
 import org.apache.pig.tools.pigstats.*;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -98,6 +99,10 @@ public class PigInterpreter extends BasePigInterpreter {
       listenerMap.put(contextInterpreter.getParagraphId(), scriptListener);
       pigServer.registerScript(tmpFile.getAbsolutePath());
     } catch (IOException e) {
+      // 1. catch FrontendException, FrontendException happens in the query compilation phase.
+      // 2. catch ParseException for syntax error
+      // 3. PigStats, This is execution error
+      // 4. Other errors.
       if (e instanceof FrontendException) {
         FrontendException fe = (FrontendException) e;
         if (!fe.getMessage().contains("Backend error :")) {
@@ -107,9 +112,12 @@ public class PigInterpreter extends BasePigInterpreter {
           return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
         }
       }
+      if (e.getCause() instanceof ParseException) {
+        return new InterpreterResult(Code.ERROR, e.getCause().getMessage());
+      }
       PigStats stats = PigStats.get();
       if (stats != null) {
-        String errorMsg = PigUtils.extactJobStats(stats);
+        String errorMsg = stats.getDisplayString();
         if (errorMsg != null) {
           LOGGER.error("Fail to run pig script, " + errorMsg);
           return new InterpreterResult(Code.ERROR, errorMsg);
@@ -127,7 +135,7 @@ public class PigInterpreter extends BasePigInterpreter {
     StringBuilder outputBuilder = new StringBuilder();
     PigStats stats = PigStats.get();
     if (stats != null && includeJobStats) {
-      String jobStats = PigUtils.extactJobStats(stats);
+      String jobStats = stats.getDisplayString();
       if (jobStats != null) {
         outputBuilder.append(jobStats);
       }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
index 566b536..45e190d 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
@@ -25,6 +25,7 @@ import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigscript.parser.ParseException;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.zeppelin.interpreter.*;
@@ -123,8 +124,9 @@ public class PigQueryInterpreter extends BasePigInterpreter {
     } catch (IOException e) {
       // Extract error in the following order
       // 1. catch FrontendException, FrontendException happens in the query compilation phase.
-      // 2. PigStats, This is execution error
-      // 3. Other errors.
+      // 2. catch ParseException for syntax error
+      // 3. PigStats, This is execution error
+      // 4. Other errors.
       if (e instanceof FrontendException) {
         FrontendException fe = (FrontendException) e;
         if (!fe.getMessage().contains("Backend error :")) {
@@ -132,9 +134,12 @@ public class PigQueryInterpreter extends BasePigInterpreter {
           return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
         }
       }
+      if (e.getCause() instanceof ParseException) {
+        return new InterpreterResult(Code.ERROR, e.getMessage());
+      }
       PigStats stats = PigStats.get();
       if (stats != null) {
-        String errorMsg = PigUtils.extactJobStats(stats);
+        String errorMsg = stats.getDisplayString();
         if (errorMsg != null) {
           return new InterpreterResult(Code.ERROR, errorMsg);
         }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
index 43687a5..8fc69ed 100644
--- a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.pig.PigRunner;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -29,6 +30,9 @@ import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
+import org.apache.pig.tools.pigstats.spark.SparkJobStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkScriptState;
 import org.apache.pig.tools.pigstats.tez.TezDAGStats;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.slf4j.Logger;
@@ -39,10 +43,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  *
@@ -66,236 +67,4 @@ public class PigUtils {
     return createTempPigScript(StringUtils.join(lines, "\n"));
   }
 
-  public static String extactJobStats(PigStats stats) {
-    if (stats instanceof SimplePigStats) {
-      return extractFromSimplePigStats((SimplePigStats) stats);
-    } else if (stats instanceof TezPigScriptStats) {
-      return extractFromTezPigStats((TezPigScriptStats) stats);
-    } else {
-      throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName());
-    }
-  }
-
-  public static String extractFromSimplePigStats(SimplePigStats stats) {
-
-    try {
-      Field userIdField = PigStats.class.getDeclaredField("userId");
-      userIdField.setAccessible(true);
-      String userId = (String) (userIdField.get(stats));
-      Field startTimeField = PigStats.class.getDeclaredField("startTime");
-      startTimeField.setAccessible(true);
-      long startTime = (Long) (startTimeField.get(stats));
-      Field endTimeField = PigStats.class.getDeclaredField("endTime");
-      endTimeField.setAccessible(true);
-      long endTime = (Long) (endTimeField.get(stats));
-
-      if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
-        LOGGER.warn("unknown return code, can't display the results");
-        return null;
-      }
-      if (stats.getPigContext() == null) {
-        LOGGER.warn("unknown exec type, don't display the results");
-        return null;
-      }
-
-      SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
-      StringBuilder sb = new StringBuilder();
-      sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
-      sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t")
-              .append(userId).append("\t")
-              .append(sdf.format(new Date(startTime))).append("\t")
-              .append(sdf.format(new Date(endTime))).append("\t")
-              .append(stats.getFeatures()).append("\n");
-      sb.append("\n");
-      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
-        sb.append("Success!\n");
-      } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
-        sb.append("Some jobs have failed! Stop running all dependent jobs\n");
-      } else {
-        sb.append("Failed!\n");
-      }
-      sb.append("\n");
-
-      Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
-      jobPlanField.setAccessible(true);
-      PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats);
-
-      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS
-              || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
-        sb.append("Job Stats (time in seconds):\n");
-        sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
-        List<JobStats> arr = jobPlan.getSuccessfulJobs();
-        for (JobStats js : arr) {
-          sb.append(js.getDisplayString());
-        }
-        sb.append("\n");
-      }
-      if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
-              || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
-        sb.append("Failed Jobs:\n");
-        sb.append(MRJobStats.FAILURE_HEADER).append("\n");
-        List<JobStats> arr = jobPlan.getFailedJobs();
-        for (JobStats js : arr) {
-          sb.append(js.getDisplayString());
-        }
-        sb.append("\n");
-      }
-      sb.append("Input(s):\n");
-      for (InputStats is : stats.getInputStats()) {
-        sb.append(is.getDisplayString());
-      }
-      sb.append("\n");
-      sb.append("Output(s):\n");
-      for (OutputStats ds : stats.getOutputStats()) {
-        sb.append(ds.getDisplayString());
-      }
-
-      sb.append("\nCounters:\n");
-      sb.append("Total records written : " + stats.getRecordWritten()).append("\n");
-      sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n");
-      sb.append("Spillable Memory Manager spill count : "
-              + stats.getSMMSpillCount()).append("\n");
-      sb.append("Total bags proactively spilled: "
-              + stats.getProactiveSpillCountObjects()).append("\n");
-      sb.append("Total records proactively spilled: "
-              + stats.getProactiveSpillCountRecords()).append("\n");
-      sb.append("\nJob DAG:\n").append(jobPlan.toString());
-
-      return "Script Statistics: \n" + sb.toString();
-    } catch (Exception e) {
-      LOGGER.error("Can not extract message from SimplePigStats", e);
-      return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
-    }
-  }
-
-  private static String extractFromTezPigStats(TezPigScriptStats stats) {
-
-    try {
-      if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
-        LOGGER.warn("unknown return code, can't display the results");
-        return null;
-      }
-      if (stats.getPigContext() == null) {
-        LOGGER.warn("unknown exec type, don't display the results");
-        return null;
-      }
-
-      Field userIdField = PigStats.class.getDeclaredField("userId");
-      userIdField.setAccessible(true);
-      String userId = (String) (userIdField.get(stats));
-      Field startTimeField = PigStats.class.getDeclaredField("startTime");
-      startTimeField.setAccessible(true);
-      long startTime = (Long) (startTimeField.get(stats));
-      Field endTimeField = PigStats.class.getDeclaredField("endTime");
-      endTimeField.setAccessible(true);
-      long endTime = (Long) (endTimeField.get(stats));
-
-      SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
-      StringBuilder sb = new StringBuilder();
-      sb.append("\n");
-      sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion()));
-      sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion()));
-      sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
-      sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
-      sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName()));
-      sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
-      sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
-      sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures()));
-      sb.append("\n");
-      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
-        sb.append("Success!\n");
-      } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
-        sb.append("Some tasks have failed! Stop running all dependent tasks\n");
-      } else {
-        sb.append("Failed!\n");
-      }
-      sb.append("\n");
-
-      // Print diagnostic info in case of failure
-      if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
-              || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
-        if (stats.getErrorMessage() != null) {
-          String[] lines = stats.getErrorMessage().split("\n");
-          for (int i = 0; i < lines.length; i++) {
-            String s = lines[i].trim();
-            if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) {
-              sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
-            }
-          }
-          sb.append("\n");
-        }
-      }
-
-      Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
-      tezDAGStatsMapField.setAccessible(true);
-      Map<String, TezDAGStats> tezDAGStatsMap =
-              (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats);
-      int count = 0;
-      for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
-        sb.append("\n");
-        sb.append("DAG " + count++ + ":\n");
-        sb.append(dagStats.getDisplayString());
-        sb.append("\n");
-      }
-
-      sb.append("Input(s):\n");
-      for (InputStats is : stats.getInputStats()) {
-        sb.append(is.getDisplayString().trim()).append("\n");
-      }
-      sb.append("\n");
-      sb.append("Output(s):\n");
-      for (OutputStats os : stats.getOutputStats()) {
-        sb.append(os.getDisplayString().trim()).append("\n");
-      }
-      return "Script Statistics:\n" + sb.toString();
-    } catch (Exception e) {
-      LOGGER.error("Can not extract message from SimplePigStats", e);
-      return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
-    }
-  }
-
-  public static List<String> extractJobIds(PigStats stat) {
-    if (stat instanceof SimplePigStats) {
-      return extractJobIdsFromSimplePigStats((SimplePigStats) stat);
-    } else if (stat instanceof TezPigScriptStats) {
-      return extractJobIdsFromTezPigStats((TezPigScriptStats) stat);
-    } else {
-      throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName());
-    }
-  }
-
-  public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) {
-    List<String> jobIds = new ArrayList<>();
-    try {
-      Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
-      jobPlanField.setAccessible(true);
-      PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat);
-      List<JobStats> arr = jobPlan.getJobList();
-      for (JobStats js : arr) {
-        jobIds.add(js.getJobId());
-      }
-      return jobIds;
-    } catch (Exception e) {
-      LOGGER.error("Can not extract jobIds from SimpelPigStats", e);
-      throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e);
-    }
-  }
-
-  public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) {
-    List<String> jobIds = new ArrayList<>();
-    try {
-      Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
-      tezDAGStatsMapField.setAccessible(true);
-      Map<String, TezDAGStats> tezDAGStatsMap =
-              (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat);
-      for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
-        LOGGER.debug("Tez JobId:" + dagStats.getJobId());
-        jobIds.add(dagStats.getJobId());
-      }
-      return jobIds;
-    } catch (Exception e) {
-      LOGGER.error("Can not extract jobIds from TezPigScriptStats", e);
-      throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/pig/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json
index 583a126..c1eb69a 100644
--- a/pig/src/main/resources/interpreter-setting.json
+++ b/pig/src/main/resources/interpreter-setting.json
@@ -8,13 +8,25 @@
         "envName": null,
         "propertyName": "zeppelin.pig.execType",
         "defaultValue": "mapreduce",
-        "description": "local | mapreduce | tez"
+        "description": "local | mapreduce | tez_local | tez | spark_local | spark"
       },
       "zeppelin.pig.includeJobStats": {
         "envName": null,
         "propertyName": "zeppelin.pig.includeJobStats",
         "defaultValue": "false",
         "description": "flag to include job stats in output"
+      },
+      "SPARK_MASTER": {
+        "envName": "SPARK_MASTER",
+        "propertyName": "SPARK_MASTER",
+        "defaultValue": "local",
+        "description": "local | yarn-client"
+      },
+      "SPARK_JAR": {
+        "envName": "SPARK_JAR",
+        "propertyName": "SPARK_JAR",
+        "defaultValue": "",
+        "description": "spark assembly jar uploaded in hdfs"
       }
     },
     "editor": {
@@ -27,12 +39,6 @@
     "name": "query",
     "className": "org.apache.zeppelin.pig.PigQueryInterpreter",
     "properties": {
-      "zeppelin.pig.execType": {
-        "envName": null,
-        "propertyName": "zeppelin.pig.execType",
-        "defaultValue": "mapreduce",
-        "description": "local | mapreduce | tez"
-      },
       "zeppelin.pig.maxResult": {
         "envName": null,
         "propertyName": "zeppelin.pig.maxResult",

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c3c6c7a4/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java
new file mode 100644
index 0000000..e821bfe
--- /dev/null
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class PigInterpreterSparkTest {
+  private PigInterpreter pigInterpreter;
+  private InterpreterContext context;
+
+  public void setUpSpark(boolean includeJobStats) {
+    Properties properties = new Properties();
+    properties.put("zeppelin.pig.execType", "spark_local");
+    properties.put("zeppelin.pig.includeJobStats", includeJobStats + "");
+    pigInterpreter = new PigInterpreter(properties);
+    pigInterpreter.open();
+    context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null,
+        null, null);
+
+  }
+  @After
+  public void tearDown() {
+    pigInterpreter.close();
+  }
+
+  @Test
+  public void testBasics() throws IOException {
+    setUpSpark(false);
+
+    String content = "1\tandy\n"
+        + "2\tpeter\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // simple pig script using dump
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+        + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
+
+    // describe
+    pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+        + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
+
+    // syntax error (compilation error)
+    pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+        + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'"));
+
+    // syntax error
+    pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+        + "foreach a generate $0;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData().contains("expecting one of"));
+  }
+
+  @Test
+  public void testIncludeJobStats() throws IOException {
+    setUpSpark(true);
+
+    String content = "1\tandy\n"
+        + "2\tpeter\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // simple pig script using dump
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+        + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().get(0).getData().contains("Spark Job"));
+    assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
+
+    // describe
+    pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+        + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    // no job is launched, so no jobStats
+    assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
+
+    // syntax error (compilation error)
+    pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+        + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    // no job is launched, so no jobStats
+    assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'"));
+
+    // execution error
+    pigscript = "a = load 'invalid_path';"
+        + "dump a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData().contains("Failed to read data from"));
+  }
+
+}
+
+
