Repository: zeppelin
Updated Branches:
  refs/heads/master 5d5d758ce -> 465c51a41


ZEPPELIN-335. Pig Interpreter

### What is this PR for?
Based on #338 , I refactor most of pig interpreter. As I don't think the 
approach in #338 is the best approach. In #338, we use script `bin/pig` to 
launch pig script, it is different to control that job (hard to kill and get 
progress and stats info).  In this PR, I use pig api to launch pig script. 
Besides that I implement another interpreter type `%pig.query` to leverage the 
display system of zeppelin. For the details you can check `pig.md`

### What type of PR is it?
[Feature]

### Todos
* Syntax Highlight
* new interpreter type `%pig.udf`, so that user can write pig udf in zeppelin 
directly and don't need to build udf jar manually.

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-335

### How should this be tested?
Unit test is added and also manual test is done

### Screenshots (if appropriate)

![image](https://cloud.githubusercontent.com/assets/164491/18986649/54217b4c-8730-11e6-9e33-25f98a98a9b6.png)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjf...@apache.org>
Author: Ali Bajwa <aba...@hortonworks.com>
Author: AhyoungRyu <ahyoung...@apache.org>
Author: Jeff Zhang <zjf...@gmail.com>

Closes #1476 from zjffdu/ZEPPELIN-335 and squashes the following commits:

73a07f0 [Jeff Zhang] minor update
a1b742b [Jeff Zhang] minor update on doc
e858301 [Jeff Zhang] address comments
c85a090 [Jeff Zhang] add license
58b4b2f [Jeff Zhang] minor update of docs
1ae7db2 [Jeff Zhang] Merge pull request #2 from AhyoungRyu/ZEPPELIN-335/docs
fe014a7 [AhyoungRyu] Fix docs title in front matter
df7a6db [AhyoungRyu] Add pig.md to dropdown menu
5e2e222 [AhyoungRyu] Minor update for pig.md
39f161a [Jeff Zhang] address comments
05a3b9b [Jeff Zhang] add pig.md
a09a7f7 [Jeff Zhang] refactor pig Interpreter
c28beb5 [Ali Bajwa] Updated based on comments: 1. Documentation: added pig.md 
with interpreter documentation and added pig entry to index.md 2. Added test 
junit test based on passwd file parsing example here 
https://pig.apache.org/docs/r0.10.0/start.html#run 3. Removed author tag from 
comment (this was copied from shell interpreter 
https://github.com/apache/incubator-zeppelin/blob/master/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java#L42)
 4. Implemented cancel functionality 5. Display output stream in case of error
2586336 [Ali Bajwa] exposed timeout and pig executable via interpreter and 
added comments
7abad20 [Ali Bajwa] initial commit of pig interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/465c51a4
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/465c51a4
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/465c51a4

Branch: refs/heads/master
Commit: 465c51a419347f89a7d73c8014b3b46f27f1d5b8
Parents: 5d5d758
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Oct 11 10:27:39 2016 +0800
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sat Oct 15 12:26:50 2016 -0700

----------------------------------------------------------------------
 bin/interpreter.sh                              |  22 ++
 conf/interpreter-list                           |   1 +
 conf/zeppelin-site.xml.template                 |   2 +-
 docs/_includes/themes/zeppelin/_navigation.html |   1 +
 docs/interpreter/pig.md                         |  97 ++++++
 pig/pom.xml                                     | 184 ++++++++++++
 .../apache/zeppelin/pig/BasePigInterpreter.java | 100 +++++++
 .../org/apache/zeppelin/pig/PigInterpreter.java | 137 +++++++++
 .../zeppelin/pig/PigQueryInterpreter.java       | 172 +++++++++++
 .../apache/zeppelin/pig/PigScriptListener.java  |  94 ++++++
 .../java/org/apache/zeppelin/pig/PigUtils.java  | 292 +++++++++++++++++++
 pig/src/main/resources/interpreter-setting.json |  46 +++
 .../apache/zeppelin/pig/PigInterpreterTest.java | 155 ++++++++++
 .../zeppelin/pig/PigQueryInterpreterTest.java   | 153 ++++++++++
 pig/src/test/resources/log4j.properties         |  22 ++
 pom.xml                                         |   1 +
 zeppelin-distribution/src/bin_license/LICENSE   |  10 +-
 .../zeppelin/conf/ZeppelinConfiguration.java    |   4 +-
 18 files changed, 1490 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index a81c8f2..4fb4b26 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -149,6 +149,28 @@ elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
   else
     echo "HBASE_HOME and HBASE_CONF_DIR are not set, configuration might not 
be loaded"
   fi
+elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
+   # autodetect HADOOP_CONF_HOME by heuristic
+  if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
+    if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
+      export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+    elif [[ -d "/etc/hadoop/conf" ]]; then
+      export HADOOP_CONF_DIR="/etc/hadoop/conf"
+    fi
+  fi
+
+  if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
+  fi
+  
+  # autodetect TEZ_CONF_DIR
+  if [[ -n "${TEZ_CONF_DIR}" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}"
+  elif [[ -d "/etc/tez/conf" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":/etc/tez/conf"
+  else
+    echo "TEZ_CONF_DIR is not set, configuration might not be loaded"
+  fi
 fi
 
 addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/conf/interpreter-list
----------------------------------------------------------------------
diff --git a/conf/interpreter-list b/conf/interpreter-list
index 098b3c6..38cb386 100644
--- a/conf/interpreter-list
+++ b/conf/interpreter-list
@@ -32,6 +32,7 @@ kylin           org.apache.zeppelin:zeppelin-kylin:0.6.1      
          Kylin in
 lens            org.apache.zeppelin:zeppelin-lens:0.6.1                 Lens 
interpreter
 livy            org.apache.zeppelin:zeppelin-livy:0.6.1                 Livy 
interpreter
 md              org.apache.zeppelin:zeppelin-markdown:0.6.1             
Markdown support
+pig             org.apache.zeppelin:zeppelin-pig:0.6.1                  Pig 
interpreter
 postgresql      org.apache.zeppelin:zeppelin-postgresql:0.6.1           
Postgresql interpreter
 python          org.apache.zeppelin:zeppelin-python:0.6.1               Python 
interpreter
 shell           org.apache.zeppelin:zeppelin-shell:0.6.1                Shell 
command

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 05bd719..c4b369c 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -190,7 +190,7 @@
 
 <property>
   <name>zeppelin.interpreters</name>
-  
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,or
 
g.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter</value>
+  
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,or
 
g.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,
 org.apache.zeppelin.pig.PigQueryInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter 
become a default</description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/docs/_includes/themes/zeppelin/_navigation.html
----------------------------------------------------------------------
diff --git a/docs/_includes/themes/zeppelin/_navigation.html 
b/docs/_includes/themes/zeppelin/_navigation.html
index a0e4485..9abcdb1 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -62,6 +62,7 @@
                 <li><a href="{{BASE_PATH}}/interpreter/lens.html">Lens</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/livy.html">Livy</a></li>
                 <li><a 
href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
+                <li><a href="{{BASE_PATH}}/interpreter/pig.html">Pig</a></li>
                 <li><a 
href="{{BASE_PATH}}/interpreter/python.html">Python</a></li>
                 <li><a 
href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, HAWQ</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/r.html">R</a></li>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/docs/interpreter/pig.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md
new file mode 100644
index 0000000..227656b
--- /dev/null
+++ b/docs/interpreter/pig.md
@@ -0,0 +1,97 @@
+---
+layout: page
+title: "Pig Interpreter for Apache Zeppelin"
+description: "Apache Pig is a platform for analyzing large data sets that 
consists of a high-level language for expressing data analysis programs, 
coupled with infrastructure for evaluating these programs."
+group: manual
+---
+{% include JB/setup %}
+
+
+# Pig Interpreter for Apache Zeppelin
+
+<div id="toc"></div>
+
+## Overview
+[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data 
sets that consists of a high-level language for expressing data analysis 
programs, coupled with infrastructure for evaluating these programs. The 
salient property of Pig programs is that their structure is amenable to 
substantial parallelization, which in turns enables them to handle very large 
data sets.
+
+## Supported interpreter type
+  - `%pig.script` (default)
+    
+    All the pig script can run in this type of interpreter, and display type 
is plain text.
+  
+  - `%pig.query`
+ 
+    Almost the same as `%pig.script`. The only difference is that you don't 
need to add alias in the last statement. And the display type is table.   
+
+## Supported runtime mode
+  - Local
+  - MapReduce
+  - Tez  (Only Tez 0.7 is supported)
+
+## How to use
+
+### How to setup Pig
+
+- Local Mode
+
+    Nothing needs to be done for local mode
+
+- MapReduce Mode
+
+    HADOOP\_CONF\_DIR needs to be specified in 
`ZEPPELIN_HOME/conf/zeppelin-env.sh`.
+
+- Tez Mode
+
+    HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in 
`ZEPPELIN_HOME/conf/zeppelin-env.sh`.
+
+### How to configure interpreter
+
+At the Interpreters menu, you have to create a new Pig interpreter. Pig 
interpreter has below properties by default.
+
+<table class="table-configuration">
+    <tr>
+        <th>Property</th>
+        <th>Default</th>
+        <th>Description</th>
+    </tr>
+    <tr>
+        <td>zeppelin.pig.execType</td>
+        <td>mapreduce</td>
+        <td>Execution mode for pig runtime. local | mapreduce | tez </td>
+    </tr>
+    <tr>
+        <td>zeppelin.pig.includeJobStats</td>
+        <td>false</td>
+        <td>whether display jobStats info in <code>%pig.script</code></td>
+    </tr>
+    <tr>
+        <td>zeppelin.pig.maxResult</td>
+        <td>1000</td>
+        <td>max row number displayed in <code>%pig.query</code></td>
+    </tr>
+</table>  
+
+### Example
+
+##### pig
+
+```
+%pig
+
+raw_data = load 'dataset/sf_crime/train.csv' using PigStorage(',') as 
(Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y);
+b = group raw_data all;
+c = foreach b generate COUNT($1);
+dump c;
+```
+
+##### pig.query
+
+```
+%pig.query
+
+b = foreach raw_data generate Category;
+c = group b by Category;
+foreach c generate group as category, COUNT($1) as count;
+```
+
+Data is shared between `%pig` and `%pig.query`, so that you can do some common 
work in `%pig`, and do different kinds of query based on the data of `%pig`.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/pom.xml
----------------------------------------------------------------------
diff --git a/pig/pom.xml b/pig/pom.xml
new file mode 100644
index 0000000..a4e5cbb
--- /dev/null
+++ b/pig/pom.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>zeppelin</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>zeppelin-pig</artifactId>
+    <packaging>jar</packaging>
+    <version>0.7.0-SNAPSHOT</version>
+    <name>Zeppelin: Apache Pig Interpreter</name>
+    <description>Zeppelin interpreter for Apache Pig</description>
+    <url>http://zeppelin.apache.org</url>
+
+    <properties>
+        <pig.version>0.16.0</pig.version>
+        <hadoop.version>2.6.0</hadoop.version>
+        <tez.version>0.7.0</tez.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>zeppelin-interpreter</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pig</groupId>
+            <artifactId>pig</artifactId>
+            <classifier>h2</classifier>
+            <version>${pig.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-api</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-common</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-dag</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-library</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-internals</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-mapreduce</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.3.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce</id>
+                        <phase>none</phase>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.8</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            
<outputDirectory>${project.build.directory}/../../interpreter/pig
+                            </outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <includeScope>runtime</includeScope>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>copy-artifact</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            
<outputDirectory>${project.build.directory}/../../interpreter/pig
+                            </outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <includeScope>runtime</includeScope>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    
<artifactId>${project.artifactId}</artifactId>
+                                    <version>${project.version}</version>
+                                    <type>${project.packaging}</type>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java 
b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
new file mode 100644
index 0000000..0aa8a20
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ */
+public abstract class BasePigInterpreter extends Interpreter {
+
+  private static Logger LOGGER = 
LoggerFactory.getLogger(BasePigInterpreter.class);
+
+  protected ConcurrentHashMap<String, PigScriptListener> listenerMap = new 
ConcurrentHashMap<>();
+
+  public BasePigInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    LOGGER.info("Cancel paragraph:" + context.getParagraphId());
+    PigScriptListener listener = listenerMap.get(context.getParagraphId());
+    if (listener != null) {
+      Set<String> jobIds = listener.getJobIds();
+      if (jobIds.isEmpty()) {
+        LOGGER.info("No job is started, so can not cancel paragraph:" + 
context.getParagraphId());
+      }
+      for (String jobId : jobIds) {
+        LOGGER.info("Kill jobId:" + jobId);
+        HExecutionEngine engine =
+                (HExecutionEngine) 
getPigServer().getPigContext().getExecutionEngine();
+        try {
+          Field launcherField = 
HExecutionEngine.class.getDeclaredField("launcher");
+          launcherField.setAccessible(true);
+          Launcher launcher = (Launcher) launcherField.get(engine);
+          // It doesn't work for Tez Engine due to PIG-5035
+          launcher.killJob(jobId, new Configuration());
+        } catch (NoSuchFieldException | BackendException | 
IllegalAccessException e) {
+          LOGGER.error("Fail to cancel paragraph:" + context.getParagraphId(), 
e);
+        }
+      }
+    } else {
+      LOGGER.warn("No PigScriptListener found, can not cancel paragraph:"
+              + context.getParagraphId());
+    }
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    PigScriptListener listener = listenerMap.get(context.getParagraphId());
+    if (listener != null) {
+      return listener.getProgress();
+    }
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+            PigInterpreter.class.getName() + this.hashCode());
+  }
+
+  public abstract PigServer getPigServer();
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java 
b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
new file mode 100644
index 0000000..8cd1efc
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.tools.pigstats.*;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.*;
+
+/**
+ * Pig interpreter for Zeppelin.
+ */
+public class PigInterpreter extends BasePigInterpreter {
+  private static Logger LOGGER = LoggerFactory.getLogger(PigInterpreter.class);
+
+  private PigServer pigServer;
+  private boolean includeJobStats = false;
+
+  public PigInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    String execType = getProperty("zeppelin.pig.execType");
+    if (execType == null) {
+      execType = "mapreduce";
+    }
+    String includeJobStats = getProperty("zeppelin.pig.includeJobStats");
+    if (includeJobStats != null) {
+      this.includeJobStats = Boolean.parseBoolean(includeJobStats);
+    }
+    try {
+      pigServer = new PigServer(execType);
+    } catch (IOException e) {
+      LOGGER.error("Fail to initialize PigServer", e);
+      throw new RuntimeException("Fail to initialize PigServer", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    pigServer = null;
+  }
+
+
+  @Override
+  public InterpreterResult interpret(String cmd, InterpreterContext 
contextInterpreter) {
+    // remember the origial stdout, because we will redirect stdout to capture
+    // the pig dump output.
+    PrintStream originalStdOut = System.out;
+    ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
+    File tmpFile = null;
+    try {
+      tmpFile = PigUtils.createTempPigScript(cmd);
+      System.setOut(new PrintStream(bytesOutput));
+      // each thread should its own ScriptState & PigStats
+      
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
+      // reset PigStats, otherwise you may get the PigStats of last job in the 
same thread
+      // because PigStats is ThreadLocal variable
+      
PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
+      PigScriptListener scriptListener = new PigScriptListener();
+      ScriptState.get().registerListener(scriptListener);
+      listenerMap.put(contextInterpreter.getParagraphId(), scriptListener);
+      pigServer.registerScript(tmpFile.getAbsolutePath());
+    } catch (IOException e) {
+      if (e instanceof FrontendException) {
+        FrontendException fe = (FrontendException) e;
+        if (!fe.getMessage().contains("Backend error :")) {
+          // If the error message contains "Backend error :", that means the 
exception is from
+          // backend.
+          LOGGER.error("Fail to run pig script.", e);
+          return new InterpreterResult(Code.ERROR, 
ExceptionUtils.getStackTrace(e));
+        }
+      }
+      PigStats stats = PigStats.get();
+      if (stats != null) {
+        String errorMsg = PigUtils.extactJobStats(stats);
+        if (errorMsg != null) {
+          LOGGER.error("Fail to run pig script, " + errorMsg);
+          return new InterpreterResult(Code.ERROR, errorMsg);
+        }
+      }
+      LOGGER.error("Fail to run pig script.", e);
+      return new InterpreterResult(Code.ERROR, 
ExceptionUtils.getStackTrace(e));
+    } finally {
+      System.setOut(originalStdOut);
+      listenerMap.remove(contextInterpreter.getParagraphId());
+      if (tmpFile != null) {
+        tmpFile.delete();
+      }
+    }
+    StringBuilder outputBuilder = new StringBuilder();
+    PigStats stats = PigStats.get();
+    if (stats != null && includeJobStats) {
+      String jobStats = PigUtils.extactJobStats(stats);
+      if (jobStats != null) {
+        outputBuilder.append(jobStats);
+      }
+    }
+    outputBuilder.append(bytesOutput.toString());
+    return new InterpreterResult(Code.SUCCESS, outputBuilder.toString());
+  }
+
+
+  public PigServer getPigServer() {
+    return pigServer;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java 
b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
new file mode 100644
index 0000000..c763b7f
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ *
+ */
+public class PigQueryInterpreter extends BasePigInterpreter {
+
+  private static Logger LOGGER = 
LoggerFactory.getLogger(PigQueryInterpreter.class);
+  private PigServer pigServer;
+  private int maxResult;
+
+  public PigQueryInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public void open() {
+    pigServer = getPigInterpreter().getPigServer();
+    maxResult = Integer.parseInt(getProperty("zeppelin.pig.maxResult"));
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    // '-' is invalid for pig alias
+    String alias = "paragraph_" + context.getParagraphId().replace("-", "_");
+    String[] lines = st.split("\n");
+    List<String> queries = new ArrayList<String>();
+    for (int i = 0; i < lines.length; ++i) {
+      if (i == lines.length - 1) {
+        lines[i] = alias + " = " + lines[i];
+      }
+      queries.add(lines[i]);
+    }
+
+    StringBuilder resultBuilder = new StringBuilder("%table ");
+    try {
+      File tmpScriptFile = PigUtils.createTempPigScript(queries);
+      // each thread should its own ScriptState & PigStats
+      
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
+      // reset PigStats, otherwise you may get the PigStats of last job in the 
same thread
+      // because PigStats is ThreadLocal variable
+      
PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
+      PigScriptListener scriptListener = new PigScriptListener();
+      ScriptState.get().registerListener(scriptListener);
+      listenerMap.put(context.getParagraphId(), scriptListener);
+      pigServer.registerScript(tmpScriptFile.getAbsolutePath());
+      Schema schema = pigServer.dumpSchema(alias);
+      boolean schemaKnown = (schema != null);
+      if (schemaKnown) {
+        for (int i = 0; i < schema.size(); ++i) {
+          Schema.FieldSchema field = schema.getField(i);
+          resultBuilder.append(field.alias);
+          if (i != schema.size() - 1) {
+            resultBuilder.append("\t");
+          }
+        }
+        resultBuilder.append("\n");
+      }
+      Iterator<Tuple> iter = pigServer.openIterator(alias);
+      boolean firstRow = true;
+      int index = 0;
+      while (iter.hasNext() && index <= maxResult) {
+        index++;
+        Tuple tuple = iter.next();
+        if (firstRow && !schemaKnown) {
+          for (int i = 0; i < tuple.size(); ++i) {
+            resultBuilder.append("c_" + i + "\t");
+          }
+          resultBuilder.append("\n");
+          firstRow = false;
+        }
+        resultBuilder.append(StringUtils.join(tuple, "\t"));
+        resultBuilder.append("\n");
+      }
+      if (index >= maxResult && iter.hasNext()) {
+        resultBuilder.append("\n<font color=red>Results are limited by " + 
maxResult + ".</font>");
+      }
+    } catch (IOException e) {
+      // Extract error in the following order
+      // 1. catch FrontendException, FrontendException happens in the query 
compilation phase.
+      // 2. PigStats, This is execution error
+      // 3. Other errors.
+      if (e instanceof FrontendException) {
+        FrontendException fe = (FrontendException) e;
+        if (!fe.getMessage().contains("Backend error :")) {
+          LOGGER.error("Fail to run pig query.", e);
+          return new InterpreterResult(Code.ERROR, 
ExceptionUtils.getStackTrace(e));
+        }
+      }
+      PigStats stats = PigStats.get();
+      if (stats != null) {
+        String errorMsg = PigUtils.extactJobStats(stats);
+        if (errorMsg != null) {
+          return new InterpreterResult(Code.ERROR, errorMsg);
+        }
+      }
+      LOGGER.error("Fail to run pig query.", e);
+      return new InterpreterResult(Code.ERROR, 
ExceptionUtils.getStackTrace(e));
+    } finally {
+      listenerMap.remove(context.getParagraphId());
+    }
+    return new InterpreterResult(Code.SUCCESS, resultBuilder.toString());
+  }
+
+  @Override
+  public PigServer getPigServer() {
+    return this.pigServer;
+  }
+
+  private PigInterpreter getPigInterpreter() {
+    LazyOpenInterpreter lazy = null;
+    PigInterpreter pig = null;
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
+      }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    pig = (PigInterpreter) p;
+
+    if (lazy != null) {
+      lazy.open();
+    }
+    return pig;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java 
b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
new file mode 100644
index 0000000..1f88b2e
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ *
+ */
+public class PigScriptListener implements PigProgressNotificationListener {
+
+  private static Logger LOGGER = 
LoggerFactory.getLogger(PigScriptListener.class);
+
+  private Set<String> jobIds = new HashSet();
+  private int progress;
+
+  @Override
+  public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
+
+  }
+
+  @Override
+  public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
+
+  }
+
+  @Override
+  public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) 
{
+
+  }
+
+  @Override
+  public void jobStartedNotification(String scriptId, String assignedJobId) {
+    this.jobIds.add(assignedJobId);
+  }
+
+  @Override
+  public void jobFinishedNotification(String scriptId, JobStats jobStats) {
+
+  }
+
+  @Override
+  public void jobFailedNotification(String scriptId, JobStats jobStats) {
+
+  }
+
+  @Override
+  public void outputCompletedNotification(String scriptId, OutputStats 
outputStats) {
+
+  }
+
+  @Override
+  public void progressUpdatedNotification(String scriptId, int progress) {
+    LOGGER.debug("scriptId:" + scriptId + ", progress:" + progress);
+    this.progress = progress;
+  }
+
+  @Override
+  public void launchCompletedNotification(String scriptId, int 
numJobsSucceeded) {
+
+  }
+
+  public Set<String> getJobIds() {
+    return jobIds;
+  }
+
+  public int getProgress() {
+    return progress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java 
b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
new file mode 100644
index 0000000..d444e02
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigRunner;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
+import org.apache.pig.tools.pigstats.tez.TezDAGStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class PigUtils {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(PigUtils.class);
+
+  protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+  public static File createTempPigScript(String content) throws IOException {
+    File tmpFile = File.createTempFile("zeppelin", "pig");
+    LOGGER.debug("Create pig script file:" + tmpFile.getAbsolutePath());
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+    return tmpFile.getAbsoluteFile();
+  }
+
+  public static File createTempPigScript(List<String> lines) throws 
IOException {
+    return createTempPigScript(StringUtils.join(lines, "\n"));
+  }
+
+  public static String extactJobStats(PigStats stats) {
+    if (stats instanceof SimplePigStats) {
+      return extractFromSimplePigStats((SimplePigStats) stats);
+    } else if (stats instanceof TezPigScriptStats) {
+      return extractFromTezPigStats((TezPigScriptStats) stats);
+    } else {
+      throw new RuntimeException("Unrecognized stats type:" + 
stats.getClass().getSimpleName());
+    }
+  }
+
+  public static String extractFromSimplePigStats(SimplePigStats stats) {
+
+    try {
+      Field userIdField = PigStats.class.getDeclaredField("userId");
+      userIdField.setAccessible(true);
+      String userId = (String) (userIdField.get(stats));
+      Field startTimeField = PigStats.class.getDeclaredField("startTime");
+      startTimeField.setAccessible(true);
+      long startTime = (Long) (startTimeField.get(stats));
+      Field endTimeField = PigStats.class.getDeclaredField("endTime");
+      endTimeField.setAccessible(true);
+      long endTime = (Long) (endTimeField.get(stats));
+
+      if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
+        LOGGER.warn("unknown return code, can't display the results");
+        return null;
+      }
+      if (stats.getPigContext() == null) {
+        LOGGER.warn("unknown exec type, don't display the results");
+        return null;
+      }
+
+      SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+      StringBuilder sb = new StringBuilder();
+      
sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
+      
sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t")
+              .append(userId).append("\t")
+              .append(sdf.format(new Date(startTime))).append("\t")
+              .append(sdf.format(new Date(endTime))).append("\t")
+              .append(stats.getFeatures()).append("\n");
+      sb.append("\n");
+      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
+        sb.append("Success!\n");
+      } else if (stats.getReturnCode() == 
PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Some jobs have failed! Stop running all dependent jobs\n");
+      } else {
+        sb.append("Failed!\n");
+      }
+      sb.append("\n");
+
+      Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
+      jobPlanField.setAccessible(true);
+      PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats);
+
+      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS
+              || stats.getReturnCode() == 
PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Job Stats (time in seconds):\n");
+        sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
+        List<JobStats> arr = jobPlan.getSuccessfulJobs();
+        for (JobStats js : arr) {
+          sb.append(js.getDisplayString());
+        }
+        sb.append("\n");
+      }
+      if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
+              || stats.getReturnCode() == 
PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Failed Jobs:\n");
+        sb.append(MRJobStats.FAILURE_HEADER).append("\n");
+        List<JobStats> arr = jobPlan.getFailedJobs();
+        for (JobStats js : arr) {
+          sb.append(js.getDisplayString());
+        }
+        sb.append("\n");
+      }
+      sb.append("Input(s):\n");
+      for (InputStats is : stats.getInputStats()) {
+        sb.append(is.getDisplayString());
+      }
+      sb.append("\n");
+      sb.append("Output(s):\n");
+      for (OutputStats ds : stats.getOutputStats()) {
+        sb.append(ds.getDisplayString());
+      }
+
+      sb.append("\nCounters:\n");
+      sb.append("Total records written : " + 
stats.getRecordWritten()).append("\n");
+      sb.append("Total bytes written : " + 
stats.getBytesWritten()).append("\n");
+      sb.append("Spillable Memory Manager spill count : "
+              + stats.getSMMSpillCount()).append("\n");
+      sb.append("Total bags proactively spilled: "
+              + stats.getProactiveSpillCountObjects()).append("\n");
+      sb.append("Total records proactively spilled: "
+              + stats.getProactiveSpillCountRecords()).append("\n");
+      sb.append("\nJob DAG:\n").append(jobPlan.toString());
+
+      return "Script Statistics: \n" + sb.toString();
+    } catch (Exception e) {
+      LOGGER.error("Can not extract message from SimplePigStats", e);
+      return "Can not extract message from SimpelPigStats," + 
ExceptionUtils.getStackTrace(e);
+    }
+  }
+
+  private static String extractFromTezPigStats(TezPigScriptStats stats) {
+
+    try {
+      Field userIdField = PigStats.class.getDeclaredField("userId");
+      userIdField.setAccessible(true);
+      String userId = (String) (userIdField.get(stats));
+      Field startTimeField = PigStats.class.getDeclaredField("startTime");
+      startTimeField.setAccessible(true);
+      long startTime = (Long) (startTimeField.get(stats));
+      Field endTimeField = PigStats.class.getDeclaredField("endTime");
+      endTimeField.setAccessible(true);
+      long endTime = (Long) (endTimeField.get(stats));
+
+      SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+      StringBuilder sb = new StringBuilder();
+      sb.append("\n");
+      sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", 
stats.getHadoopVersion()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", 
stats.getPigVersion()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", 
TezExecType.getTezVersion()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
+      sb.append(String.format("%1$20s: %2$-100s%n", "FileName", 
stats.getFileName()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", 
sdf.format(new Date(startTime))));
+      sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", 
sdf.format(new Date(endTime))));
+      sb.append(String.format("%1$20s: %2$-100s%n", "Features", 
stats.getFeatures()));
+      sb.append("\n");
+      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
+        sb.append("Success!\n");
+      } else if (stats.getReturnCode() == 
PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Some tasks have failed! Stop running all dependent 
tasks\n");
+      } else {
+        sb.append("Failed!\n");
+      }
+      sb.append("\n");
+
+      // Print diagnostic info in case of failure
+      if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
+              || stats.getReturnCode() == 
PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        if (stats.getErrorMessage() != null) {
+          String[] lines = stats.getErrorMessage().split("\n");
+          for (int i = 0; i < lines.length; i++) {
+            String s = lines[i].trim();
+            if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) {
+              sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? 
"ErrorMessage" : "", s));
+            }
+          }
+          sb.append("\n");
+        }
+      }
+
+      Field tezDAGStatsMapField = 
TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
+      tezDAGStatsMapField.setAccessible(true);
+      Map<String, TezDAGStats> tezDAGStatsMap =
+              (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats);
+      int count = 0;
+      for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+        sb.append("\n");
+        sb.append("DAG " + count++ + ":\n");
+        sb.append(dagStats.getDisplayString());
+        sb.append("\n");
+      }
+
+      sb.append("Input(s):\n");
+      for (InputStats is : stats.getInputStats()) {
+        sb.append(is.getDisplayString().trim()).append("\n");
+      }
+      sb.append("\n");
+      sb.append("Output(s):\n");
+      for (OutputStats os : stats.getOutputStats()) {
+        sb.append(os.getDisplayString().trim()).append("\n");
+      }
+      return "Script Statistics:\n" + sb.toString();
+    } catch (Exception e) {
+      LOGGER.error("Can not extract message from SimplePigStats", e);
+      return "Can not extract message from SimpelPigStats," + 
ExceptionUtils.getStackTrace(e);
+    }
+  }
+
+  public static List<String> extractJobIds(PigStats stat) {
+    if (stat instanceof SimplePigStats) {
+      return extractJobIdsFromSimplePigStats((SimplePigStats) stat);
+    } else if (stat instanceof TezPigScriptStats) {
+      return extractJobIdsFromTezPigStats((TezPigScriptStats) stat);
+    } else {
+      throw new RuntimeException("Unrecognized stats type:" + 
stat.getClass().getSimpleName());
+    }
+  }
+
+  public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats 
stat) {
+    List<String> jobIds = new ArrayList<>();
+    try {
+      Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
+      jobPlanField.setAccessible(true);
+      PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat);
+      List<JobStats> arr = jobPlan.getJobList();
+      for (JobStats js : arr) {
+        jobIds.add(js.getJobId());
+      }
+      return jobIds;
+    } catch (Exception e) {
+      LOGGER.error("Can not extract jobIds from SimpelPigStats", e);
+      throw new RuntimeException("Can not extract jobIds from SimpelPigStats", 
e);
+    }
+  }
+
+  public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats 
stat) {
+    List<String> jobIds = new ArrayList<>();
+    try {
+      Field tezDAGStatsMapField = 
TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
+      tezDAGStatsMapField.setAccessible(true);
+      Map<String, TezDAGStats> tezDAGStatsMap =
+              (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat);
+      for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+        LOGGER.debug("Tez JobId:" + dagStats.getJobId());
+        jobIds.add(dagStats.getJobId());
+      }
+      return jobIds;
+    } catch (Exception e) {
+      LOGGER.error("Can not extract jobIds from TezPigScriptStats", e);
+      throw new RuntimeException("Can not extract jobIds from 
TezPigScriptStats", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/pig/src/main/resources/interpreter-setting.json 
b/pig/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..27918ed
--- /dev/null
+++ b/pig/src/main/resources/interpreter-setting.json
@@ -0,0 +1,46 @@
+[
+  {
+    "group": "pig",
+    "name": "script",
+    "className": "org.apache.zeppelin.pig.PigInterpreter",
+    "properties": {
+      "zeppelin.pig.execType": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.execType",
+        "defaultValue": "mapreduce",
+        "description": "local | mapreduce | tez"
+      },
+      "zeppelin.pig.includeJobStats": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.includeJobStats",
+        "defaultValue": "false",
+        "description": "flag to include job stats in output"
+      }
+    },
+    "editor": {
+      "language": "pig"
+    }
+  },
+  {
+    "group": "pig",
+    "name": "query",
+    "className": "org.apache.zeppelin.pig.PigQueryInterpreter",
+    "properties": {
+      "zeppelin.pig.execType": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.execType",
+        "defaultValue": "mapreduce",
+        "description": "local | mapreduce | tez"
+      },
+      "zeppelin.pig.maxResult": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.maxResult",
+        "defaultValue": "1000",
+        "description": "max row number for %pig.query"
+      }
+    },
+    "editor": {
+      "language": "pig"
+    }
+  }
+]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java 
b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
new file mode 100644
index 0000000..3d062d6
--- /dev/null
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PigInterpreterTest {
+
+  private PigInterpreter pigInterpreter;
+  private InterpreterContext context;
+
+  @Before
+  public void setUp() {
+    Properties properties = new Properties();
+    properties.put("zeppelin.pig.execType", "local");
+    pigInterpreter = new PigInterpreter(properties);
+    pigInterpreter.open();
+    context = new InterpreterContext(null, "paragraph_id", null, null, null, 
null, null, null, null,
+            null, null);
+  }
+
+  @After
+  public void tearDown() {
+    pigInterpreter.close();
+  }
+
+  @Test
+  public void testBasics() throws IOException {
+    String content = "1\tandy\n"
+            + "2\tpeter\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // simple pig script using dump
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+            + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
+
+    // describe
+    pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, 
name: bytearray);"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
+
+    // syntax error (compilation error)
+    pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    assertTrue(result.message().contains("Syntax error, unexpected symbol at 
or near 'a'"));
+
+    // execution error
+    pigscript = "a = load 'invalid_path';"
+            + "dump a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    assertTrue(result.message().contains("Input path does not exist"));
+  }
+
+
+  @Test
+  public void testIncludeJobStats() throws IOException {
+    Properties properties = new Properties();
+    properties.put("zeppelin.pig.execType", "local");
+    properties.put("zeppelin.pig.includeJobStats", "true");
+    pigInterpreter = new PigInterpreter(properties);
+    pigInterpreter.open();
+
+    String content = "1\tandy\n"
+            + "2\tpeter\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // simple pig script using dump
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+            + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("Counters:"));
+    assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
+
+    // describe
+    pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, 
name: bytearray);"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    // no job is launched, so no jobStats
+    assertTrue(!result.message().contains("Counters:"));
+    assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
+
+    // syntax error (compilation error)
+    pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    // no job is launched, so no jobStats
+    assertTrue(!result.message().contains("Counters:"));
+    assertTrue(result.message().contains("Syntax error, unexpected symbol at 
or near 'a'"));
+
+    // execution error
+    pigscript = "a = load 'invalid_path';"
+            + "dump a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    assertTrue(result.message().contains("Counters:"));
+    assertTrue(result.message().contains("Input path does not exist"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java 
b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
new file mode 100644
index 0000000..00ece44
--- /dev/null
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ */
+public class PigQueryInterpreterTest {
+
+  private PigInterpreter pigInterpreter;
+  private PigQueryInterpreter pigQueryInterpreter;
+  private InterpreterContext context;
+
+  @Before
+  public void setUp() {
+    Properties properties = new Properties();
+    properties.put("zeppelin.pig.execType", "local");
+    properties.put("zeppelin.pig.maxResult", "20");
+
+    pigInterpreter = new PigInterpreter(properties);
+    pigQueryInterpreter = new PigQueryInterpreter(properties);
+    List<Interpreter> interpreters = new ArrayList();
+    interpreters.add(pigInterpreter);
+    interpreters.add(pigQueryInterpreter);
+    InterpreterGroup group = new InterpreterGroup();
+    group.put("note_id", interpreters);
+    pigInterpreter.setInterpreterGroup(group);
+    pigQueryInterpreter.setInterpreterGroup(group);
+    pigInterpreter.open();
+    pigQueryInterpreter.open();
+
+    context = new InterpreterContext(null, "paragraph_id", null, null, null, 
null, null, null, null,
+            null, null);
+  }
+
+  @After
+  public void tearDown() {
+    pigInterpreter.close();
+    pigQueryInterpreter.close();
+  }
+
+  @Test
+  public void testBasics() throws IOException {
+    String content = "andy\tmale\t10\n"
+            + "peter\tmale\t20\n"
+            + "amy\tfemale\t14\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // run script in PigInterpreter
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, 
gender, age);\n"
+            + "a2 = load 'invalid_path' as (name, gender, age);\n"
+            + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    
assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
+
+    // run single line query in PigQueryInterpreter
+    String query = "foreach a generate name, age;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TABLE, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", 
result.message());
+
+    // run multiple line query in PigQueryInterpreter
+    query = "b = group a by gender;\nforeach b generate group as gender, 
COUNT($1) as count;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TABLE, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message());
+
+    // syntax error in PigQueryInterpereter
+    query = "b = group a by invalid_column;\nforeach b generate group as 
gender, COUNT($1) as count;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().contains("Projected field [invalid_column] 
does not exist in schema"));
+
+    // execution error in PigQueryInterpreter
+    query = "foreach a2 generate name, age;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().contains("Input path does not exist"));
+  }
+
+  @Test
+  public void testMaxResult() throws IOException {
+    StringBuilder content = new StringBuilder();
+    for (int i=0;i<30;++i) {
+      content.append(i + "\tname_" + i + "\n");
+    }
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // run script in PigInterpreter
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, 
name);";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    // empty output
+    assertTrue(result.message().isEmpty());
+
+    // run single line query in PigQueryInterpreter
+    String query = "foreach a generate id;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TABLE, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("id\n0\n1\n2"));
+    assertTrue(result.message().contains("Results are limited by 20"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/pig/src/test/resources/log4j.properties 
b/pig/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8daee59
--- /dev/null
+++ b/pig/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 03b2263..558ce06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
     <module>shell</module>
     <module>livy</module>
     <module>hbase</module>
+    <module>pig</module>
     <module>postgresql</module>
     <module>jdbc</module>
     <module>file</module>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/zeppelin-distribution/src/bin_license/LICENSE
----------------------------------------------------------------------
diff --git a/zeppelin-distribution/src/bin_license/LICENSE 
b/zeppelin-distribution/src/bin_license/LICENSE
index 2ee668a..e39a2ad 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -156,7 +156,15 @@ The following components are provided under Apache License.
     (Apache 2.0) Tachyon Project Core (org.tachyonproject:tachyon:0.6.4 - 
http://tachyonproject.org/tachyon/)
     (Apache 2.0) Tachyon Project Client 
(org.tachyonproject:tachyon-client:0.6.4 - 
http://tachyonproject.org/tachyon-client/)
     (Apache 2.0) javax.inject (javax.inject:javax.inject:1 - 
http://code.google.com/p/atinject/)
-     
+    (Apache 2.0) Apache Pig (org.apache.pig:0.16 - http://pig.apache.org)
+    (Apache 2.0) tez-api (org.apache.tez:tez-api:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-common (org.apache.tez:tez-common:0.7.0 - 
http://tez.apache.org)
+    (Apache 2.0) tez-dag (org.apache.tez:tez-dag:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-runtime-library (org.apache.tez:runtime-library:0.7.0 - 
http://tez.apache.org)
+    (Apache 2.0) tez-runtime-internals 
(org.apache.tez:tez-runtime-internals:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-mapreduce (org.apache.tez:tez-mapreduce:0.7.0 - 
http://tez.apache.org)
+    (Apache 2.0) tez-yarn-timeline-history-with-acls 
(org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - 
http://tez.apache.org)
+
 ========================================================================
 MIT licenses
 ========================================================================

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index d819869..414aed2 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -521,6 +521,8 @@ public class ZeppelinConfiguration extends XMLConfiguration 
{
         + "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
         + "org.apache.zeppelin.file.HDFSFileInterpreter,"
         + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+        + "org.apache.zeppelin.pig.PigInterpreter,"
+        + "org.apache.zeppelin.pig.PigQueryInterpreter,"
         + "org.apache.zeppelin.flink.FlinkInterpreter,"
         + "org.apache.zeppelin.python.PythonInterpreter,"
         + "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
@@ -543,7 +545,7 @@ public class ZeppelinConfiguration extends XMLConfiguration 
{
     ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 
10),
     ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", 
"spark,md,angular,sh,"
         + 
"livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
-        + "scalding,jdbc,hbase,bigquery,beam"),
+        + "scalding,jdbc,hbase,bigquery,beam,pig"),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
     ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
     // use specified notebook (id) as homescreen

Reply via email to