Re: Review Request 45667: Support Pig On Spark

2016-06-13 Thread Pallavi Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45667/
---

(Updated June 14, 2016, 4:39 a.m.)


Review request for pig, Daniel Dai and Rohini Palaniswamy.


Changes
---

Uploading patch on behalf of Kelly: the new patch addresses ~70% of the review 
comments.


Bugs: PIG-4059 and PIG-4854
https://issues.apache.org/jira/browse/PIG-4059
https://issues.apache.org/jira/browse/PIG-4854


Repository: pig-git


Description
---

The patch contains all the work done in the spark branch, so far.


Diffs (updated)
-

  bin/pig 81f1426 
  build.xml 99ba1f4 
  ivy.xml dd9878e 
  ivy/libraries.properties 3a819a5 
  shims/test/hadoop20/org/apache/pig/test/SparkMiniCluster.java PRE-CREATION 
  shims/test/hadoop23/org/apache/pig/test/SparkMiniCluster.java PRE-CREATION 
  shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java 792a1bd 
  shims/test/hadoop23/org/apache/pig/test/YarnMiniCluster.java PRE-CREATION 
  src/META-INF/services/org.apache.pig.ExecType 5c034c8 
  src/docs/src/documentation/content/xdocs/start.xml 36f9952 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 1ff1abd 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java ecf780c 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java 2376d03 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java bcbfe2b 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java d80951a 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java 21b75f1 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java 52cfb73 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java 13f70c0 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java c3a82c3 
  src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java PRE-CRE

[jira] [Commented] (PIG-4810) Implement Merge join for spark engine

2016-06-13 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328915#comment-15328915
 ] 

liyunzhang_intel commented on PIG-4810:
---

[~kexianda]: OK, after PIG-4856 is resolved, PIG-4870 can also be fixed soon.

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810.patch
>
>
> In the current code base (a9151ac), we use a regular join to implement merge 
> join in spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4903) Avoid add all spark dependency jars to SPARK_YARN_DIST_FILES and SPARK_DIST_CLASSPATH

2016-06-13 Thread Srikanth Sundarrajan (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1532#comment-1532
 ] 

Srikanth Sundarrajan commented on PIG-4903:
---

Looks largely OK, except for the following:

{code}
+  elif [[ $f == "spark" ]]; then
+isSparkMode="true" 
+remaining[${#remaining[@]}]="$f"
+  elif [[ $f == "spark_local" || $f == "SPARK_LOCAL" ]]; then
+isSparkLocalMode="true"
+remaining[${#remaining[@]}]="$f"
{code}

Any argument whose value is "spark" would cause the execution mode to be assumed 
to be Spark. Can we check the previous argument and ensure that it is indeed -x?
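A minimal sketch of the stricter check being suggested (the function name and structure here are illustrative, not bin/pig's actual code): only treat spark/spark_local as an execution mode when the preceding argument is -x.

```shell
# Prints the detected exec mode: "spark", "spark_local", or "none".
# Unlike the patch above, a bare "spark" argument (e.g. a file named
# "spark") does not flip the mode; only "-x spark" does.
detect_spark_mode() {
  local prev="" f mode="none"
  for f in "$@"; do
    if [[ $prev == "-x" ]]; then
      case "$f" in
        spark) mode="spark" ;;
        spark_local|SPARK_LOCAL) mode="spark_local" ;;
      esac
    fi
    prev="$f"
  done
  echo "$mode"
}

detect_spark_mode -x spark        # prints: spark
detect_spark_mode myscript.pig spark   # prints: none
```

The real bin/pig would also need to keep appending `$f` to the `remaining` array as the patch does; that bookkeeping is omitted here for brevity.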

> Avoid add all spark dependency jars to  SPARK_YARN_DIST_FILES and 
> SPARK_DIST_CLASSPATH
> --
>
> Key: PIG-4903
> URL: https://issues.apache.org/jira/browse/PIG-4903
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
> Attachments: PIG-4903.patch, PIG-4903_1.patch, PIG-4903_2.patch, 
> PIG-4903_3.patch
>
>
> There are some comments about bin/pig on 
> https://reviews.apache.org/r/45667/#comment198955.
> {code}
> # ADDING SPARK DEPENDENCIES ##
> # Spark typically works with a single assembly file. However this
> # assembly isn't available as an artifact to pull in via ivy.
> # To work around this shortcoming, we add all the jars barring
> # spark-yarn to DIST through dist-files and then add them to the classpath
> # of the executors through an independent env variable. The reason
> # for excluding spark-yarn is that spark-yarn is already being added
> # by the spark-yarn-client via jarOf(Client.Class)
> for f in $PIG_HOME/lib/*.jar; do
> if [[ $f == $PIG_HOME/lib/spark-assembly* ]]; then
> # Exclude spark-assembly.jar from shipped jars, but retain in 
> classpath
> SPARK_JARS=${SPARK_JARS}:$f;
> else
> SPARK_JARS=${SPARK_JARS}:$f;
> SPARK_YARN_DIST_FILES=${SPARK_YARN_DIST_FILES},file://$f;
> SPARK_DIST_CLASSPATH=${SPARK_DIST_CLASSPATH}:\${PWD}/`basename $f`
> fi
> done
> CLASSPATH=${CLASSPATH}:${SPARK_JARS}
> export SPARK_YARN_DIST_FILES=`echo ${SPARK_YARN_DIST_FILES} | sed 's/^,//g'`
> export SPARK_JARS=${SPARK_YARN_DIST_FILES}
> export SPARK_DIST_CLASSPATH
> {code}
> Here we first copy all Spark dependency jars, like 
> spark-network-shuffle_2.10-1.6.1.jar, to the distributed cache 
> (SPARK_YARN_DIST_FILES) and then add them to the classpath of the executors 
> (SPARK_DIST_CLASSPATH). Actually, we need not copy all these dependency jars 
> to SPARK_DIST_CLASSPATH, because they are already included in 
> spark-assembly.jar, and spark-assembly.jar is uploaded with the Spark job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: Review Request 45667: Support Pig On Spark

2016-06-13 Thread Zhang, Liyun
Pallavi created this review board, so I don't have the privilege to upload a new 
patch to it. I have sent the new patch to Pallavi, and she will upload it later.

-Original Message-
From: kelly zhang [mailto:nore...@reviews.apache.org] On Behalf Of kelly zhang
Sent: Tuesday, June 14, 2016 11:25 AM
To: Rohini Palaniswamy; Daniel Dai
Cc: Zhang, Liyun; Pallavi Rao; pig
Subject: Re: Review Request 45667: Support Pig On Spark



> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > test/org/apache/pig/test/TestBuiltin.java, line 3255 
> >
> >
> > This testcase is broken if you have 0-0 repeating twice. It is not 
> > UniqueID anymore.

0-0 repeating twice is because we use the TaskID in UniqueID#exec:

public String exec(Tuple input) throws IOException {
    String taskIndex = PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX);
    String sequenceId = taskIndex + "-" + Long.toString(sequence);
    sequence++;
    return sequenceId;
}
In MR, we initialize PigConstants.TASK_INDEX in 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:

protected void setup(Context context) throws IOException, InterruptedException {
    ...
    context.getConfiguration().set(PigConstants.TASK_INDEX,
        Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}

But Spark does not provide a function like PigGenericMapReduce.Reduce#setup to 
initialize PigConstants.TASK_INDEX when the job starts.
I suggest filing a new JIRA (Initialize PigConstants.TASK_INDEX when a Spark job 
starts) and skipping this unit test until that JIRA is resolved.
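To make the failure mode concrete, here is a small self-contained sketch (a hypothetical class, not Pig's actual UniqueID UDF) of the "taskIndex-sequence" id scheme. When two tasks both see task index "0", as happens if TASK_INDEX is never initialized, the generated ids collide:

```java
// Sketch of UniqueID-style id generation: "<taskIndex>-<sequence>".
// Uniqueness across tasks depends entirely on each task getting a
// distinct task index; the per-task sequence alone is not enough.
class UniqueIdSketch {
    private final String taskIndex;
    private long sequence = 0;

    UniqueIdSketch(String taskIndex) {
        this.taskIndex = taskIndex;
    }

    // Mirrors UniqueID#exec: concatenate task index and a per-task counter.
    String next() {
        return taskIndex + "-" + Long.toString(sequence++);
    }

    public static void main(String[] args) {
        UniqueIdSketch task0 = new UniqueIdSketch("0");
        UniqueIdSketch alsoTask0 = new UniqueIdSketch("0"); // uninitialized index
        System.out.println(task0.next());     // 0-0
        System.out.println(alsoTask0.next()); // 0-0 again: ids are no longer unique
    }
}
```

This is why the test sees 0-0 twice: the sequence restarts at 0 in every task, so distinct task indexes are what keep the ids globally unique.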


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/data/SelfSpillBag.java, line 32 
> >
> >
> > Why is bag even being serialized by Spark?

SelfSpillBag is used in TestHBaseStorage; if we do not mark it transient, a 
NotSerializableException is thrown.
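A standalone illustration of the underlying mechanism (hypothetical classes, not Pig's actual SelfSpillBag): Java serialization throws NotSerializableException when it reaches a field whose type does not implement Serializable, unless that field is marked transient, in which case it is simply skipped.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientSketch {
    // Deliberately does NOT implement Serializable.
    static class MemoryLimits { }

    static class BagLike implements Serializable {
        // transient: skipped by Java serialization, so serializing a
        // BagLike does not fail despite the non-serializable field type.
        transient MemoryLimits memLimits = new MemoryLimits();
        int size = 0;
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Succeeds only because memLimits is transient; without the
        // keyword, writeObject would throw NotSerializableException.
        System.out.println(serialize(new BagLike()).length > 0);
    }
}
```

The trade-off is that a transient field is null after deserialization, so the receiving side must be able to reinitialize it.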


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java, line 66
> >
> >
> > Why does A[3,4] repeat?

The pig script is like (paths elided):

A = LOAD '<input>' USING PigStorage();
B = GROUP A BY $0;
A = FOREACH B GENERATE COUNT(A);
STORE A INTO '<output>';

The Spark plan is:
A: 
Store(/tmp/pig_junit_tmp1755582848/test6087259092054964214output:org.apache.pig.builtin.PigStorage)
 - scope-9
|
|---A: New For Each(false)[tuple] - scope-13
|   |
|   Project[bag][1] - scope-11
|   
|   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-12
|   |
|   |---Project[bag][1] - scope-28
|
|---Reduce By(false,false)[tuple] - scope-18
|   |
|   Project[bytearray][0] - scope-19
|   |
|   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - 
scope-20
|   |
|   |---Project[bag][1] - scope-21
|
|---B: Local Rearrange[tuple]{bytearray}(false) - scope-24
|   |
|   Project[bytearray][0] - scope-26
|
|---A: New For Each(false,false)[bag] - scope-14
|   |
|   Project[bytearray][0] - scope-15
|   |
|   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - 
scope-16
|   |
|   |---Project[bag][1] - scope-17
|
|---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-27
|
|---A: 
Load(/tmp/pig_junit_tmp1755582848/test7108242581632795697input:PigStorage) - 
scope-0

There are two ForEach operators (scope-13 and scope-14) in the Spark plan, so 
A[3,4] appears twice.

Comparing with MR plan:
#--
# Map Reduce Plan 
#--
MapReduce node scope-10
Map Plan
B: Local Rearrange[tuple]{bytearray}(false) - scope-22
|   |
|   Project[bytearray][0] - scope-24
|
|---A: New For Each(false,false)[bag] - scope-11
|   |
|   Project[bytearray][0] - scope-12
|   |
|   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
|   |
|   |---Project[bag][1] - scope-14
|
|---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-25
|
|---A: 
Load(/tmp/pig_junit_tmp910232853/test2548400580131197161input:PigStorage) - 
scope-0 Combine Plan
B: Local Rearrange[tuple]{bytearray}(false) - scope-26
|   |
|   Project[bytearray][0] - scope-28
|
|---A: New For Each(false,false)[bag] - scope-15
|   |
|   Project

[jira] [Commented] (PIG-4919) Upgrade spark.version to 1.6.1

2016-06-13 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328874#comment-15328874
 ] 

liyunzhang_intel commented on PIG-4919:
---

[~xuefuz]: please merge it to the spark branch, thanks!

> Upgrade spark.version to 1.6.1
> --
>
> Key: PIG-4919
> URL: https://issues.apache.org/jira/browse/PIG-4919
> Project: Pig
>  Issue Type: Sub-task
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4919.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 45667: Support Pig On Spark

2016-06-13 Thread kelly zhang


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > test/org/apache/pig/test/TestBuiltin.java, line 3255
> > 
> >
> > This testcase is broken if you have 0-0 repeating twice. It is not 
> > UniqueID anymore.

0-0 repeating twice is because we use the TaskID in UniqueID#exec:

public String exec(Tuple input) throws IOException {
    String taskIndex = PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX);
    String sequenceId = taskIndex + "-" + Long.toString(sequence);
    sequence++;
    return sequenceId;
}
In MR, we initialize PigConstants.TASK_INDEX in 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:

protected void setup(Context context) throws IOException, InterruptedException {
    ...
    context.getConfiguration().set(PigConstants.TASK_INDEX,
        Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}

But Spark does not provide a function like PigGenericMapReduce.Reduce#setup to 
initialize PigConstants.TASK_INDEX when the job starts.
I suggest filing a new JIRA (Initialize PigConstants.TASK_INDEX when a Spark job 
starts) and skipping this unit test until that JIRA is resolved.


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/data/SelfSpillBag.java, line 32
> > 
> >
> > Why is bag even being serialized by Spark?

SelfSpillBag is used in TestHBaseStorage; if we do not mark it transient, a 
NotSerializableException is thrown.


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java,
> >  line 66
> > 
> >
> > Why does A[3,4] repeat?

The pig script is like (paths elided):

A = LOAD '<input>' USING PigStorage();
B = GROUP A BY $0;
A = FOREACH B GENERATE COUNT(A);
STORE A INTO '<output>';

The Spark plan is:
A: 
Store(/tmp/pig_junit_tmp1755582848/test6087259092054964214output:org.apache.pig.builtin.PigStorage)
 - scope-9
|
|---A: New For Each(false)[tuple] - scope-13
|   |
|   Project[bag][1] - scope-11
|   
|   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-12
|   |
|   |---Project[bag][1] - scope-28
|
|---Reduce By(false,false)[tuple] - scope-18
|   |
|   Project[bytearray][0] - scope-19
|   |
|   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - 
scope-20
|   |
|   |---Project[bag][1] - scope-21
|
|---B: Local Rearrange[tuple]{bytearray}(false) - scope-24
|   |
|   Project[bytearray][0] - scope-26
|
|---A: New For Each(false,false)[bag] - scope-14
|   |
|   Project[bytearray][0] - scope-15
|   |
|   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - 
scope-16
|   |
|   |---Project[bag][1] - scope-17
|
|---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-27
|
|---A: 
Load(/tmp/pig_junit_tmp1755582848/test7108242581632795697input:PigStorage) - 
scope-0

There are two ForEach operators (scope-13 and scope-14) in the Spark plan, so 
A[3,4] appears twice.

Comparing with MR plan:
#--
# Map Reduce Plan 
#--
MapReduce node scope-10
Map Plan
B: Local Rearrange[tuple]{bytearray}(false) - scope-22
|   |
|   Project[bytearray][0] - scope-24
|
|---A: New For Each(false,false)[bag] - scope-11
|   |
|   Project[bytearray][0] - scope-12
|   |
|   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
|   |
|   |---Project[bag][1] - scope-14
|
|---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-25
|
|---A: 
Load(/tmp/pig_junit_tmp910232853/test2548400580131197161input:PigStorage) - 
scope-0
Combine Plan
B: Local Rearrange[tuple]{bytearray}(false) - scope-26
|   |
|   Project[bytearray][0] - scope-28
|
|---A: New For Each(false,false)[bag] - scope-15
|   |
|   Project[bytearray][0] - scope-16
|   |
|   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-17
|   |
|   |---Project[bag][1] - scope-18
|
|---B: Package(CombinerPackager)[tuple]{bytearray} - scope-21
Reduce Plan
A: 
Store(/tmp/pig_junit_tmp910232853/test9096852332434708302output:org.apache.pig.builtin.PigStorage)
 - scope-9
|
|---A: New For Each(false)[bag] - scope-8
|   |
|   POUserFunc(org.apache.pig.builtin.

[jira] [Commented] (PIG-4919) Upgrade spark.version to 1.6.1

2016-06-13 Thread Mohit Sabharwal (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328855#comment-15328855
 ] 

Mohit Sabharwal commented on PIG-4919:
--

+1 (non-binding)

> Upgrade spark.version to 1.6.1
> --
>
> Key: PIG-4919
> URL: https://issues.apache.org/jira/browse/PIG-4919
> Project: Pig
>  Issue Type: Sub-task
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4919.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4810) Implement Merge join for spark engine

2016-06-13 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328798#comment-15328798
 ] 

Xianda Ke commented on PIG-4810:


Hi [~kellyzly], thanks for your comments.
1. setReplication() makes sense. Thanks.
2. MergeJoin requires sorted data as input; without it, the merge-join 
optimization would fail the UT. That's why the ORDER query is added.
3. I will fix the indentation issue.

I will update the patch soon.

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810.patch
>
>
> In the current code base (a9151ac), we use a regular join to implement merge 
> join in spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)