Repository: zeppelin Updated Branches: refs/heads/master 19eb80a24 -> f04ddfbe7
ZEPPELIN-2615. Upgrade pig to 0.17.0 to support spark engine

### What is this PR for?
Pig 0.17.0 has just been released. This PR upgrades pig to 0.17.0 and adds support for the spark engine, a major milestone of pig 0.17.0.

Main Changes:
* Upgrade pig to 0.17.0
* Remove some code that used java reflection in `PigUtils.java`, as pig 0.17.0 exposes new APIs that the pig interpreter can use directly.
* Support spark engine

### What type of PR is it?
[Improvement | Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://github.com/zjffdu/zeppelin/tree/ZEPPELIN-2615

### How should this be tested?
A unit test is added, and spark yarn-client mode was also tested manually with the pig tutorial note.

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2431 from zjffdu/ZEPPELIN-2615 and squashes the following commits:

d4e9a6d [Jeff Zhang] Address comments
4b4e3db [Jeff Zhang] ZEPPELIN-2615.
Upgrade pig to 0.17.0 to support spark engine

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/f04ddfbe
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/f04ddfbe
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/f04ddfbe

Branch: refs/heads/master
Commit: f04ddfbe71f95ef02938f9508ee84ed01f1992a7
Parents: 19eb80a
Author: Jeff Zhang <zjf...@apache.org>
Authored: Sun Jun 25 10:08:12 2017 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Thu Jun 29 14:20:23 2017 +0800

----------------------------------------------------------------------
 LICENSE                                         |   7 +
 docs/interpreter/pig.md                         |  41 +++-
 pig/pom.xml                                     |  47 +++-
 .../org/apache/zeppelin/pig/PigInterpreter.java |  12 +-
 .../zeppelin/pig/PigQueryInterpreter.java       |  11 +-
 .../java/org/apache/zeppelin/pig/PigUtils.java  | 241 +------------------
 pig/src/main/resources/interpreter-setting.json |  20 +-
 .../zeppelin/pig/PigInterpreterSparkTest.java   | 149 ++++++++++++
 8 files changed, 267 insertions(+), 261 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index e206a6c..a6c02de 100644
--- a/LICENSE
+++ b/LICENSE
@@ -276,3 +276,10 @@ The following components are provided under the BSD 2-Clause license. See file
 (BSD 2 Clause) portions of SQLLine (http://sqlline.sourceforge.net/) - http://sqlline.sourceforge.net/#license
  jdbc/src/main/java/org/apache/zeppelin/jdbc/SqlCompleter.java
+
+========================================================================
+Jython Software License
+========================================================================
+The following components are provided under the Jython Software License. See file headers and project links for details.
+ + (Jython Software License) jython-standalone - http://www.jython.org/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/docs/interpreter/pig.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index 2e633a2..128ea76 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -29,31 +29,39 @@ which in turns enables them to handle very large data sets. - No pig alias in the last statement in `%pig.query` (read the examples below). - The last statement must be in single line in `%pig.query` -## Supported runtime mode - - Local - - MapReduce - - Tez_Local (Only Tez 0.7 is supported) - - Tez (Only Tez 0.7 is supported) ## How to use -### How to setup Pig +### How to setup Pig execution modes - Local Mode - Nothing needs to be done for local mode + Set `zeppelin.pig.execType` to `local`. - MapReduce Mode - HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. + Set `zeppelin.pig.execType` to `mapreduce`. HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. - Tez Local Mode - Nothing needs to be done for tez local mode + Only Tez 0.7 is supported. Set `zeppelin.pig.execType` to `tez_local`. - Tez Mode - HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. + Only Tez 0.7 is supported. Set `zeppelin.pig.execType` to `tez`. HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR need to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. + +- Spark Local Mode + + Only Spark 1.6.x is supported; by default it is Spark 1.6.3. Set `zeppelin.pig.execType` to `spark_local`. + +- Spark Mode + + Only Spark 1.6.x is supported; by default it is Spark 1.6.3. Set `zeppelin.pig.execType` to `spark`. For now, only yarn-client mode is supported. To enable it, set the property `SPARK_MASTER` to `yarn-client` and `SPARK_JAR` to the spark assembly jar.
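
The spark-mode setup above can be sketched as a config fragment. This is only an illustration: the Hadoop conf path and the assembly jar location below are hypothetical examples, not values from this commit.

```shell
# conf/zeppelin-env.sh -- sketch for running the Pig interpreter on Spark
# in yarn-client mode; both paths below are illustrative assumptions.
export HADOOP_CONF_DIR=/etc/hadoop/conf

# Then, in the Pig interpreter settings on the Interpreters page, set:
#   zeppelin.pig.execType   spark
#   SPARK_MASTER            yarn-client
#   SPARK_JAR               hdfs:///libs/spark-assembly-1.6.3-hadoop2.6.0.jar
```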
+ +### How to choose custom Spark Version + +By default, the Pig interpreter uses Spark 1.6.3 built with Scala 2.10. If you want to use another Spark or Scala version, +you need to rebuild Zeppelin, specifying the custom Spark version via -Dpig.spark.version=<custom_spark_version> and the Scala version via -Dpig.scala.version=<scala_version> in the maven build command. ### How to configure interpreter @@ -71,7 +79,7 @@ So you can use that to find app running in YARN RM UI. <tr> <td>zeppelin.pig.execType</td> <td>mapreduce</td> - <td>Execution mode for pig runtime. local | mapreduce | tez_local | tez </td> + <td>Execution mode for pig runtime. local | mapreduce | tez_local | tez | spark_local | spark </td> </tr> <tr> <td>zeppelin.pig.includeJobStats</td> @@ -93,6 +101,17 @@ So you can use that to find app running in YARN RM UI. <td>default</td> <td>queue name for mapreduce engine</td> </tr> + <tr> + <td>SPARK_MASTER</td> + <td>local</td> + <td>local | yarn-client</td> + </tr> + <tr> + <td>SPARK_JAR</td> + <td></td> + <td>The spark assembly jar; both local and HDFS jars are supported. Putting it on HDFS can improve performance</td> + </tr> </table> ### Example http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/pig/pom.xml ---------------------------------------------------------------------- diff --git a/pig/pom.xml b/pig/pom.xml index e58a62a..f76a3f9 100644 --- a/pig/pom.xml +++ b/pig/pom.xml @@ -36,9 +36,11 @@ <url>http://zeppelin.apache.org</url> <properties> - <pig.version>0.16.0</pig.version> + <pig.version>0.17.0</pig.version> <hadoop.version>2.6.0</hadoop.version> <tez.version>0.7.0</tez.version> + <pig.spark.version>1.6.3</pig.spark.version> + <pig.scala.version>2.10</pig.scala.version> </properties> <dependencies> @@ -68,11 +70,29 @@ <dependency> <groupId>org.apache.pig</groupId> <artifactId>pig</artifactId> - <classifier>h2</classifier> <version>${pig.version}</version> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> + <groupId>org.python</groupId> + <artifactId>jython-standalone</artifactId> + <version>2.7.0</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> @@ -82,6 +102,12 @@ <groupId>org.apache.tez</groupId> <artifactId>tez-api</artifactId> <version>${tez.version}</version> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> @@ -94,6 +120,12 @@ <groupId>org.apache.tez</groupId> <artifactId>tez-dag</artifactId> <version>${tez.version}</version> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions>
</dependency> <dependency> @@ -121,6 +153,17 @@ </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${pig.scala.version}</artifactId> + <version>${pig.spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-yarn_${pig.scala.version}</artifactId> + <version>${pig.spark.version}</version> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 973397c..8937416 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.PigServer; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.tools.pigscript.parser.ParseException; import org.apache.pig.tools.pigstats.*; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -98,6 +99,10 @@ public class PigInterpreter extends BasePigInterpreter { listenerMap.put(contextInterpreter.getParagraphId(), scriptListener); pigServer.registerScript(tmpFile.getAbsolutePath()); } catch (IOException e) { + // 1. catch FrontendException, FrontendException happens in the query compilation phase. + // 2. catch ParseException for syntax error + // 3. PigStats, This is execution error + // 4. Other errors. 
if (e instanceof FrontendException) { FrontendException fe = (FrontendException) e; if (!fe.getMessage().contains("Backend error :")) { @@ -107,9 +112,12 @@ public class PigInterpreter extends BasePigInterpreter { return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } } + if (e.getCause() instanceof ParseException) { + return new InterpreterResult(Code.ERROR, e.getCause().getMessage()); + } PigStats stats = PigStats.get(); if (stats != null) { - String errorMsg = PigUtils.extactJobStats(stats); + String errorMsg = stats.getDisplayString(); if (errorMsg != null) { LOGGER.error("Fail to run pig script, " + errorMsg); return new InterpreterResult(Code.ERROR, errorMsg); @@ -127,7 +135,7 @@ public class PigInterpreter extends BasePigInterpreter { StringBuilder outputBuilder = new StringBuilder(); PigStats stats = PigStats.get(); if (stats != null && includeJobStats) { - String jobStats = PigUtils.extactJobStats(stats); + String jobStats = stats.getDisplayString(); if (jobStats != null) { outputBuilder.append(jobStats); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java index 385ff45..d3bc432 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java @@ -25,6 +25,7 @@ import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.tools.pigscript.parser.ParseException; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.ScriptState; import org.apache.zeppelin.interpreter.*; @@ -125,8 +126,9 @@ 
public class PigQueryInterpreter extends BasePigInterpreter { } catch (IOException e) { // Extract error in the following order // 1. catch FrontendException, FrontendException happens in the query compilation phase. - // 2. PigStats, This is execution error - // 3. Other errors. + // 2. catch ParseException for syntax error + // 3. PigStats, This is execution error + // 4. Other errors. if (e instanceof FrontendException) { FrontendException fe = (FrontendException) e; if (!fe.getMessage().contains("Backend error :")) { @@ -134,9 +136,12 @@ public class PigQueryInterpreter extends BasePigInterpreter { return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } } + if (e.getCause() instanceof ParseException) { + return new InterpreterResult(Code.ERROR, e.getMessage()); + } PigStats stats = PigStats.get(); if (stats != null) { - String errorMsg = PigUtils.extactJobStats(stats); + String errorMsg = stats.getDisplayString(); if (errorMsg != null) { return new InterpreterResult(Code.ERROR, errorMsg); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java index 43687a5..8fc69ed 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java @@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.PigRunner; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; @@ -29,6 +30,9 @@ import 
org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; +import org.apache.pig.tools.pigstats.spark.SparkJobStats; +import org.apache.pig.tools.pigstats.spark.SparkPigStats; +import org.apache.pig.tools.pigstats.spark.SparkScriptState; import org.apache.pig.tools.pigstats.tez.TezDAGStats; import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.slf4j.Logger; @@ -39,10 +43,7 @@ import java.io.FileWriter; import java.io.IOException; import java.lang.reflect.Field; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @@ -66,236 +67,4 @@ public class PigUtils { return createTempPigScript(StringUtils.join(lines, "\n")); } - public static String extactJobStats(PigStats stats) { - if (stats instanceof SimplePigStats) { - return extractFromSimplePigStats((SimplePigStats) stats); - } else if (stats instanceof TezPigScriptStats) { - return extractFromTezPigStats((TezPigScriptStats) stats); - } else { - throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName()); - } - } - - public static String extractFromSimplePigStats(SimplePigStats stats) { - - try { - Field userIdField = PigStats.class.getDeclaredField("userId"); - userIdField.setAccessible(true); - String userId = (String) (userIdField.get(stats)); - Field startTimeField = PigStats.class.getDeclaredField("startTime"); - startTimeField.setAccessible(true); - long startTime = (Long) (startTimeField.get(stats)); - Field endTimeField = PigStats.class.getDeclaredField("endTime"); - endTimeField.setAccessible(true); - long endTime = (Long) (endTimeField.get(stats)); - - if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) { - LOGGER.warn("unknown return code, can't display the results"); - return null; - } - if 
(stats.getPigContext() == null) { - LOGGER.warn("unknown exec type, don't display the results"); - return null; - } - - SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); - StringBuilder sb = new StringBuilder(); - sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n"); - sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t") - .append(userId).append("\t") - .append(sdf.format(new Date(startTime))).append("\t") - .append(sdf.format(new Date(endTime))).append("\t") - .append(stats.getFeatures()).append("\n"); - sb.append("\n"); - if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { - sb.append("Success!\n"); - } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Some jobs have failed! Stop running all dependent jobs\n"); - } else { - sb.append("Failed!\n"); - } - sb.append("\n"); - - Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); - jobPlanField.setAccessible(true); - PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats); - - if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS - || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Job Stats (time in seconds):\n"); - sb.append(MRJobStats.SUCCESS_HEADER).append("\n"); - List<JobStats> arr = jobPlan.getSuccessfulJobs(); - for (JobStats js : arr) { - sb.append(js.getDisplayString()); - } - sb.append("\n"); - } - if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE - || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Failed Jobs:\n"); - sb.append(MRJobStats.FAILURE_HEADER).append("\n"); - List<JobStats> arr = jobPlan.getFailedJobs(); - for (JobStats js : arr) { - sb.append(js.getDisplayString()); - } - sb.append("\n"); - } - sb.append("Input(s):\n"); - for (InputStats is : stats.getInputStats()) { - sb.append(is.getDisplayString()); - } - sb.append("\n"); - sb.append("Output(s):\n"); - for (OutputStats 
ds : stats.getOutputStats()) { - sb.append(ds.getDisplayString()); - } - - sb.append("\nCounters:\n"); - sb.append("Total records written : " + stats.getRecordWritten()).append("\n"); - sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n"); - sb.append("Spillable Memory Manager spill count : " - + stats.getSMMSpillCount()).append("\n"); - sb.append("Total bags proactively spilled: " - + stats.getProactiveSpillCountObjects()).append("\n"); - sb.append("Total records proactively spilled: " - + stats.getProactiveSpillCountRecords()).append("\n"); - sb.append("\nJob DAG:\n").append(jobPlan.toString()); - - return "Script Statistics: \n" + sb.toString(); - } catch (Exception e) { - LOGGER.error("Can not extract message from SimplePigStats", e); - return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e); - } - } - - private static String extractFromTezPigStats(TezPigScriptStats stats) { - - try { - if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) { - LOGGER.warn("unknown return code, can't display the results"); - return null; - } - if (stats.getPigContext() == null) { - LOGGER.warn("unknown exec type, don't display the results"); - return null; - } - - Field userIdField = PigStats.class.getDeclaredField("userId"); - userIdField.setAccessible(true); - String userId = (String) (userIdField.get(stats)); - Field startTimeField = PigStats.class.getDeclaredField("startTime"); - startTimeField.setAccessible(true); - long startTime = (Long) (startTimeField.get(stats)); - Field endTimeField = PigStats.class.getDeclaredField("endTime"); - endTimeField.setAccessible(true); - long endTime = (Long) (endTimeField.get(stats)); - - SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); - StringBuilder sb = new StringBuilder(); - sb.append("\n"); - sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion())); - sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion())); 
- sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion())); - sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId)); - sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName())); - sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime)))); - sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime)))); - sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures())); - sb.append("\n"); - if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { - sb.append("Success!\n"); - } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Some tasks have failed! Stop running all dependent tasks\n"); - } else { - sb.append("Failed!\n"); - } - sb.append("\n"); - - // Print diagnostic info in case of failure - if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE - || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - if (stats.getErrorMessage() != null) { - String[] lines = stats.getErrorMessage().split("\n"); - for (int i = 0; i < lines.length; i++) { - String s = lines[i].trim(); - if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) { - sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? 
"ErrorMessage" : "", s)); - } - } - sb.append("\n"); - } - } - - Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); - tezDAGStatsMapField.setAccessible(true); - Map<String, TezDAGStats> tezDAGStatsMap = - (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats); - int count = 0; - for (TezDAGStats dagStats : tezDAGStatsMap.values()) { - sb.append("\n"); - sb.append("DAG " + count++ + ":\n"); - sb.append(dagStats.getDisplayString()); - sb.append("\n"); - } - - sb.append("Input(s):\n"); - for (InputStats is : stats.getInputStats()) { - sb.append(is.getDisplayString().trim()).append("\n"); - } - sb.append("\n"); - sb.append("Output(s):\n"); - for (OutputStats os : stats.getOutputStats()) { - sb.append(os.getDisplayString().trim()).append("\n"); - } - return "Script Statistics:\n" + sb.toString(); - } catch (Exception e) { - LOGGER.error("Can not extract message from SimplePigStats", e); - return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e); - } - } - - public static List<String> extractJobIds(PigStats stat) { - if (stat instanceof SimplePigStats) { - return extractJobIdsFromSimplePigStats((SimplePigStats) stat); - } else if (stat instanceof TezPigScriptStats) { - return extractJobIdsFromTezPigStats((TezPigScriptStats) stat); - } else { - throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName()); - } - } - - public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) { - List<String> jobIds = new ArrayList<>(); - try { - Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); - jobPlanField.setAccessible(true); - PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat); - List<JobStats> arr = jobPlan.getJobList(); - for (JobStats js : arr) { - jobIds.add(js.getJobId()); - } - return jobIds; - } catch (Exception e) { - LOGGER.error("Can not extract jobIds from SimpelPigStats", e); - throw new RuntimeException("Can not 
extract jobIds from SimpelPigStats", e); - } - } - - public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) { - List<String> jobIds = new ArrayList<>(); - try { - Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); - tezDAGStatsMapField.setAccessible(true); - Map<String, TezDAGStats> tezDAGStatsMap = - (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat); - for (TezDAGStats dagStats : tezDAGStatsMap.values()) { - LOGGER.debug("Tez JobId:" + dagStats.getJobId()); - jobIds.add(dagStats.getJobId()); - } - return jobIds; - } catch (Exception e) { - LOGGER.error("Can not extract jobIds from TezPigScriptStats", e); - throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e); - } - } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/pig/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json index 583a126..c1eb69a 100644 --- a/pig/src/main/resources/interpreter-setting.json +++ b/pig/src/main/resources/interpreter-setting.json @@ -8,13 +8,25 @@ "envName": null, "propertyName": "zeppelin.pig.execType", "defaultValue": "mapreduce", - "description": "local | mapreduce | tez" + "description": "local | mapreduce | tez_local | tez | spark_local | spark" }, "zeppelin.pig.includeJobStats": { "envName": null, "propertyName": "zeppelin.pig.includeJobStats", "defaultValue": "false", "description": "flag to include job stats in output" + }, + "SPARK_MASTER": { + "envName": "SPARK_MASTER", + "propertyName": "SPARK_MASTER", + "defaultValue": "local", + "description": "local | yarn-client" + }, + "SPARK_JAR": { + "envName": "SPARK_JAR", + "propertyName": "SPARK_JAR", + "defaultValue": "", + "description": "spark assembly jar uploaded in hdfs" } }, "editor": { @@ -27,12 +39,6 @@ "name": "query", "className": 
"org.apache.zeppelin.pig.PigQueryInterpreter", "properties": { - "zeppelin.pig.execType": { - "envName": null, - "propertyName": "zeppelin.pig.execType", - "defaultValue": "mapreduce", - "description": "local | mapreduce | tez" - }, "zeppelin.pig.maxResult": { "envName": null, "propertyName": "zeppelin.pig.maxResult", http://git-wip-us.apache.org/repos/asf/zeppelin/blob/f04ddfbe/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java ---------------------------------------------------------------------- diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java new file mode 100644 index 0000000..e821bfe --- /dev/null +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.pig; + +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class PigInterpreterSparkTest { + private PigInterpreter pigInterpreter; + private InterpreterContext context; + + public void setUpSpark(boolean includeJobStats) { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "spark_local"); + properties.put("zeppelin.pig.includeJobStats", includeJobStats + ""); + pigInterpreter = new PigInterpreter(properties); + pigInterpreter.open(); + context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null, + null, null); + + } + @After + public void tearDown() { + pigInterpreter.close(); + } + + @Test + public void testBasics() throws IOException { + setUpSpark(false); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)")); + + // describe + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + 
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}")); + + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'")); + + // syntax error + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "foreach a generate $0;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().get(0).getData().contains("expecting one of")); + } + + @Test + public void testIncludeJobStats() throws IOException { + setUpSpark(true); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("Spark Job")); + assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)")); + + // describe + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = 
pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // no job is launched, so no jobStats + assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}")); + + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + // no job is launched, so no jobStats + assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'")); + + // execution error + pigscript = "a = load 'invalid_path';" + + "dump a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().get(0).getData().contains("Failed to read data from")); + } + +} + +
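
The catch-block changes in `PigInterpreter` and `PigQueryInterpreter` above establish a four-step error-extraction order: FrontendException (compilation error) first, then a ParseException cause (syntax error), then job stats (execution error), then the raw exception. The sketch below illustrates only that ordering; the exception classes are simplified stand-ins, not the real Pig types.

```java
// Sketch of the error-extraction precedence introduced in this commit.
// FrontendException and ParseException below are stand-in classes that only
// mimic the real org.apache.pig types for illustration.
import java.io.IOException;

public class ErrorPrecedence {
    // Stand-in for org.apache.pig.impl.logicalLayer.FrontendException
    static class FrontendException extends IOException {
        FrontendException(String msg) { super(msg); }
    }

    // Stand-in for org.apache.pig.tools.pigscript.parser.ParseException
    static class ParseException extends Exception {
        ParseException(String msg) { super(msg); }
    }

    /**
     * Mirrors the ordering in the interpreters' catch blocks:
     * 1. FrontendException        -> compilation error
     * 2. ParseException cause     -> syntax error
     * 3. job stats display string -> execution error
     * 4. fall back to the raw exception
     */
    static String extractError(IOException e, String jobStatsDisplay) {
        if (e instanceof FrontendException
                && !e.getMessage().contains("Backend error :")) {
            return e.getMessage();                 // 1. compilation error
        }
        if (e.getCause() instanceof ParseException) {
            return e.getCause().getMessage();      // 2. syntax error
        }
        if (jobStatsDisplay != null) {
            return jobStatsDisplay;                // 3. execution error
        }
        return e.toString();                       // 4. anything else
    }

    public static void main(String[] args) {
        // compilation error wins even when stats are present
        System.out.println(extractError(new FrontendException("bad plan"), "stats"));

        // a ParseException cause is reported before stats
        IOException syntax = new IOException("wrapper");
        syntax.initCause(new ParseException("unexpected symbol"));
        System.out.println(extractError(syntax, "stats"));

        // otherwise the stats display string is the error message
        System.out.println(extractError(new IOException("io"), "job failed"));
    }
}
```

Running `main` prints `bad plan`, `unexpected symbol`, and `job failed`, one per line, matching the precedence described in the diff comments.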