[jira] [Commented] (PIG-5047) support outer join for skewedjoin in spark mode

2017-04-24 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982236#comment-15982236
 ] 

Xianda Ke commented on PIG-5047:


[~kellyzly], Please help review and commit. Thanks.

> support outer join for skewedjoin in spark mode
> ---
>
> Key: PIG-5047
> URL: https://issues.apache.org/jira/browse/PIG-5047
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5047.patch
>
>
> Currently, skewedjoin doesn't support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5186) Support aggregate warnings with Spark engine

2017-04-21 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978343#comment-15978343
 ] 

Xianda Ke commented on PIG-5186:


It is nice to make Counter/CounterGroup generic. LGTM, +1 (non-binding).
[~kellyzly], Nandor has also reviewed it in RB. Please help review and commit.

> Support aggregate warnings with Spark engine
> 
>
> Key: PIG-5186
> URL: https://issues.apache.org/jira/browse/PIG-5186
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5186.0.patch, PIG-5186.1.patch, PIG-5186.2.patch
>
>
> Looks like we don't get aggregate warning stats when using Spark as exec 
> engine:
> {code}
> ./test_harness.pl::TestDriverPig::compareScript INFO Check failed: regex 
> match of  expected in stderr
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (PIG-5170) SkewedJoin_14 is failing with spark exec type

2017-04-19 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke resolved PIG-5170.

Resolution: Fixed

Fixed. See PIG-5047.

> SkewedJoin_14 is failing with spark exec type
> -
>
> Key: PIG-5170
> URL: https://issues.apache.org/jira/browse/PIG-5170
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> results are different



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5047) support outer join for skewedjoin in spark mode

2017-04-19 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974304#comment-15974304
 ] 

Xianda Ke commented on PIG-5047:


1. All SkewedJoin e2e test cases passed. SkewedJoin_14 is similar to PIG-4587;
I've already included that fix in the patch (PIG-5047.patch).
2. All UT cases in TestSkewedJoin passed.



> support outer join for skewedjoin in spark mode
> ---
>
> Key: PIG-5047
> URL: https://issues.apache.org/jira/browse/PIG-5047
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5047.patch
>
>
> Currently, skewedjoin doesn't support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5047) support outer join for skewedjoin in spark mode

2017-04-19 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974271#comment-15974271
 ] 

Xianda Ke commented on PIG-5047:


Hi [~nkollar], [~szita], [~kellyzly], please help review this JIRA when you
are free. Thanks.
Regards,
Xianda

> support outer join for skewedjoin in spark mode
> ---
>
> Key: PIG-5047
> URL: https://issues.apache.org/jira/browse/PIG-5047
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5047.patch
>
>
> Currently, skewedjoin doesn't support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (PIG-5047) support outer join for skewedjoin in spark mode

2017-04-19 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5047:
---
Attachment: PIG-5047.patch

Based on PIG-4858, this patch adds outer join support.

> support outer join for skewedjoin in spark mode
> ---
>
> Key: PIG-5047
> URL: https://issues.apache.org/jira/browse/PIG-5047
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5047.patch
>
>
> Currently, skewedjoin doesn't support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (PIG-5206) Support outer join for SkewedJoin in spark mode

2017-04-19 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke resolved PIG-5206.

  Resolution: Duplicate
Release Note: We've already created PIG-5047 for the same issue; closing this one.

> Support outer join for SkewedJoin in spark mode
> ---
>
> Key: PIG-5206
> URL: https://issues.apache.org/jira/browse/PIG-5206
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> Skewed join currently works with two-table inner join.
> It should support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Reopened] (PIG-5170) SkewedJoin_14 is failing with spark exec type

2017-04-19 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke reopened PIG-5170:

  Assignee: Xianda Ke

After applying the outer join patch, the other SkewedJoin e2e cases passed, but
SkewedJoin_14 still failed. After investigation, this bug is similar to PIG-4587.

For the streamed table, since more than one reducer can be associated with a
key, the streamed-table records that match the key need to be copied over to
each of those reducers (see PIG-4858 for details). For an outer join, there is
no need to join the null left record against all the copies; it should be
joined against the first copy only.

I am reopening this JIRA and will upload the fix patch soon.
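
A minimal sketch of the idea, with hypothetical names (not the actual
PIG-5047.patch code):

{code}
// Streamed-table records for a skewed key are replicated to several
// reducers (PIG-4858), so each replica carries a copy index. A null-padded
// outer-join row must be produced by the first replica only, otherwise it
// would appear once per reducer holding the key.
if (isOuterJoin && leftSideIsEmptyForKey) {
    if (streamedRecord.getCopyIndex() == 0) {
        emit(tuple(NULL_LEFT, streamedRecord));   // pad exactly once
    }
    // replicas with copyIndex > 0 emit nothing for this key
}
{code}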

> SkewedJoin_14 is failing with spark exec type
> -
>
> Key: PIG-5170
> URL: https://issues.apache.org/jira/browse/PIG-5170
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> results are different



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5212) SkewedJoin_6 is failing on Spark

2017-04-18 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972174#comment-15972174
 ] 

Xianda Ke commented on PIG-5212:


Hi [~kellyzly], there is a merge conflict. Would you please rebase the patch?
I'll help review this JIRA.

> SkewedJoin_6 is failing on Spark
> 
>
> Key: PIG-5212
> URL: https://issues.apache.org/jira/browse/PIG-5212
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5212.patch
>
>
> results are different:
> {code}
> diff <(head -20 SkewedJoin_6_benchmark.out/out_sorted) <(head -20 
> SkewedJoin_6.out/out_sorted)
> < alice allen 19  1.930   alice allen 27  1.950
> < alice allen 19  1.930   alice allen 34  1.230
> < alice allen 19  1.930   alice allen 36  2.270
> < alice allen 19  1.930   alice allen 38  0.810
> < alice allen 19  1.930   alice allen 38  1.800
> < alice allen 19  1.930   alice allen 42  2.460
> < alice allen 19  1.930   alice allen 43  0.880
> < alice allen 19  1.930   alice allen 45  2.800
> < alice allen 19  1.930   alice allen 46  3.970
> < alice allen 19  1.930   alice allen 51  1.080
> < alice allen 19  1.930   alice allen 68  3.390
> < alice allen 19  1.930   alice allen 68  3.510
> < alice allen 19  1.930   alice allen 72  1.750
> < alice allen 19  1.930   alice allen 72  3.630
> < alice allen 19  1.930   alice allen 74  0.020
> < alice allen 19  1.930   alice allen 74  2.400
> < alice allen 19  1.930   alice allen 77  2.520
> < alice allen 20  2.470   alice allen 27  1.950
> < alice allen 20  2.470   alice allen 34  1.230
> < alice allen 20  2.470   alice allen 36  2.270
> ---
> > alice allen 27  1.950   alice allen 19  1.930
> > alice allen 27  1.950   alice allen 20  2.470
> > alice allen 27  1.950   alice allen 27  1.950
> > alice allen 27  1.950   alice allen 34  1.230
> > alice allen 27  1.950   alice allen 36  2.270
> > alice allen 27  1.950   alice allen 38  0.810
> > alice allen 27  1.950   alice allen 38  1.800
> > alice allen 27  1.950   alice allen 42  2.460
> > alice allen 27  1.950   alice allen 43  0.880
> > alice allen 27  1.950   alice allen 45  2.800
> > alice allen 27  1.950   alice allen 46  3.970
> > alice allen 27  1.950   alice allen 51  1.080
> > alice allen 27  1.950   alice allen 68  3.390
> > alice allen 27  1.950   alice allen 68  3.510
> > alice allen 27  1.950   alice allen 72  1.750
> > alice allen 27  1.950   alice allen 72  3.630
> > alice allen 27  1.950   alice allen 74  0.020
> > alice allen 27  1.950   alice allen 74  2.400
> > alice allen 27  1.950   alice allen 77  2.520
> > alice allen 34  1.230   alice allen 19  1.930
> {code}
> It looks like the two tables are in the wrong order: columns from 'a' should
> come first, then columns from 'b'. In spark mode this is inverted.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5212) SkewedJoin_6 is failing on Spark

2017-04-09 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962336#comment-15962336
 ] 

Xianda Ke commented on PIG-5212:


[~nkollar] OK, I will investigate this issue.
I have found a bug in the sampling stage when running SkewedJoin_14. I am not
sure whether this JIRA has the same root cause; I will investigate.

> SkewedJoin_6 is failing on Spark
> 
>
> Key: PIG-5212
> URL: https://issues.apache.org/jira/browse/PIG-5212
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> results are different:
> {code}
> diff <(head -20 SkewedJoin_6_benchmark.out/out_sorted) <(head -20 
> SkewedJoin_6.out/out_sorted)
> < alice allen 19  1.930   alice allen 27  1.950
> < alice allen 19  1.930   alice allen 34  1.230
> < alice allen 19  1.930   alice allen 36  2.270
> < alice allen 19  1.930   alice allen 38  0.810
> < alice allen 19  1.930   alice allen 38  1.800
> < alice allen 19  1.930   alice allen 42  2.460
> < alice allen 19  1.930   alice allen 43  0.880
> < alice allen 19  1.930   alice allen 45  2.800
> < alice allen 19  1.930   alice allen 46  3.970
> < alice allen 19  1.930   alice allen 51  1.080
> < alice allen 19  1.930   alice allen 68  3.390
> < alice allen 19  1.930   alice allen 68  3.510
> < alice allen 19  1.930   alice allen 72  1.750
> < alice allen 19  1.930   alice allen 72  3.630
> < alice allen 19  1.930   alice allen 74  0.020
> < alice allen 19  1.930   alice allen 74  2.400
> < alice allen 19  1.930   alice allen 77  2.520
> < alice allen 20  2.470   alice allen 27  1.950
> < alice allen 20  2.470   alice allen 34  1.230
> < alice allen 20  2.470   alice allen 36  2.270
> ---
> > alice allen 27  1.950   alice allen 19  1.930
> > alice allen 27  1.950   alice allen 20  2.470
> > alice allen 27  1.950   alice allen 27  1.950
> > alice allen 27  1.950   alice allen 34  1.230
> > alice allen 27  1.950   alice allen 36  2.270
> > alice allen 27  1.950   alice allen 38  0.810
> > alice allen 27  1.950   alice allen 38  1.800
> > alice allen 27  1.950   alice allen 42  2.460
> > alice allen 27  1.950   alice allen 43  0.880
> > alice allen 27  1.950   alice allen 45  2.800
> > alice allen 27  1.950   alice allen 46  3.970
> > alice allen 27  1.950   alice allen 51  1.080
> > alice allen 27  1.950   alice allen 68  3.390
> > alice allen 27  1.950   alice allen 68  3.510
> > alice allen 27  1.950   alice allen 72  1.750
> > alice allen 27  1.950   alice allen 72  3.630
> > alice allen 27  1.950   alice allen 74  0.020
> > alice allen 27  1.950   alice allen 74  2.400
> > alice allen 27  1.950   alice allen 77  2.520
> > alice allen 34  1.230   alice allen 19  1.930
> {code}
> It looks like the two tables are in the wrong order: columns from 'a' should
> come first, then columns from 'b'. In spark mode this is inverted.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (PIG-5170) SkewedJoin_14 is failing with spark exec type

2017-03-31 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950944#comment-15950944
 ] 

Xianda Ke edited comment on PIG-5170 at 3/31/17 2:04 PM:
-

Hi [~nkollar], I prefer to support PIG-5206 in 0.17. [~kellyzly], any comments?
Development is ongoing and almost finished; my own UT cases passed. I will
verify the e2e cases and upload the patch for review soon.


was (Author: kexianda):
Hi [~nkollar], I prefer to support PIG-5026 in 0.17. [~kellyzly], any comments?
Development is ongoing and almost finished; my own UT cases passed. I will
verify the e2e cases and upload the patch for review soon.

> SkewedJoin_14 is failing with spark exec type
> -
>
> Key: PIG-5170
> URL: https://issues.apache.org/jira/browse/PIG-5170
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> results are different



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5170) SkewedJoin_14 is failing with spark exec type

2017-03-31 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950944#comment-15950944
 ] 

Xianda Ke commented on PIG-5170:


Hi [~nkollar], I prefer to support PIG-5026 in 0.17. [~kellyzly], any comments?
Development is ongoing and almost finished; my own UT cases passed. I will
verify the e2e cases and upload the patch for review soon.

> SkewedJoin_14 is failing with spark exec type
> -
>
> Key: PIG-5170
> URL: https://issues.apache.org/jira/browse/PIG-5170
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> results are different



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5168) SkewedJoin_12 is failing with spark exec type

2017-03-31 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950558#comment-15950558
 ] 

Xianda Ke commented on PIG-5168:


After investigation: it fails because skewedjoin currently doesn't support
outer join. See PIG-5206.

> SkewedJoin_12 is failing with spark exec type
> -
>
> Key: PIG-5168
> URL: https://issues.apache.org/jira/browse/PIG-5168
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> spark mode didn't produce any results (empty file)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5169) SkewedJoin_13 is failing with spark exec type

2017-03-31 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950559#comment-15950559
 ] 

Xianda Ke commented on PIG-5169:


After investigation: it fails because skewedjoin currently doesn't support
outer join. See PIG-5206.

> SkewedJoin_13 is failing with spark exec type
> -
>
> Key: PIG-5169
> URL: https://issues.apache.org/jira/browse/PIG-5169
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> different output produced



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5170) SkewedJoin_14 is failing with spark exec type

2017-03-31 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950560#comment-15950560
 ] 

Xianda Ke commented on PIG-5170:


After investigation: it fails because skewedjoin currently doesn't support
outer join. See PIG-5206.

> SkewedJoin_14 is failing with spark exec type
> -
>
> Key: PIG-5170
> URL: https://issues.apache.org/jira/browse/PIG-5170
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> results are different



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (PIG-5206) Support outer join for SkewedJoin in spark mode

2017-03-31 Thread Xianda Ke (JIRA)
Xianda Ke created PIG-5206:
--

 Summary: Support outer join for SkewedJoin in spark mode
 Key: PIG-5206
 URL: https://issues.apache.org/jira/browse/PIG-5206
 Project: Pig
  Issue Type: Sub-task
Reporter: Xianda Ke


Skewed join currently works with two-table inner join.
It should support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (PIG-5206) Support outer join for SkewedJoin in spark mode

2017-03-31 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke reassigned PIG-5206:
--

Assignee: Xianda Ke

> Support outer join for SkewedJoin in spark mode
> ---
>
> Key: PIG-5206
> URL: https://issues.apache.org/jira/browse/PIG-5206
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> Skewed join currently works with two-table inner join.
> It should support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2017-03-31 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: PIG-4858_8.patch

Refined the code in SparkCompiler to use getSkewedJoinSampleJob().
[~kellyzly], please help review.

TestSkewedJoin passed.

{code}
patch -p0 < PIG-4858_8.patch
{code}

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858_2.patch, PIG-4858_3.patch, PIG-4858_4.patch, 
> PIG-4858_6.patch, PIG-4858_7.patch, PIG-4858_8.patch, PIG-4858.patch, 
> SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2017-03-31 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: PIG-4858_7.patch

Minor change: enabled the UT case for the Spark engine.
[~kellyzly], please help review. 

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858_2.patch, PIG-4858_3.patch, PIG-4858_4.patch, 
> PIG-4858_6.patch, PIG-4858_7.patch, PIG-4858.patch, SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2017-03-31 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: PIG-4858_6.patch

1. Resolved merge conflicts, plus some minor import statement changes.
2. Enabled the UT case for skewed join.

[~kellyzly], please help review. Thanks.


> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858_2.patch, PIG-4858_3.patch, PIG-4858_4.patch, 
> PIG-4858_6.patch, PIG-4858.patch, SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (PIG-5169) SkewedJoin_13 is failing with spark exec type

2017-03-28 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke reassigned PIG-5169:
--

Assignee: Xianda Ke

> SkewedJoin_13 is failing with spark exec type
> -
>
> Key: PIG-5169
> URL: https://issues.apache.org/jira/browse/PIG-5169
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> different output produced



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (PIG-5170) SkewedJoin_14 is failing with spark exec type

2017-03-28 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke reassigned PIG-5170:
--

Assignee: Xianda Ke

> SkewedJoin_14 is failing with spark exec type
> -
>
> Key: PIG-5170
> URL: https://issues.apache.org/jira/browse/PIG-5170
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> results are different



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (PIG-5168) SkewedJoin_12 is failing with spark exec type

2017-03-28 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke reassigned PIG-5168:
--

Assignee: Xianda Ke

> SkewedJoin_12 is failing with spark exec type
> -
>
> Key: PIG-5168
> URL: https://issues.apache.org/jira/browse/PIG-5168
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Nandor Kollar
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> spark mode didn't produce any results (empty file)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (PIG-5196) Enable persist/cache mechanism in Pig

2017-03-23 Thread Xianda Ke (JIRA)
Xianda Ke created PIG-5196:
--

 Summary: Enable persist/cache mechanism in Pig
 Key: PIG-5196
 URL: https://issues.apache.org/jira/browse/PIG-5196
 Project: Pig
  Issue Type: Sub-task
  Components: spark
Reporter: Xianda Ke
Assignee: Xianda Ke


We can get a performance benefit from a persist/cache mechanism.
With persist/cache in place, we can optimize the current implementation of
multi-query execution and the specialized joins.
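
A minimal sketch of the benefit with the plain Spark Java API (illustrative
only; "lines" is an assumed pre-existing JavaRDD, not Pig code):

{code}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

// An RDD consumed by several downstream operators (the multi-query case)
// is computed once and cached; later jobs reuse the cached partitions
// instead of re-running the whole lineage.
JavaRDD<String> shared = lines.map(String::trim)
        .persist(StorageLevel.MEMORY_ONLY());
long a = shared.filter(s -> s.startsWith("a")).count(); // fills the cache
long b = shared.filter(s -> s.startsWith("b")).count(); // reuses cached data
{code}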



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2017-03-15 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: PIG-4858_4.patch

Hi [~kellyzly], please help review this patch when you are free.

1. Based on PIG-5044: rewrote SparkCompiler.getSkewedJoinJob(), broadcasting
the sampling index (a rough sketch follows below). Some code is duplicated from
SparkCompiler.getSamplingJob(), because SparkCompiler.getSamplingJob() is too
big and needs to be refactored; I will file a new JIRA for this.

2. Merged the fix from PIG-3417.

3. Currently, skewed join does not support outer join. I will file a new JIRA
for this.
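
A rough sketch of the broadcast idea (hypothetical names; jsc, skewedKeyIndex
and key are assumed to exist -- this is not the patch code):

{code}
import java.util.Map;
import org.apache.spark.broadcast.Broadcast;

// The per-key reducer index built from the sample is small, so it is
// broadcast once to every executor instead of being shipped inside each
// task closure.
Broadcast<Map<Object, Integer>> broadcastIdx = jsc.broadcast(skewedKeyIndex);

// inside a task: look up how many reducers handle this skewed key
Integer reducersForKey = broadcastIdx.value().get(key);
{code}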


> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858_2.patch, PIG-4858_3.patch, PIG-4858_4.patch, 
> PIG-4858.patch, SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (PIG-5125) refactor POPoissonSampleSpark

2017-02-04 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5125:
---
Description: 
To avoid merge conflicts, POPoissonSampleSpark doesn't extend POPoissonSample
for now. After the spark branch is merged into trunk, we should refactor
POPoissonSampleSpark; see the discussion in PIG-5044.

  was:
To avoid merge conflicts, POPoissonSampleSpark doesn't extend POPoissonSample
for now. After the spark branch is merged into trunk, we should refactor
POPoissonSampleSpark; see the discussion in PIG-4856.


> refactor POPoissonSampleSpark
> -
>
> Key: PIG-5125
> URL: https://issues.apache.org/jira/browse/PIG-5125
> Project: Pig
>  Issue Type: Improvement
>Reporter: Xianda Ke
>Assignee: Xianda Ke
>Priority: Minor
>
> To avoid merge conflicts, POPoissonSampleSpark doesn't extend POPoissonSample
> for now. After the spark branch is merged into trunk, we should refactor
> POPoissonSampleSpark; see the discussion in PIG-5044.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (PIG-5125) refactor POPoissonSampleSpark

2017-02-04 Thread Xianda Ke (JIRA)
Xianda Ke created PIG-5125:
--

 Summary: refactor POPoissonSampleSpark
 Key: PIG-5125
 URL: https://issues.apache.org/jira/browse/PIG-5125
 Project: Pig
  Issue Type: Improvement
Reporter: Xianda Ke
Assignee: Xianda Ke
Priority: Minor


To avoid merge conflicts, POPoissonSampleSpark doesn't extend POPoissonSample
for now. After the spark branch is merged into trunk, we should refactor
POPoissonSampleSpark; see the discussion in PIG-4856.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-4858) Implement Skewed join for spark engine

2016-12-15 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15753547#comment-15753547
 ] 

Xianda Ke commented on PIG-4858:


Hi [~kellyzly], currently the function getSamplingJob() in PIG-5044's patch is
not suitable for PIG-4858, because SkewedJoin in Spark doesn't use a UDF for
sampling now, just like in Tez; it uses POPoissonSampleSpark for sampling.

Yes, part of the sampling logic is the same, such as sorting the sampling
result and setting parallelism.
From my point of view, we can first finish these two features independently.
Then we can refactor the code later on, breaking getSamplingJob into small
functions and extracting common functions so that SkewedJoin can reuse them.

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch, PIG-4858_2.patch, PIG-4858_3.patch, 
> SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2016-12-14 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: PIG-4858_3.patch

Small changes:
1. refined the code  2. used SparkUtil.getParallelism()

[~kellyzly], please help review when you are free. Thanks.

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch, PIG-4858_2.patch, PIG-4858_3.patch, 
> SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4952) Calculate the value of parallism for spark mode

2016-12-07 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15730669#comment-15730669
 ] 

Xianda Ke commented on PIG-4952:


Hi [~kellyzly] & [~nkollar],

how about this:

{code}
// if spark has a default parallelism conf
if (sc.conf().contains("spark.default.parallelism")) {
    parallelism = sc.defaultParallelism();
} else {
    // otherwise use the max partition count among the parent RDDs
    int maxPartitions = -1;
    for (int i = 0; i < predRDDs.size(); i++) {
        if (predRDDs.get(i).partitions().length > maxPartitions) {
            maxPartitions = predRDDs.get(i).partitions().length;
        }
    }
    parallelism = maxPartitions;
}
{code}

> Calculate the value of parallism for spark mode
> ---
>
> Key: PIG-4952
> URL: https://issues.apache.org/jira/browse/PIG-4952
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4952.patch, PIG-4952_1.patch
>
>
> Calculate the value of parallism for spark mode like what 
> org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter
>  does.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4952) Calculate the value of parallism for spark mode

2016-12-07 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15728138#comment-15728138
 ] 

Xianda Ke commented on PIG-4952:


Taking the max parallelism value from the parent RDDs seems OK.

LGTM
+1

> Calculate the value of parallism for spark mode
> ---
>
> Key: PIG-4952
> URL: https://issues.apache.org/jira/browse/PIG-4952
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4952.patch, PIG-4952_1.patch
>
>
> Calculate the value of parallism for spark mode like what 
> org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter
>  does.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5068) Set SPARK_REDUCERS by pig.properties not by system configuration

2016-11-23 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692244#comment-15692244
 ] 

Xianda Ke commented on PIG-5068:


LGTM
+1(non-binding)

> Set SPARK_REDUCERS by pig.properties not by system configuration
> 
>
> Key: PIG-5068
> URL: https://issues.apache.org/jira/browse/PIG-5068
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-5068.patch, PIG-5068_1.patch, PIG-5068_2.patch
>
>
> In SparkUtil.java, we set the SPARK_REDUCERS by system configuration
> {code}
> public static int getParallelism(List<RDD<Tuple>> predecessors,
>         PhysicalOperator physicalOperator) {
>     String numReducers = System.getenv("SPARK_REDUCERS");
>     if (numReducers != null) {
>         return Integer.parseInt(numReducers);
>     }
>     int parallelism = physicalOperator.getRequestedParallelism();
>     if (parallelism <= 0) {
>         // Parallelism wasn't set in Pig, so set it to whatever Spark
>         // thinks is reasonable.
>         parallelism = predecessors.get(0).context().defaultParallelism();
>     }
>     return parallelism;
> }
> {code}
> It is better to set it by pig.properties



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5068) Set SPARK_REDUCERS by pig.properties not by system configuration

2016-11-23 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692167#comment-15692167
 ] 

Xianda Ke commented on PIG-5068:


[~kellyzly], a static variable in SparkUtil is not thread-safe.
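
For illustration, a hypothetical sketch of the hazard (not the actual
SparkUtil code):

{code}
public class SparkUtil {
    // Shared across every thread in the JVM: if two jobs are configured
    // concurrently, the later write silently clobbers the earlier one.
    private static int sparkReducers;

    public static void setSparkReducers(int n) { sparkReducers = n; }
    public static int getSparkReducers() { return sparkReducers; }
}
{code}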


> Set SPARK_REDUCERS by pig.properties not by system configuration
> 
>
> Key: PIG-5068
> URL: https://issues.apache.org/jira/browse/PIG-5068
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-5068.patch, PIG-5068_1.patch
>
>
> In SparkUtil.java, we set the SPARK_REDUCERS by system configuration
> {code}
> public static int getParallelism(List<RDD<Tuple>> predecessors,
>         PhysicalOperator physicalOperator) {
>     String numReducers = System.getenv("SPARK_REDUCERS");
>     if (numReducers != null) {
>         return Integer.parseInt(numReducers);
>     }
>     int parallelism = physicalOperator.getRequestedParallelism();
>     if (parallelism <= 0) {
>         // Parallelism wasn't set in Pig, so set it to whatever Spark
>         // thinks is reasonable.
>         parallelism = predecessors.get(0).context().defaultParallelism();
>     }
>     return parallelism;
> }
> {code}
> It is better to set it by pig.properties



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5068) Set SPARK_REDUCERS by pig.properties not by system configuration

2016-11-22 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689248#comment-15689248
 ] 

Xianda Ke commented on PIG-5068:


[~kellyzly],
1. Agree with you; it would be better to remove the conf from the system
environment.

2. From my point of view, there seems to be no need to add such a simple class
that just stores a single integer value.

3. In fact, I have also written such a function for skewed join. I paste it
here for your information.
SkewedJoinConverter.java
{code}
private int getDefaultParallelism(List<RDD<Tuple>> predRDDs) {

    int parallelism = -1;

    SparkContext sc = predRDDs.get(0).context();
    if (parallelism < 0) {
        if (sc.conf().contains("spark.default.parallelism")) {
            parallelism = sc.defaultParallelism();
        } else {
            // find the max partition count among the parent RDDs
            int maxPartitions = -1;
            for (int i = 0; i < predRDDs.size(); i++) {
                if (predRDDs.get(i).partitions().length > maxPartitions) {
                    maxPartitions = predRDDs.get(i).partitions().length;
                }
            }
            parallelism = maxPartitions;
        }
    }

    return parallelism;
}
{code}
In this function, I have handled the case where
sc.conf().contains("spark.default.parallelism") is false.
I had a glance at SparkContext.scala and MesosSchedulerBackend.scala: if the
SparkContext conf does not contain "spark.default.parallelism", the
defaultParallelism depends on the TaskScheduler; for instance,
MesosSchedulerBackend will return 8. Picking the partition number of the
preceding RDDs would be better. But I didn't parse PigContext's property
"spark.reducers".

From my point of view, it would be better if we combine them.
Here is my proposal, a util function in SparkUtil.java:
{code}
public static int getParallelism(List<RDD<Tuple>> predecessors,
        PhysicalOperator physicalOperator) {

    String sparkReducers =
            pigContext.getProperties().getProperty("spark.reducers");
    if (sparkReducers != null) {
        return Integer.parseInt(sparkReducers);
    }

    int parallelism = physicalOperator.getRequestedParallelism();
    if (parallelism > 0) {
        return parallelism;
    }

    // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
    // is reasonable.
    SparkContext sc = predecessors.get(0).context();
    if (sc.conf().contains("spark.default.parallelism")) {
        parallelism = sc.defaultParallelism();
    } else {
        // find the max partition count among the parent RDDs
        int maxPartitions = -1;
        for (int i = 0; i < predecessors.size(); i++) {
            if (predecessors.get(i).partitions().length > maxPartitions) {
                maxPartitions = predecessors.get(i).partitions().length;
            }
        }
        parallelism = maxPartitions;
    }

    return parallelism;
}
{code}



> Set SPARK_REDUCERS by pig.properties not by system configuration
> 
>
> Key: PIG-5068
> URL: https://issues.apache.org/jira/browse/PIG-5068
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-5068.patch
>
>
> In SparkUtil.java, we set the SPARK_REDUCERS by system configuration
> {code}
> public static int getParallelism(List<RDD<Tuple>> predecessors,
>         PhysicalOperator physicalOperator) {
>     String numReducers = System.getenv("SPARK_REDUCERS");
>     if (numReducers != null) {
>         return Integer.parseInt(numReducers);
>     }
>     int parallelism = physicalOperator.getRequestedParallelism();
>     if (parallelism <= 0) {
>         // Parallelism wasn't set in Pig, so set it to whatever Spark
>         // thinks is reasonable.
>         parallelism = predecessors.get(0).context().defaultParallelism();
>     }
>     return parallelism;
> }
> {code}
> It is better to set it by pig.properties



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-11-22 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688696#comment-15688696
 ] 

Xianda Ke commented on PIG-5029:


Hi [~kellyzly],

The salted-key solution seems OK. JDK's Random is a pseudorandom number
generator (PRNG), also known as a deterministic random bit generator (DRBG):
if the seed (the task ID) is the same, the random number sequence will be the
same.
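
A quick illustration of that determinism with the plain JDK API:

{code}
// Two Randoms seeded identically produce identical sequences, so a re-run
// task seeded with its task ID repeats the same partitioning decisions.
java.util.Random r1 = new java.util.Random(42L);
java.util.Random r2 = new java.util.Random(42L);
for (int i = 0; i < 5; i++) {
    assert r1.nextInt() == r2.nextInt();   // always holds
}
{code}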

I have a question: if the user did not set the requested parallelism in the
script, what is the default? Do we have to handle the default parallelism at
runtime?

> Optimize sort case when data is skewed
> --
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5029.patch, PIG-5029_2.patch, PIG-5029_3.patch, 
> PIG-5051_5029_5.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The pig physical plan will be changed to spark plan and to spark lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
> |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
> |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
> |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey and RangePartitiner will sample data and ranges the 
> key roughly into equal range, the test result(attached  document) shows that 
> one partition will load most keys and take long time to finish.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (PIG-5052) Initialize MRConfiguration.JOB_ID in spark mode correctly

2016-11-01 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15627352#comment-15627352
 ] 

Xianda Ke edited comment on PIG-5052 at 11/2/16 1:55 AM:
-

LGTM
+1 (non-binding)


was (Author: kexianda):
LGTM
+1 (non-bingd)

> Initialize MRConfiguration.JOB_ID in spark mode correctly
> -
>
> Key: PIG-5052
> URL: https://issues.apache.org/jira/browse/PIG-5052
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5052.patch
>
>
> Currently, we initialize MRConfiguration.JOB_ID in SparkUtil#newJobConf;
> we just set the value to a random string.
> {code}
> jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
> {code}
> We need to find a Spark API to initialize it correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (PIG-5052) Initialize MRConfiguration.JOB_ID in spark mode correctly

2016-11-01 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke reassigned PIG-5052:
--

Assignee: Xianda Ke  (was: liyunzhang_intel)

> Initialize MRConfiguration.JOB_ID in spark mode correctly
> -
>
> Key: PIG-5052
> URL: https://issues.apache.org/jira/browse/PIG-5052
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5052.patch
>
>
> Currently, we initialize MRConfiguration.JOB_ID in SparkUtil#newJobConf;
> we just set the value to a random string.
> {code}
> jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
> {code}
> We need to find a Spark API to initialize it correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5052) Initialize MRConfiguration.JOB_ID in spark mode correctly

2016-11-01 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15627352#comment-15627352
 ] 

Xianda Ke commented on PIG-5052:


LGTM
+1 (non-bingd)

> Initialize MRConfiguration.JOB_ID in spark mode correctly
> -
>
> Key: PIG-5052
> URL: https://issues.apache.org/jira/browse/PIG-5052
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5052.patch
>
>
> Currently, we initialize MRConfiguration.JOB_ID in SparkUtil#newJobConf;
> we just set the value to a random string.
> {code}
> jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
> {code}
> We need to find a Spark API to initialize it correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5051) Initialize PigContants.TASK_INDEX in spark mode correctly

2016-10-30 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15621022#comment-15621022
 ] 

Xianda Ke commented on PIG-5051:


svn-git sync failed? it seems OK now.

LGTM, +1 (non-binding)


> Initialize PigContants.TASK_INDEX in spark mode correctly
> -
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
>
>
> In MR, we initialize PigConstants.TASK_INDEX in
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup
>  
> {code}
> protected void setup(Context context) throws IOException, 
> InterruptedException {
>...
> context.getConfiguration().set(PigConstants.TASK_INDEX, 
> Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
> ...
> }
> {code}
> But Spark does not provide a function like PigGenericMapReduce.Reduce#setup
> to initialize PigConstants.TASK_INDEX when the job starts. We need to find a
> solution to initialize PigConstants.TASK_INDEX correctly.
> After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in spark
> mode will be the same as in MR.
> Now we divide it into two cases in TestBuiltin#testUniqueID:
> {code}
>  @Test
> public void testUniqueID() throws Exception {
>  ...
> if (!Util.isSparkExecType(cluster.getExecType())) {
> assertEquals("0-0", iter.next().get(1));
> assertEquals("0-1", iter.next().get(1));
> assertEquals("0-2", iter.next().get(1));
> assertEquals("0-3", iter.next().get(1));
> assertEquals("0-4", iter.next().get(1));
> assertEquals("1-0", iter.next().get(1));
> assertEquals("1-1", iter.next().get(1));
> assertEquals("1-2", iter.next().get(1));
> assertEquals("1-3", iter.next().get(1));
> assertEquals("1-4", iter.next().get(1));
> } else {
> // because we set PigConstants.TASK_INDEX as 0 in
> // ForEachConverter#ForEachFunction#initializeJobConf
> // UniqueID.exec() will output like 0-*
> // the behavior of spark will be same with mr until PIG-5051 is fixed.
> assertEquals(iter.next().get(1), "0-0");
> assertEquals(iter.next().get(1), "0-1");
> assertEquals(iter.next().get(1), "0-2");
> assertEquals(iter.next().get(1), "0-3");
> assertEquals(iter.next().get(1), "0-4");
> assertEquals(iter.next().get(1), "0-0");
> assertEquals(iter.next().get(1), "0-1");
> assertEquals(iter.next().get(1), "0-2");
> assertEquals(iter.next().get(1), "0-3");
> assertEquals(iter.next().get(1), "0-4");
> }
>...
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5051) Initialize PigContants.TASK_INDEX in spark mode correctly

2016-10-30 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15621021#comment-15621021
 ] 

Xianda Ke commented on PIG-5051:


svn-git sync failed? it seems OK now.

LGTM, +1 (non-binding)


> Initialize PigContants.TASK_INDEX in spark mode correctly
> -
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
>
>
> In MR, we initialize PigConstants.TASK_INDEX in
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup
>  
> {code}
> protected void setup(Context context) throws IOException, 
> InterruptedException {
>...
> context.getConfiguration().set(PigConstants.TASK_INDEX, 
> Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
> ...
> }
> {code}
> But Spark does not provide a function like PigGenericMapReduce.Reduce#setup
> to initialize PigConstants.TASK_INDEX when the job starts. We need to find a
> solution to initialize PigConstants.TASK_INDEX correctly.
> After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in spark
> mode will be the same as in MR.
> Now we divide it into two cases in TestBuiltin#testUniqueID:
> {code}
>  @Test
> public void testUniqueID() throws Exception {
>  ...
> if (!Util.isSparkExecType(cluster.getExecType())) {
> assertEquals("0-0", iter.next().get(1));
> assertEquals("0-1", iter.next().get(1));
> assertEquals("0-2", iter.next().get(1));
> assertEquals("0-3", iter.next().get(1));
> assertEquals("0-4", iter.next().get(1));
> assertEquals("1-0", iter.next().get(1));
> assertEquals("1-1", iter.next().get(1));
> assertEquals("1-2", iter.next().get(1));
> assertEquals("1-3", iter.next().get(1));
> assertEquals("1-4", iter.next().get(1));
> } else {
> // because we set PigConstants.TASK_INDEX as 0 in
> // ForEachConverter#ForEachFunction#initializeJobConf
> // UniqueID.exec() will output like 0-*
> // the behavior of spark will be same with mr until PIG-5051 is fixed.
> assertEquals(iter.next().get(1), "0-0");
> assertEquals(iter.next().get(1), "0-1");
> assertEquals(iter.next().get(1), "0-2");
> assertEquals(iter.next().get(1), "0-3");
> assertEquals(iter.next().get(1), "0-4");
> assertEquals(iter.next().get(1), "0-0");
> assertEquals(iter.next().get(1), "0-1");
> assertEquals(iter.next().get(1), "0-2");
> assertEquals(iter.next().get(1), "0-3");
> assertEquals(iter.next().get(1), "0-4");
> }
>...
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (PIG-5047) support outer join for skewedjoin in spark mode

2016-10-18 Thread Xianda Ke (JIRA)
Xianda Ke created PIG-5047:
--

 Summary: support outer join for skewedjoin in spark mode
 Key: PIG-5047
 URL: https://issues.apache.org/jira/browse/PIG-5047
 Project: Pig
  Issue Type: Sub-task
Reporter: Xianda Ke
Assignee: Xianda Ke


Currently, skewedjoin doesn't support outer join.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2016-10-18 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: PIG-4858_2.patch

Fixed a bug and refined POPoissonSampleSpark.

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch, PIG-4858_2.patch, 
> SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4969) Optimize combine case for spark mode

2016-10-12 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15570703#comment-15570703
 ] 

Xianda Ke commented on PIG-4969:


LGTM, verified.
+1
Hi [~xuefuz], please check in PIG-4969_3.patch.

> Optimize combine case for spark mode
> 
>
> Key: PIG-4969
> URL: https://issues.apache.org/jira/browse/PIG-4969
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4969_2.patch, PIG-4969_3.patch
>
>
> In our test results for the 1 TB PigMix benchmark, the combine case runs
> slower in spark mode:
> ||Script||MR||Spark||
> |L_1|8089|10064|
> L1.pig
> {code}
> register pigperf.jar;
> A = load '/user/pig/tests/data/pigmix/page_views' using 
> org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = foreach A generate user, (int)action as action, (map[])page_info as 
> page_info,
> flatten((bag{tuple(map[])})page_links) as page_links;
> C = foreach B generate user,
> (action == 1 ? page_info#'a' : page_links#'b') as header;
> D = group C by user parallel 40;
> E = foreach D generate group, COUNT(C) as cnt;
> store E into 'L1out';
> {code}
> Then spark plan
> {code}
> exec] #--
>  [exec] # Spark Plan  
>  [exec] #--
>  [exec] 
>  [exec] Spark node scope-38
>  [exec] E: 
> Store(hdfs://bdpe81:8020/user/root/output/pig/L1out:org.apache.pig.builtin.PigStorage)
>  - scope-37
>  [exec] |
>  [exec] |---E: New For Each(false,false)[tuple] - scope-42
>  [exec] |   |
>  [exec] |   Project[bytearray][0] - scope-39
>  [exec] |   |
>  [exec] |   Project[bag][1] - scope-40
>  [exec] |   
>  [exec] |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - 
> scope-41
>  [exec] |   |
>  [exec] |   |---Project[bag][1] - scope-57
>  [exec] |
>  [exec] |---Reduce By(false,false)[tuple] - scope-47
>  [exec] |   |
>  [exec] |   Project[bytearray][0] - scope-48
>  [exec] |   |
>  [exec] |   
> POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-49
>  [exec] |   |
>  [exec] |   |---Project[bag][1] - scope-50
>  [exec] |
>  [exec] |---D: Local Rearrange[tuple]{bytearray}(false) - scope-53
>  [exec] |   |
>  [exec] |   Project[bytearray][0] - scope-55
>  [exec] |
>  [exec] |---E: New For Each(false,false)[bag] - scope-43
>  [exec] |   |
>  [exec] |   Project[bytearray][0] - scope-44
>  [exec] |   |
>  [exec] |   
> POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-45
>  [exec] |   |
>  [exec] |   |---Project[bag][1] - scope-46
>  [exec] |
>  [exec] |---Pre Combiner Local Rearrange[tuple]{Unknown} 
> - scope-56
>  [exec] |
>  [exec] |---C: New For Each(false,false)[bag] - 
> scope-26
>  [exec] |   |
>  [exec] |   Project[bytearray][0] - scope-13
>  [exec] |   |
>  [exec] |   POBinCond[bytearray] - scope-22
>  [exec] |   |
>  [exec] |   |---Equal To[boolean] - scope-17
>  [exec] |   |   |
>  [exec] |   |   |---Project[int][1] - scope-15
>  [exec] |   |   |
>  [exec] |   |   |---Constant(1) - scope-16
>  [exec] |   |
>  [exec] |   |---POMapLookUp[bytearray] - scope-19
>  [exec] |   |   |
>  [exec] |   |   |---Project[map][2] - scope-18
>  [exec] |   |
>  [exec] |   |---POMapLookUp[bytearray] - scope-21
>  [exec] |   |
>  [exec] |   |---Project[map][3] - scope-20
>  [exec] |
>  [exec] |---B: New For 
> Each(false,false,false,true)[bag] - scope-12
>  [exec] |   |
>  [exec] |   Project[bytearray][0] - scope-1
>  

[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2016-09-20 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: SkewedJoinInSparkMode.pdf

The design doc is updated.

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch, SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2016-09-20 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: (was: SkewedJoinInSparkMode.pdf)

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch
>
>
> Now we use regular join to replace skewed join. We need to implement it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

2016-09-19 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15505624#comment-15505624
 ] 

Xianda Ke commented on PIG-5029:


[~kellyzly],

When a task is re-run, the partitioning is not the same as in the first run
because of the random int; some records may be duplicated and some may be
missed.

For your information: in SkewedJoin (PIG-4858), records are sent to the
reducers in a round-robin fashion. This partitioning is not random, but it is
even.
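
A minimal sketch of that round-robin dispatch (hypothetical names):

{code}
// Records for a skewed key cycle deterministically through the key's
// assigned reducer slots: an even spread, identical on every re-run.
int[] reducersForKey = {3, 7, 11};   // reducers assigned to this key
int reducer = reducersForKey[recordSeq % reducersForKey.length];
recordSeq++;                         // per-key running counter
{code}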



> Optimize sort case when data is skewed
> --
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5029.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The pig physical plan will be changed to spark plan and to spark lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
> |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
> |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
> |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey and RangePartitiner will sample data and ranges the 
> key roughly into equal range, the test result(attached  document) shows that 
> one partition will load most keys and take long time to finish.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2016-09-15 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: SkewedJoinInSparkMode.pdf

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch, SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. Need implement it 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4858) Implement Skewed join for spark engine

2016-09-15 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493280#comment-15493280
 ] 

Xianda Ke commented on PIG-4858:


The design doc is attached; refer to it for details.

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch, SkewedJoinInSparkMode.pdf
>
>
> Now we use regular join to replace skewed join. Need implement it 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4858) Implement Skewed join for spark engine

2016-09-14 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4858:
---
Attachment: PIG-4858.patch

> Implement Skewed join for spark engine
> --
>
> Key: PIG-4858
> URL: https://issues.apache.org/jira/browse/PIG-4858
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4858.patch
>
>
> Now we use regular join to replace skewed join. Need implement it 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5025) Improve TestLoad.java: use own separated folder under /tmp

2016-09-13 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15489308#comment-15489308
 ] 

Xianda Ke commented on PIG-5025:


To avoid occasional failures, a unique folder name would be better:
{code}
WORKING_DIR = "/tmp/test" + java.util.UUID.randomUUID();
{code}
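
In the test class this could look like the following (a hypothetical JUnit 
sketch, not the actual TestLoad change):
{code}
import java.io.File;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class TestLoadWorkingDir {
    // unique per test run, so stray files in /tmp (e.g. with ':' in their
    // names) can never collide with the test's glob patterns
    private static final String WORKING_DIR = "/tmp/test" + UUID.randomUUID();

    @BeforeClass
    public static void createWorkingDir() {
        new File(WORKING_DIR).mkdirs();
    }

    @AfterClass
    public static void removeWorkingDir() throws Exception {
        FileUtils.deleteDirectory(new File(WORKING_DIR));
    }
}
{code}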

> Improve TestLoad.java: use own separated folder under /tmp
> --
>
> Key: PIG-5025
> URL: https://issues.apache.org/jira/browse/PIG-5025
> Project: Pig
>  Issue Type: Improvement
>Reporter: Adam Szita
>Assignee: Adam Szita
>Priority: Minor
> Attachments: PIG-5025.patch
>
>
> Test cases testCommaSeparatedString2 and testGlobChars may fail if for some 
> reason files (from any other sources) in /tmp have : (colon) in the 
> filenames. This is because HDFS doesn't support colon since it has its own 
> URI handling. Exception below.
> I propose we separate the working dir of these tests to use their own folder 
> in /tmp.
> Failed to parse: java.net.URISyntaxException: Relative path in absolute URI: 
> t:2sTest.txt
>   at 
> org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:198)
>   at org.apache.pig.test.TestLoad.checkLoadPath(TestLoad.java:317)
>   at org.apache.pig.test.TestLoad.checkLoadPath(TestLoad.java:299)
>   at 
> org.apache.pig.test.TestLoad.testCommaSeparatedString2(TestLoad.java:189)
> Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: 
> Relative path in absolute URI: t:2sTest.txt
>   at org.apache.hadoop.fs.Path.initialize(Path.java:206)
>   at org.apache.hadoop.fs.Path.(Path.java:172)
>   at org.apache.hadoop.fs.Path.(Path.java:94)
>   at org.apache.hadoop.fs.Globber.doGlob(Globber.java:260)
>   at org.apache.hadoop.fs.Globber.glob(Globber.java:151)
>   at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1637)
>   at 
> org.apache.pig.backend.hadoop.datastorage.HDataStorage.asCollection(HDataStorage.java:215)
>   at 
> org.apache.pig.backend.hadoop.datastorage.HDataStorage.asCollection(HDataStorage.java:41)
>   at 
> org.apache.pig.builtin.JsonMetadata.findMetaFile(JsonMetadata.java:119)
>   at org.apache.pig.builtin.JsonMetadata.getSchema(JsonMetadata.java:191)
>   at org.apache.pig.builtin.PigStorage.getSchema(PigStorage.java:518)
>   at 
> org.apache.pig.newplan.logical.relational.LOLoad.getSchemaFromMetaData(LOLoad.java:175)
>   at 
> org.apache.pig.newplan.logical.relational.LOLoad.(LOLoad.java:89)
>   at 
> org.apache.pig.parser.LogicalPlanBuilder.buildLoadOp(LogicalPlanBuilder.java:866)
>   at 
> org.apache.pig.parser.LogicalPlanGenerator.load_clause(LogicalPlanGenerator.java:3568)
>   at 
> org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1625)
>   at 
> org.apache.pig.parser.LogicalPlanGenerator.general_statement(LogicalPlanGenerator.java:1102)
>   at 
> org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
>   at 
> org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
>   at 
> org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:188)
> Caused by: java.net.URISyntaxException: Relative path in absolute URI: 
> t:2sTest.txt
>   at java.net.URI.checkPath(URI.java:1823)
>   at java.net.URI.(URI.java:745)
>   at org.apache.hadoop.fs.Path.initialize(Path.java:203)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024_6.patch

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch, 
> PIG-5024_4.patch, PIG-5024_5.patch, PIG-5024_6.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024_5.patch

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch, 
> PIG-5024_4.patch, PIG-5024_5.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024_4.patch

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch, 
> PIG-5024_4.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024_4.patch

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch, 
> PIG-5024_4.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: (was: PIG-5024_4.patch)

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch, 
> PIG-5024_4.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024_4.patch

Blank lines are removed. Thanks.

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch, 
> PIG-5024_4.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024_3.patch

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-08 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473910#comment-15473910
 ] 

Xianda Ke commented on PIG-5024:


Thanks, fixed. PIG-5024_3.patch is attached.

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch, PIG-5024_3.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-07 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024_2.patch

Hi [~kellyzly], many thanks for your comments.

The patch is updated; please help review.

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch, PIG-5024_2.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-07 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Attachment: PIG-5024.patch

{code}
ant clean -Dhadoopversion=23 test-spark
{code}
UTs passed.

[~kellyzly], [~mohitsabharwal], please help review.

> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-5024.patch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-07 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469771#comment-15469771
 ] 

Xianda Ke commented on PIG-5024:


To enable the broadcast mechanism, add a broadcast physical operator.

BroadcastConverter just broadcasts the predecessor RDD and saves the broadcast 
variable into a map, which can then be referenced by other functions/closures.

Now, RDDConverter.convert() takes three parameters: the predecessor RDDs, the 
map of broadcast variables, and a PhysicalOperator.
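
For illustration, a minimal sketch of the idea (hypothetical names and 
signatures, e.g. POBroadcast; the actual patch may differ):
{code}
import java.util.List;
import java.util.Map;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;

public class BroadcastConverter {
    private final JavaSparkContext sc;
    // shared across converters so later join converters can look the variable up
    private final Map<String, Broadcast<List<Tuple>>> broadcastedVars;

    public BroadcastConverter(JavaSparkContext sc,
            Map<String, Broadcast<List<Tuple>>> broadcastedVars) {
        this.sc = sc;
        this.broadcastedVars = broadcastedVars;
    }

    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POBroadcast op) {
        // collect() pulls the (small) predecessor RDD to the driver;
        // broadcast() then ships it once per executor instead of once per task
        List<Tuple> smallSide = predecessors.get(0).toJavaRDD().collect();
        broadcastedVars.put(op.getOperatorKey().toString(), sc.broadcast(smallSide));
        // the operator itself is a pass-through in the physical plan
        return predecessors.get(0);
    }
}
{code}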


> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (PIG-4870) Enable MergeJoin testcase in TestCollectedGroup for spark engine

2016-09-06 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469348#comment-15469348
 ] 

Xianda Ke edited comment on PIG-4870 at 9/7/16 3:10 AM:


Since the merge join optimization is ready, just enable 
TestCollectedGroup.testMapsideGroupWithMergeJoin in spark mode.

Only remove this line, and reformat:
```
if(!Util.isSparkExecType(cluster.getExecType())
```
[~kellyzly], please help review.


was (Author: kexianda):
Since merge join optimization is ready, just enable 
TestCollectedGroup.testMapsideGroupWithMergeJoin in spark mode.

> Enable MergeJoin testcase in TestCollectedGroup for spark engine
> 
>
> Key: PIG-4870
> URL: https://issues.apache.org/jira/browse/PIG-4870
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4870.patch
>
>
> TestCollectedGroup.testMapsideGroupWithMergeJoin was disabled( PIG-4781).
> When MergeJoin (PIG-4810) is ready,  we can enable the UT case for spark 
> engine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4870) Enable MergeJoin testcase in TestCollectedGroup for spark engine

2016-09-06 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4870:
---
Attachment: PIG-4870.patch

Since the merge join optimization is ready, just enable 
TestCollectedGroup.testMapsideGroupWithMergeJoin in spark mode.

> Enable MergeJoin testcase in TestCollectedGroup for spark engine
> 
>
> Key: PIG-4870
> URL: https://issues.apache.org/jira/browse/PIG-4870
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4870.patch
>
>
> TestCollectedGroup.testMapsideGroupWithMergeJoin was disabled( PIG-4781).
> When MergeJoin (PIG-4810) is ready,  we can enable the UT case for spark 
> engine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-06 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-5024:
---
Description: 
Currently, when optimizing some kinds of JOIN, the indexed or sampling files are 
saved into HDFS; by setting the replication factor to a larger number, they serve 
as a distributed cache.

Spark's broadcast mechanism is suitable for this, so it seems we can add a 
physical operator to broadcast small RDDs.
This will benefit the optimization of some specialized joins, such as Skewed 
Join, Replicated Join and so on.



  was:
Currently, when optimize some kinds of JOIN, the indexed or sampling files are 
saved into HDFS. By setting the replication to a larger number, it serves as 
cache.

It seems that we can add a physical operator to broadcast small RDDs.
This will benefit some specialized Joins, such as Skewed Join, Replicated Join 
and so on. 




> add a physical operator to broadcast small RDDs
> ---
>
> Key: PIG-5024
> URL: https://issues.apache.org/jira/browse/PIG-5024
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> Currently, when optimize some kinds of JOIN, the indexed or sampling files 
> are saved into HDFS. By setting the replication to a larger number, it serves 
> as distributed cache.
> Spark's broadcast mechanism is suitable for this. It seems that we can add a 
> physical operator to broadcast small RDDs.
> This will benefit the optimization of some specialized Joins, such as Skewed 
> Join, Replicated Join and so on. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (PIG-5024) add a physical operator to broadcast small RDDs

2016-09-06 Thread Xianda Ke (JIRA)
Xianda Ke created PIG-5024:
--

 Summary: add a physical operator to broadcast small RDDs
 Key: PIG-5024
 URL: https://issues.apache.org/jira/browse/PIG-5024
 Project: Pig
  Issue Type: Sub-task
Reporter: Xianda Ke
Assignee: Xianda Ke


Currently, when optimize some kinds of JOIN, the indexed or sampling files are 
saved into HDFS. By setting the replication to a larger number, it serves as 
cache.

It seems that we can add a physical operator to broadcast small RDDs.
This will benefit some specialized Joins, such as Skewed Join, Replicated Join 
and so on. 





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (PIG-4970) Remove the deserialize and serialization of JobConf in code for spark mode

2016-08-23 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434176#comment-15434176
 ] 

Xianda Ke edited comment on PIG-4970 at 8/24/16 5:07 AM:
-

+1 (non-binding)
Hi [~xuefuz], please help check in.


was (Author: kexianda):
+1 (non-binding)
[~xuefuz], please help review and commit.

> Remove the deserialize and serialization of JobConf in code for spark mode
> --
>
> Key: PIG-4970
> URL: https://issues.apache.org/jira/browse/PIG-4970
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4970.patch, PIG-4970_2.patch, PIG-4970_3.patch, 
> PIG-4970_4.patch
>
>
> Now we use KryoSerializer to serialize the jobConf in 
> [SparkLauncher|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java#L191].
>  then 
> deserialize it in 
> [ForEachConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java#L83],
>   
> [StreamConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java#L70].
>We deserialize and serialize the jobConf in order to make jobConf 
> available in spark executor thread.
> We can refactor it in following ways:
> 1. Let spark to broadcast the jobConf in 
> [sparkContext.newAPIHadoopRDD|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java#L102].
>  Here not create a new jobConf and load properties from PigContext but 
> directly use jobConf from SparkLauncher.
> 2. get jobConf in 
> [org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark#createRecordReader|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L42]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4970) Remove the deserialize and serialization of JobConf in code for spark mode

2016-08-23 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434176#comment-15434176
 ] 

Xianda Ke commented on PIG-4970:


+1 (non-binding)
[~xuefuz], please help review and commit.

> Remove the deserialize and serialization of JobConf in code for spark mode
> --
>
> Key: PIG-4970
> URL: https://issues.apache.org/jira/browse/PIG-4970
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4970.patch, PIG-4970_2.patch, PIG-4970_3.patch, 
> PIG-4970_4.patch
>
>
> Now we use KryoSerializer to serialize the jobConf in 
> [SparkLauncher|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java#L191].
>  then 
> deserialize it in 
> [ForEachConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java#L83],
>   
> [StreamConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java#L70].
>We deserialize and serialize the jobConf in order to make jobConf 
> available in spark executor thread.
> We can refactor it in following ways:
> 1. Let spark to broadcast the jobConf in 
> [sparkContext.newAPIHadoopRDD|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java#L102].
>  Here not create a new jobConf and load properties from PigContext but 
> directly use jobConf from SparkLauncher.
> 2. get jobConf in 
> [org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark#createRecordReader|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L42]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4970) Remove the deserialize and serialization of JobConf in code for spark mode

2016-08-23 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432389#comment-15432389
 ] 

Xianda Ke commented on PIG-4970:


OK, got it. "pig.cachedbag.type" was never set in mr/tez mode; it seems that 
InternalCachedBag works.

> Remove the deserialize and serialization of JobConf in code for spark mode
> --
>
> Key: PIG-4970
> URL: https://issues.apache.org/jira/browse/PIG-4970
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4970.patch
>
>
> Now we use KryoSerializer to serialize the jobConf in 
> [SparkLauncher|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java#L191].
>  then 
> deserialize it in 
> [ForEachConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java#L83],
>   
> [StreamConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java#L70].
>We deserialize and serialize the jobConf in order to make jobConf 
> available in spark executor thread.
> We can refactor it in following ways:
> 1. Let spark to broadcast the jobConf in 
> [sparkContext.newAPIHadoopRDD|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java#L102].
>  Here not create a new jobConf and load properties from PigContext but 
> directly use jobConf from SparkLauncher.
> 2. get jobConf in 
> [org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark#createRecordReader|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L42]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4970) Remove the deserialize and serialization of JobConf in code for spark mode

2016-08-23 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15432329#comment-15432329
 ] 

Xianda Ke commented on PIG-4970:


Hi [~kellyzly],

Since the jobConf is already broadcast to the worker nodes, it is OK to remove the 
redundant serialization/deserialization.

Open issues:
1. remove the unused import statements in XxxConverter.java
2. it seems that we should set "pig.cachedbag.type" to "default" for the Package & 
JoinGroupSpark operators (see the sketch below)
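
A minimal sketch of what item 2 amounts to (hypothetical placement; the actual 
fix may differ):
{code}
// hypothetical sketch: explicitly pin the bag type while configuring the
// Package / JoinGroupSpark operators in spark mode
jobConf.set("pig.cachedbag.type", "default");
{code}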

> Remove the deserialize and serialization of JobConf in code for spark mode
> --
>
> Key: PIG-4970
> URL: https://issues.apache.org/jira/browse/PIG-4970
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4970.patch
>
>
> Now we use KryoSerializer to serialize the jobConf in 
> [SparkLauncher|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java#L191].
>  then 
> deserialize it in 
> [ForEachConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java#L83],
>   
> [StreamConverter|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java#L70].
>We deserialize and serialize the jobConf in order to make jobConf 
> available in spark executor thread.
> We can refactor it in following ways:
> 1. Let spark to broadcast the jobConf in 
> [sparkContext.newAPIHadoopRDD|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java#L102].
>  Here not create a new jobConf and load properties from PigContext but 
> directly use jobConf from SparkLauncher.
> 2. get jobConf in 
> [org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark#createRecordReader|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L42]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4941) TestRank3#testRankWithSplitInMap hangs after upgrade to spark 1.6.1

2016-07-14 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376570#comment-15376570
 ] 

Xianda Ke commented on PIG-4941:


Also see MAPREDUCE-5896 & SPARK-1767.

LGTM
+1

> TestRank3#testRankWithSplitInMap hangs after upgrade to spark 1.6.1
> ---
>
> Key: PIG-4941
> URL: https://issues.apache.org/jira/browse/PIG-4941
> Project: Pig
>  Issue Type: Sub-task
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4941.patch, rank.jstack
>
>
> After upgrading spark version to 1.6.1, TestRank3#testRankWithSplitInMap 
> hangs and fails due to timeout exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4946) Remove redudant code of bin/pig in spark mode after PIG-4903 check in

2016-07-11 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370295#comment-15370295
 ] 

Xianda Ke commented on PIG-4946:


It is OK to remove the  redundant code.
Verified.  
LGTM. 
+1

> Remove redudant code of bin/pig in spark mode after PIG-4903 check in
> -
>
> Key: PIG-4946
> URL: https://issues.apache.org/jira/browse/PIG-4946
> Project: Pig
>  Issue Type: Bug
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4946.patch
>
>
> After PIG-4903 checkin, some redudant code of bin/pig in spark branch is 
> generated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (PIG-4944) Reset UDFContext#jobConf in spark mode

2016-07-05 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15363799#comment-15363799
 ] 

Xianda Ke edited comment on PIG-4944 at 7/6/16 5:48 AM:


In MR mode, LocalJobRunner creates a new thread for each UT case, and the 
thread-local jobConf is set in that newly created thread.
In spark mode (local mode), all the UT cases share the same main thread; the 
UDFContext's JobConf is reset in PigInputFormat.passLoadSignature().

LGTM. The end of SparkLauncher.launchPig() is the right place to clean the 
JobConf.
+1

[~xuefuz], please help review and commit:

{code}
patch -p1 < PIG-4944.patch
{code}
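
For reference, the reset itself is the one-liner quoted in the issue 
description; this is a sketch of its placement, not the exact patch:
{code}
// at the very end of SparkLauncher.launchPig(): spark local mode runs every
// UT case on the same main thread, so the thread-local JobConf must be
// cleared explicitly between tests
UDFContext.getUDFContext().addJobConf(null);
{code}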


was (Author: kexianda):
In MR mode.  LocalJobRunner create a new thread for different UT cases. 
Thread_local jobConf is set in this new created thread.
In spark mode(Local mode), all the ut cases share the same main thread.  
UDFContext's JobConf is reset in PigInputFormat.passLoadSignature()

LGTM.  It is the right place to clean JobConf in end of 
SparkLauncher.launchPig()
+1


> Reset UDFContext#jobConf in spark mode
> --
>
> Key: PIG-4944
> URL: https://issues.apache.org/jira/browse/PIG-4944
> Project: Pig
>  Issue Type: Bug
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4944.patch, TestEvalPipelineLocal.mr, 
> TestEvalPipelineLocal.spark
>
>
> Community gave some comments about TestEvalPipelineLocal unit test:
> https://reviews.apache.org/r/45667/#comment199056
> We can reset "UDFContext.getUDFContext().addJobConf(null)" in other place not 
>  in TestEvalPipelineLocal#testSetLocationCalledInFE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (PIG-4944) Reset UDFContext#jobConf in spark mode

2016-07-05 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15363799#comment-15363799
 ] 

Xianda Ke edited comment on PIG-4944 at 7/6/16 5:27 AM:


In MR mode, LocalJobRunner creates a new thread for each UT case, and the 
thread-local jobConf is set in that newly created thread.
In spark mode (local mode), all the UT cases share the same main thread; the 
UDFContext's JobConf is reset in PigInputFormat.passLoadSignature().

LGTM. The end of SparkLauncher.launchPig() is the right place to clean the 
JobConf.
+1



was (Author: kexianda):
In MR mode.  LocalJobRunner create a new thread for different UT cases. 
Thread_local jobConf is set in this new created thread.
In spark mode(Local mode), all the ut cases share the same main thread.  

LGTM.  It is the right place to clean JobConf in end of 
SparkLauncher.launchPig()
+1


> Reset UDFContext#jobConf in spark mode
> --
>
> Key: PIG-4944
> URL: https://issues.apache.org/jira/browse/PIG-4944
> Project: Pig
>  Issue Type: Bug
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4944.patch, TestEvalPipelineLocal.mr, 
> TestEvalPipelineLocal.spark
>
>
> Community gave some comments about TestEvalPipelineLocal unit test:
> https://reviews.apache.org/r/45667/#comment199056
> We can reset "UDFContext.getUDFContext().addJobConf(null)" in other place not 
>  in TestEvalPipelineLocal#testSetLocationCalledInFE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4944) Reset UDFContext#jobConf in spark mode

2016-07-05 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15363799#comment-15363799
 ] 

Xianda Ke commented on PIG-4944:


In MR mode, LocalJobRunner creates a new thread for each UT case, and the 
thread-local jobConf is set in that newly created thread.
In spark mode (local mode), all the UT cases share the same main thread.

LGTM. The end of SparkLauncher.launchPig() is the right place to clean the 
JobConf.
+1


> Reset UDFContext#jobConf in spark mode
> --
>
> Key: PIG-4944
> URL: https://issues.apache.org/jira/browse/PIG-4944
> Project: Pig
>  Issue Type: Bug
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4944.patch, TestEvalPipelineLocal.mr, 
> TestEvalPipelineLocal.spark
>
>
> Community gave some comments about TestEvalPipelineLocal unit test:
> https://reviews.apache.org/r/45667/#comment199056
> We can reset "UDFContext.getUDFContext().addJobConf(null)" in other place not 
>  in TestEvalPipelineLocal#testSetLocationCalledInFE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4281) Fix TestFinish for Spark engine

2016-07-04 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361902#comment-15361902
 ] 

Xianda Ke commented on PIG-4281:


[~kellyzly] Ok, got it. Thanks for your explanation.

LGTM
+1

> Fix TestFinish for Spark engine
> ---
>
> Key: PIG-4281
> URL: https://issues.apache.org/jira/browse/PIG-4281
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4281.patch, PIG-4281_2.patch, PIG-4281_3.patch, 
> TEST-org.apache.pig.test.TestFinish.txt
>
>
> error log is attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4281) Fix TestFinish for Spark engine

2016-07-04 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361055#comment-15361055
 ] 

Xianda Ke commented on PIG-4281:


Hi [~kellyzly],

I agree with the community's comments that a user UDF should not have access to 
the cluster object.

But I suspect it won't work if we just set PigMapReduce.sJobConfInternal in 
SparkLauncher (the driver program). When PigMapReduce is instantiated in the 
backend JVMs, I guess sJobConfInternal is still null in spark mode.

Why does PigMapReduce.sJobConfInternal work in MR mode? I guess the reason is 
that sJobConfInternal is reset in the reducer/mapper:
{code}
// PigGenericMapReduce.java Line 318
sJobConfInternal.set(context.getConfiguration());
{code}

To make PigMapReduce.sJobConfInternal work, it seems we have to pass the 
serialized conf bytes to the functions that will be passed to Spark's RDDs.
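
A minimal sketch of that idea (ConfCarryingFunction and deserializeJobConf are 
hypothetical names; the actual serializer may differ):
{code}
import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;

public abstract class ConfCarryingFunction implements Serializable {
    // serialized on the driver, shipped to the executors inside the closure
    private final byte[] confBytes;

    protected ConfCarryingFunction(byte[] confBytes) {
        this.confBytes = confBytes;
    }

    // called before the first record is processed in the executor JVM,
    // where sJobConfInternal starts out null in spark mode
    protected void ensureJobConfLoaded() {
        if (PigMapReduce.sJobConfInternal.get() == null) {
            PigMapReduce.sJobConfInternal.set(deserializeJobConf(confBytes));
        }
    }

    // hypothetical helper that reverses whatever serialization the driver used
    protected abstract Configuration deserializeJobConf(byte[] bytes);
}
{code}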





> Fix TestFinish for Spark engine
> ---
>
> Key: PIG-4281
> URL: https://issues.apache.org/jira/browse/PIG-4281
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4281.patch, PIG-4281_2.patch, PIG-4281_3.patch, 
> TEST-org.apache.pig.test.TestFinish.txt
>
>
> error log is attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4936) Fix NPE exception in TestCustomPartitioner#testCustomPartitionerParseJoins

2016-06-30 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356661#comment-15356661
 ] 

Xianda Ke commented on PIG-4936:


LGTM
+1
[~xuefuz], please help review and commit this patch.

> Fix NPE exception in TestCustomPartitioner#testCustomPartitionerParseJoins
> --
>
> Key: PIG-4936
> URL: https://issues.apache.org/jira/browse/PIG-4936
> Project: Pig
>  Issue Type: Sub-task
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4936.patch
>
>
> After PIG-4919(upgrade spark to 1.6.1 version), 
> TestCustomPartitioner#TestCustomPartitioner fails.
> {code}
> Testcase: testCustomPartitionerParseJoins took 1.66 sec
> Caused an ERROR
> Unable to open iterator for alias hash. Backend error : Job aborted due to 
> stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost 
> task 0.0 in stage 2.0 (TID 3, localhost): java.lang.RuntimeException: 
> java.lang.reflect.InvocationTargetException
> at 
> org.apache.pig.backend.hadoop.executionengine.spark.MapReducePartitionerWrapper.getPartition(MapReducePartitionerWrapper.java:118)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.apache.pig.backend.hadoop.executionengine.spark.MapReducePartitionerWrapper.getPartition(MapReducePartitionerWrapper.java:110)
> ... 8 more 
> Caused by: java.lang.NullPointerException
> at 
> org.apache.pig.impl.io.PigNullableWritable.hashCode(PigNullableWritable.java:185)
> at 
> org.apache.pig.test.utils.SimpleCustomPartitioner.getPartition(SimpleCustomPartitioner.java:33)
> ... 13 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (PIG-4936) Fix NPE exception in TestCustomPartitioner#testCustomPartitionerParseJoins

2016-06-27 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350654#comment-15350654
 ] 

Xianda Ke edited comment on PIG-4936 at 6/27/16 2:57 PM:
-

hi [~kellyzly],

I'm just curious about which change caused the failure.
Is it caused by the change in Partitioner.scala?
{code}
git diff v1.6.1 v1.4.1 ./core/src/main/scala/org/apache/spark/Partitioner.scala
{code}



was (Author: kexianda):
hi [~kellyzly],
code seems OK.
I'm just curious about what change cause failure.   
is it cause by the change in Partitioner.scala?
{code}
git diff v1.6.1 v1.4.1 ./core/src/main/scala/org/apache/spark/Partitioner.scala
{code}


> Fix NPE exception in TestCustomPartitioner#testCustomPartitionerParseJoins
> --
>
> Key: PIG-4936
> URL: https://issues.apache.org/jira/browse/PIG-4936
> Project: Pig
>  Issue Type: Bug
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4936.patch
>
>
> After PIG-4919(upgrade spark to 1.6.1 version), 
> TestCustomPartitioner#TestCustomPartitioner fails.
> {code}
> Testcase: testCustomPartitionerParseJoins took 1.66 sec
> Caused an ERROR
> Unable to open iterator for alias hash. Backend error : Job aborted due to 
> stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost 
> task 0.0 in stage 2.0 (TID 3, localhost): java.lang.RuntimeException: 
> java.lang.reflect.InvocationTargetException
> at 
> org.apache.pig.backend.hadoop.executionengine.spark.MapReducePartitionerWrapper.getPartition(MapReducePartitionerWrapper.java:118)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.apache.pig.backend.hadoop.executionengine.spark.MapReducePartitionerWrapper.getPartition(MapReducePartitionerWrapper.java:110)
> ... 8 more 
> Caused by: java.lang.NullPointerException
> at 
> org.apache.pig.impl.io.PigNullableWritable.hashCode(PigNullableWritable.java:185)
> at 
> org.apache.pig.test.utils.SimpleCustomPartitioner.getPartition(SimpleCustomPartitioner.java:33)
> ... 13 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4936) Fix NPE exception in TestCustomPartitioner#testCustomPartitionerParseJoins

2016-06-27 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350654#comment-15350654
 ] 

Xianda Ke commented on PIG-4936:


hi [~kellyzly],
The code seems OK.
I'm just curious about which change caused the failure.
Is it caused by the change in Partitioner.scala?
{code}
git diff v1.6.1 v1.4.1 ./core/src/main/scala/org/apache/spark/Partitioner.scala
{code}


> Fix NPE exception in TestCustomPartitioner#testCustomPartitionerParseJoins
> --
>
> Key: PIG-4936
> URL: https://issues.apache.org/jira/browse/PIG-4936
> Project: Pig
>  Issue Type: Bug
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4936.patch
>
>
> After PIG-4919(upgrade spark to 1.6.1 version), 
> TestCustomPartitioner#TestCustomPartitioner fails.
> {code}
> Testcase: testCustomPartitionerParseJoins took 1.66 sec
> Caused an ERROR
> Unable to open iterator for alias hash. Backend error : Job aborted due to 
> stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost 
> task 0.0 in stage 2.0 (TID 3, localhost): java.lang.RuntimeException: 
> java.lang.reflect.InvocationTargetException
> at 
> org.apache.pig.backend.hadoop.executionengine.spark.MapReducePartitionerWrapper.getPartition(MapReducePartitionerWrapper.java:118)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.apache.pig.backend.hadoop.executionengine.spark.MapReducePartitionerWrapper.getPartition(MapReducePartitionerWrapper.java:110)
> ... 8 more 
> Caused by: java.lang.NullPointerException
> at 
> org.apache.pig.impl.io.PigNullableWritable.hashCode(PigNullableWritable.java:185)
> at 
> org.apache.pig.test.utils.SimpleCustomPartitioner.getPartition(SimpleCustomPartitioner.java:33)
> ... 13 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4927) Support stop.on.failure in spark mode

2016-06-20 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15340980#comment-15340980
 ] 

Xianda Ke commented on PIG-4927:


[~kellyzly], see comments in RB.

> Support stop.on.failure in spark mode
> -
>
> Key: PIG-4927
> URL: https://issues.apache.org/jira/browse/PIG-4927
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Attachments: PIG-4927.patch
>
>
> After merge with trunk, following unit test fails because we don't support 
> stop.on.failure property now.
> TestGrunt#testStopOnFailure



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4871) Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark

2016-06-16 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15335444#comment-15335444
 ] 

Xianda Ke commented on PIG-4871:


LGTM. See the discussion in [PIG-4594 | 
https://issues.apache.org/jira/browse/PIG-4594?focusedCommentId=14641986=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14641986]

+1 (non-binding)

>  Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark
> -
>
> Key: PIG-4871
> URL: https://issues.apache.org/jira/browse/PIG-4871
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4871_2.patch
>
>
> In current code base, we use OperatorPlan#forceConnect() while merge the 
> physical plan of spliter and splittee in MultiQueryOptimizationSpark.
> The difference between OperatorPlan#connect and OperatorPlan#forceConnect is 
> not checking whether support multiOutputs and multiInputs or not in 
> forceConnect.
> {code}
>  /**
>  * connect from and to and ignore some judgements: like ignoring judge 
> whether from operator supports multiOutputs
>  * and whether to operator supports multiInputs
>  *
>  * @param from Operator data will flow from.
>  * @param to   Operator data will flow to.
>  * @throws PlanException if connect from or to which is not in the plan
>  */
> public void forceConnect(E from, E to) throws PlanException {
> markDirty();
> // Check that both nodes are in the plan.
> checkInPlan(from);
> checkInPlan(to);
> mFromEdges.put(from, to);
> mToEdges.put(to, from);
> }
> {code}
> Let's use an example to explain why add forceConnect before.
> {code}
> A = load './split5'  AS (a0:int, a1:int, a2:int);
> B = foreach A generate a0, a1;
> C = join A by a0, B by a0;
> D = filter C by A::a1>=B::a1;
> store D into './split5.out';
> {code}
> before multiquery optimization
> {code}
> scope-37->scope-43 
> scope-43
> #--
> # Spark Plan  
> #--
> Spark node scope-37
> Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage)
>  - scope-38
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
> |   |
> |   Cast[int] - scope-2
> |   |
> |   |---Project[bytearray][0] - scope-1
> |   |
> |   Cast[int] - scope-5
> |   |
> |   |---Project[bytearray][1] - scope-4
> |   |
> |   Cast[int] - scope-8
> |   |
> |   |---Project[bytearray][2] - scope-7
> |
> |---A: 
> Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage)
>  - scope-0
> Spark node scope-43
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
> |   |
> |   Greater Than or Equal[boolean] - scope-35
> |   |
> |   |---Project[int][1] - scope-33
> |   |
> |   |---Project[int][4] - scope-34
> |
> |---C: New For Each(true,true)[tuple] - scope-31
> |   |
> |   Project[bag][1] - scope-29
> |   |
> |   Project[bag][2] - scope-30
> |
> |---C: Package(Packager)[tuple]{int} - scope-24
> |
> |---C: Global Rearrange[tuple] - scope-23
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-25
> |   |   |
> |   |   Project[int][0] - scope-26
> |   |
> |   
> |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage)
>  - scope-39
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-27
> |   |
> |   Project[int][0] - scope-28
> |
> |---B: New For Each(false,false)[bag] - scope-20
> |   |
> |   Project[int][0] - scope-16
> |   |
> |   Project[int][1] - scope-18
> |
> 
> |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage)
>  - scope-41{code}
> after multiquery optimization
> {code}
> after multiquery optimization:
> scope-37
> #--
> # Spark Plan  
> #--
> Spark node scope-37
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: 

[jira] [Updated] (PIG-4810) Implement Merge join for spark engine

2016-06-16 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4810:
---
Attachment: PIG-4810-7.patch

PIG-4810-7.patch is slightly updated.
Uploaded to [RB | https://reviews.apache.org/r/44771/diff/6-7/].

[~kellyzly], please help review.

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810-6.patch, PIG-4810-7.patch, PIG-4810.patch
>
>
> In current code base(a9151ac), we use regular join to implement merge join in 
> spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4810) Implement Merge join for spark engine

2016-06-16 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4810:
---
Attachment: (was: PIG-4810-7.patch)

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810-6.patch, PIG-4810.patch
>
>
> In current code base(a9151ac), we use regular join to implement merge join in 
> spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4929) del

2016-06-16 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4929:
---
Description: (was: Currently, the client code to construct a CryptoRandom or CryptoCipher looks like this:
{code}
// code snippet (a)
Properties props = new Properties();
props.setProperty(
 ConfigurationKeys.COMMONS_CRYPTO_SECURE_RANDOM_CLASSES_KEY,
OpensslCryptoRandom.class.getName());
CryptoRandom random = CryptoRandomFactory.getCryptoRandom(props);
{code}
or, using a configuration file, it looks like this:

{code}
# config file
secure.random.classes="org.apache.commons.crypto.random.OpensslCryptoRandom"
{code}
{code}
// code snippet (b)
{
Properties props = loadMyApplicationConfig();
// ...
}
 
{
// business logic ...
CryptoRandom random = CryptoRandomFactory.getCryptoRandom(props);
// ...
CryptoCipher cipher = CryptoCipherFactory.getInstance(transform, props);
}
{code}
Disadvantages:
1. If the client just wants to use the OpenSSL engine, code snippet (a) is full of trivial boilerplate. It looks annoying.
2. The client has to use long config key strings such as "COMMONS_CRYPTO_SECURE_RANDOM_CLASSES_KEY" or fully qualified class names, and has to read the source to learn how to configure the properties.
3. The implementation classes such as OpensslCryptoRandom, OsCryptoRandom and JavaCryptoRandom are public, so it would be hard to change the library implementation in the future.

If we just *use an enum (RandomProvider or CryptCipherProvider)*:
{code}
// code snippet (c)
// client code looks simple and elegant now:
//RandomProvider.OS or RandomProvider.JAVA
CryptoRandom random = 
CryptoRandomFactory.getCryptoRandom(RandomProvider.OPENSSL);
{code}
The client can still use a configuration file:
{code}
# config file
RandomProvider="OPENSSL"
CryptCipherProvider="JCE"
{code}
{code}
// code snippet (d)
{
Properties props = loadMyApplicationConfig();
RandomProvider randProvider = RandomProvider.valueOf(props.getProperty(p1));
CryptCipherProvider cipherProvider = CryptCipherProvider.valueOf(props.getProperty(p2));
}
{
// business logic ...
CryptoRandom random = CryptoRandomFactory.getCryptoRandom(randProvider);
// ...
CryptoCipher cipher = CryptoCipherFactory.getInstance(transform, cipherProvider);
}
{code}
Advantages:
1. Simpler API: snippet (c) is simpler than snippet (a).
2. A modern IDE will hint that CryptoRandomFactory.getCryptoRandom() needs an enum type (RandomProvider); the client does NOT have to search for long key strings such as "COMMONS_CRYPTO_SECURE_RANDOM_CLASSES_KEY". The IDE will show the client how to configure it.
3. We don't have to expose the implementation classes as public.)
Summary: del  (was: improve factory api for constructing CryptoRandom & 
CryptoCipher)

> del
> ---
>
> Key: PIG-4929
> URL: https://issues.apache.org/jira/browse/PIG-4929
> Project: Pig
>  Issue Type: Bug
>Reporter: Xianda Ke
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (PIG-4929) improve factory api for constructing CryptoRandom & CryptoCipher

2016-06-16 Thread Xianda Ke (JIRA)
Xianda Ke created PIG-4929:
--

 Summary: improve factory api for constructing CryptoRandom & 
CryptoCipher
 Key: PIG-4929
 URL: https://issues.apache.org/jira/browse/PIG-4929
 Project: Pig
  Issue Type: Bug
Reporter: Xianda Ke


Currently, the client code to construct a CryptoRandom or CryptoCipher looks like this:
{code}
// code snippet (a)
Properties props = new Properties();
props.setProperty(
 ConfigurationKeys.COMMONS_CRYPTO_SECURE_RANDOM_CLASSES_KEY,
OpensslCryptoRandom.class.getName());
CryptoRandom random = CryptoRandomFactory.getCryptoRandom(props);
{code}
or, using a configuration file, it looks like this:

{code}
# config file
secure.random.classes="org.apache.commons.crypto.random.OpensslCryptoRandom"
{code}
{code}
// code snippet (b)
{
Properties props = loadMyApplicationConfig();
// ...
}
 
{
// business logic ...
CryptoRandom random = CryptoRandomFactory.getCryptoRandom(props);
// ...
CryptoCipher cipher = CryptoCipherFactory.getInstance(transform, props);
}
{code}
Disadvantages:
1. If the client just wants to use the OpenSSL engine, code snippet (a) is full of trivial boilerplate. It looks annoying.
2. The client has to use long config key strings such as "COMMONS_CRYPTO_SECURE_RANDOM_CLASSES_KEY" or fully qualified class names, and has to read the source to learn how to configure the properties.
3. The implementation classes such as OpensslCryptoRandom, OsCryptoRandom and JavaCryptoRandom are public, so it would be hard to change the library implementation in the future.

If we just *use an enum (RandomProvider or CryptCipherProvider)*:
{code}
// code snippet (c)
// client code looks simple and elegant now:
//RandomProvider.OS or RandomProvider.JAVA
CryptoRandom random = 
CryptoRandomFactory.getCryptoRandom(RandomProvider.OPENSSL);
{code}
The client can still use a configuration file:
{code}
# config file
RandomProvider="OPENSSL"
CryptCipherProvider="JCE"
{code}
{code}
// code snippet (d)
{
Properties props = loadMyApplicationConfig();
RandomProvider randProvider = RandomProvider.valueOf(props.getProperty(p1));
CryptCipherProvider cipherProvider = CryptCipherProvider.valueOf(props.getProperty(p2));
}
{
// business logic ...
CryptoRandom random = CryptoRandomFactory.getCryptoRandom(randProvider);
// ...
CryptoCipher cipher = CryptoCipherFactory.getInstance(transform, cipherProvider);
}
{code}
Advantages:
1. Simpler API: snippet (c) is simpler than snippet (a).
2. A modern IDE will hint that CryptoRandomFactory.getCryptoRandom() needs an enum type (RandomProvider); the client does NOT have to search for long key strings such as "COMMONS_CRYPTO_SECURE_RANDOM_CLASSES_KEY". The IDE will show the client how to configure it.
3. We don't have to expose the implementation classes as public.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4810) Implement Merge join for spark engine

2016-06-15 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4810:
---
Attachment: PIG-4810-7.patch

Thanks [~kellyzly]. The updated patch PIG-4810-7.patch is attached; the issues are fixed.
Review Board: https://reviews.apache.org/r/44771/diff/5-6/

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810-6.patch, PIG-4810-7.patch, PIG-4810.patch
>
>
> In current code base(a9151ac), we use regular join to implement merge join in 
> spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4810) Implement Merge join for spark engine

2016-06-14 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4810:
---
Attachment: PIG-4810-6.patch

PIG-4810-6.patch is uploaded. Please help review.

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810-6.patch, PIG-4810.patch
>
>
> In current code base(a9151ac), we use regular join to implement merge join in 
> spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4810) Implement Merge join for spark engine

2016-06-13 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328798#comment-15328798
 ] 

Xianda Ke commented on PIG-4810:


Hi [~kellyzly], Thanks for your comments. 
1. setReplication() makes sense. Thanks.
2. MergeJoin requires sorted data as input; without it, the merge-join optimization would fail the UT. That's why the ORDER query is added.
3. I will fix the indentation issue.

I will update the patch soon.

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810.patch
>
>
> In current code base(a9151ac), we use regular join to implement merge join in 
> spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (PIG-4857) Last record is missing in STREAM operator

2016-05-31 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke resolved PIG-4857.

Resolution: Fixed

Already fixed in PIG-4876.

> Last record is missing in STREAM operator
> -
>
> Key: PIG-4857
> URL: https://issues.apache.org/jira/browse/PIG-4857
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4857.patch
>
>
> This bug is similar to PIG-4842.
> Scenario:
> {code}
> cat input.txt
> 1
> 1
> 2
> {code}
> Pig script:
> {code}
> REGISTER myudfs.jar;
> A = LOAD 'input.txt' USING myudfs.DummyCollectableLoader() AS (id); 
> B = GROUP A by $0 USING 'collected';-- (1, {(1),(1)}), (2,{(2)})
> C = STREAM B THROUGH ` awk '{
>  print $0;
> }'`;
> DUMP C;
> {code}
> Expected Result:
> {code}
> (1,{(1),(1)})
> (2,{(2)})
> {code}
> Actual Result:
> {code}
> (1,{(1),(1)})
> {code}
> The last record is missing...
> Root Cause:
> When the flag endOfAllInput is set to true by the predecessor, the 
> predecessor still buffers the last record, which is the input of Stream. 
> POStream then finds endOfAllInput is true when, in fact, the last input has 
> not been consumed yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-28 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263473#comment-15263473
 ] 

Xianda Ke commented on PIG-4876:


I prefer Option (b).

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-28 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263459#comment-15263459
 ] 

Xianda Ke commented on PIG-4876:


*The fragility & complexity of sharing a flag between operators*:

Hi [~mohitsabharwal], the flag may be reset by input.hasNext() or input.next() 
if any preceding operator reaches its end.
{code:title=OutputConsumerIterator.java}
if (result == null) {
    // It does not work if beginOfInput() is called here: input.hasNext() or
    // input.next() may reset the flag.
    // beginOfInput();
    if (!input.hasNext()) {
        done = true;
        return;
    }
    Tuple v1 = input.next();
    // It seems OK to insert the call here (?); this needs to be tested carefully.
    beginOfInput();
    attach(v1);
}

if (!input.hasNext()) {
    endOfInput();
    // Another issue: in MR mode, the flag was set after the last input tuple
    // was consumed by getNextResult(); here, the flag is set before the last
    // input is consumed in getNextResult().
    // That doesn't matter for MergeCogroup or CollectedGroup (those operators
    // work), but it is a problem for the current implementation of POMergeJoin,
    // which is tightly coupled with the MR runPipeline.
    // Even if beginOfInput() is called when attaching, we still have to add
    // some dirty code in getNextResult() for MergeJoin. It seems that
    // beginOfInput() is not good enough to solve all the problems.
    //
    // Nor is it easy to simply move endOfInput() after getNextResult() in the
    // POStatus.STATUS_EOP case: I have struggled with regression bugs for days.
}

result = getNextResult();

{code}

Summary:
Option (a): don't touch non-spark code; just reset the flag. From my point of view, it is only a workaround, and it seems there is no need to add the abstract method beginOfInput(). Well, I will give it a try and do some testing for beginOfInput().

Option (b): add a flag to each operator in spark mode. These flags work only in spark mode and should not affect the logic in MR mode (see the sketch below).
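
For illustration, a minimal sketch of what option (b) could look like, assuming a hypothetical spark-only flag added to PhysicalOperator; the field and method names are illustrative, not the actual patch:
{code}
// Hedged sketch of option (b): each operator tracks the end of its OWN input
// instead of testing the shared, plan-wide parentPlan.endOfAllInput flag.
// The field and accessor names below are hypothetical.
public abstract class PhysicalOperator {
    // Spark-only flag; MR mode keeps using parentPlan.endOfAllInput.
    private transient boolean endOfInput = false;

    public boolean isEndOfInput() {
        return endOfInput;
    }

    public void setEndOfInput(boolean endOfInput) {
        this.endOfInput = endOfInput;
    }
}
{code}
The Spark iterator would then set only the wrapped operator's own flag when that operator's input iterator is exhausted, so a predecessor reaching its end cannot leak into its successors.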

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4810) Implement Merge join for spark engine

2016-04-26 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4810:
---
Attachment: PIG-4810-5.patch

Change log: fixed the UT script; it now passes in spark mode.

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810-5.patch, PIG-4810.patch
>
>
> In current code base(a9151ac), we use regular join to implement merge join in 
> spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4810) Implement Merge join for spark engine

2016-04-25 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4810:
---
Attachment: PIG-4810-4.patch

Refined the solution for flushing the last inputs.

> Implement Merge join for spark engine
> -
>
> Key: PIG-4810
> URL: https://issues.apache.org/jira/browse/PIG-4810
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4810-2.patch, PIG-4810-3.patch, PIG-4810-4.patch, 
> PIG-4810.patch
>
>
> In current code base(a9151ac), we use regular join to implement merge join in 
> spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-24 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255821#comment-15255821
 ] 

Xianda Ke commented on PIG-4876:


I think your proposal is similar to the solution in PIG-4842 
([PIG-4842-2.patch|https://issues.apache.org/jira/secure/attachment/12795373/PIG-4842-2.patch])
 and PIG-4857 
([PIG-4857.patch|https://issues.apache.org/jira/secure/attachment/12795997/PIG-4857.patch]),
 which just resets the flag and does not touch non-spark code.
It should work if we refactor the code to implement an abstract method (like 
beginOfInput()); a rough sketch follows below.

Yes, it minimizes non-spark code changes, but it seems a bit tricky.
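
For illustration only, a hedged sketch of that reset-the-flag refactoring, using beginOfInput() as the hypothetical hook name from this discussion:
{code}
// Hedged sketch, names illustrative: a hook invoked right after a fresh input
// tuple is attached. It clears the endOfAllInput flag that an upstream
// operator in the same plan may already have set, so this operator does not
// flush its buffered tuples prematurely.
@Override
protected void beginOfInput() {
    parentPlan.endOfAllInput = false;  // the shared, plan-wide flag
}
{code}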



> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-24 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255800#comment-15255800
 ] 

Xianda Ke commented on PIG-4876:


Hi [~mohitsabharwal], yes, your summary is correct.

The test case in PIG-4842 failed because there are two CollectedGroup operators 
which share the same endOfAllInput flag.

TestCollectedGroup does NOT fail (all of its test cases pass) because it 
contains no test case like the one in PIG-4842.

It seems that adding some test cases for spark mode would be better.

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-18 Thread Xianda Ke (JIRA)

 [ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianda Ke updated PIG-4876:
---
Attachment: PIG-4876.patch

Hi [~mohitsabharwal], PIG-4876.patch is attached. Please help review. Thanks.

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-18 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247024#comment-15247024
 ] 

Xianda Ke commented on PIG-4876:


The expected result of D:
{code}
((1,1),{((1,1),{(1,1)},{(1,1)})})
((1,2),{((1,2),{(1,2)},{(1,2)})})
((1,3),{((1,3),{(1,3)},{(1,3)})})
((2,1),{((2,1),{(2,1)},{(2,1)})})
((2,2),{((2,2),{(2,2)},{(2,2)})})
((2,3),{((2,3),{(2,3)},{(2,3)})})
((3,1),{((3,1),{(3,1)},{(3,1)})})
((3,2),{((3,2),{(3,2)},{(3,2)})})
((3,3),{((3,3),{(3,3)},{(3,3)})})
{code}

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-18 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247008#comment-15247008
 ] 

Xianda Ke commented on PIG-4876:


Here is a test case for this JIRA:

cat input.txt
{code}
1   1
1   2
1   3
2   1
2   2
2   3
3   1
3   2
3   3
{code}

test.pig
{code}
register myudfs.jar;
A = load 'input.txt' using myudfs.DummyCollectableLoader() as (c1:chararray, 
c2:chararray);
B = load 'input.txt' using myudfs.DummyIndexableLoader() as (c1:chararray, 
c2:chararray);
C = cogroup A by (c1,c2), B by (c1, c2) using 'merge';
D = group C by $0 using 'collected';
dump D;
E = stream C through ` awk '{ print $0 }'`;
dump E;
{code}

The expected results should be:
dump D;
{code}
((1,1),{(1,1)},{(1,1)})
((1,2),{(1,2)},{(1,2)})
((1,3),{(1,3)},{(1,3)})
((2,1),{(2,1)},{(2,1)})
((2,2),{(2,2)},{(2,2)})
((2,3),{(2,3)},{(2,3)})
((3,1),{(3,1)},{(3,1)})
((3,2),{(3,2)},{(3,2)})
((3,3),{(3,3)},{(3,3)})
{code}
dump E;
{code}
((1,1),{(1,1)},{(1,1)})
((1,2),{(1,2)},{(1,2)})
((1,3),{(1,3)},{(1,3)})
((2,1),{(2,1)},{(2,1)})
((2,2),{(2,2)},{(2,2)})
((2,3),{(2,3)},{(2,3)})
((3,1),{(3,1)},{(3,1)})
((3,2),{(3,2)},{(3,2)})
((3,3),{(3,3)},{(3,3)})
{code}

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-18 Thread Xianda Ke (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246982#comment-15246982
 ] 

Xianda Ke commented on PIG-4876:


Let me explain in detail:
h6. 1. The differences between the MR and Spark engines
The movement of tuples through the execution pipeline follows a pull (iterator) 
model. When an operator is asked to produce a tuple, it returns a tuple if one 
is ready, or returns a pause signal (Result.returnStatus) to indicate that it 
is not finished but is also not able to produce an output tuple at this time.

In MR mode, an operator pulls data from its predecessor by directly calling 
PhysicalOperator.getNextTuple(). The signals (Result.returnStatus) propagate 
down the chain of operators and are handled in PigGenericMapBase.runPipeline().

In Spark mode, OutputConsumeIterator has to implement the iterator interface to 
support the Spark engine; it is like an adapter over a PhysicalOperator. Like 
PigGenericMapBase.runPipeline(), OutputConsumeIterator has to handle these 
signals (Result.returnStatus). It does not directly call the predecessor's 
getNextTuple() to get a tuple; instead, it uses the predecessor's iterator to 
get the tuple and then attaches that tuple for processing. 
OutputConsumeIterator also buffers a result tuple when hasNext() is called; a 
minimal sketch of this adapter idea follows below.
h6. 2. Why does endOfInput() of OutputConsumeIterator NOT work currently?
The flag endOfAllInput is placed in PhysicalPlan and is shared by all the 
operators in the same plan. In MR mode, Pig runs the pipeline one more time at 
the end. Currently, the implementations of operators such as Stream and 
CollectedGroup use the returnStatus of processInput() together with the shared 
endOfAllInput flag as the condition for flushing their buffered tuples. These 
implementations are tightly coupled with this mechanism, and it works in MR 
mode.
However, in Spark mode, the shared endOfAllInput flag may be set to true by a 
predecessor operator while the current operator still has input records. At 
that point, OutputConsumeIterator may find that the current operator's output 
is not finished and try to call the operator's getNextTuple() again, but 
getNextTuple() thinks it has already reached the end of its input. Oops...

h6. 3. How to solve it?
When a predecessor operator reaches the end of its input, its successors have 
not necessarily reached the end of theirs, because the predecessor still 
buffers some output tuples. Semantically, it is therefore appropriate to place 
a flag in each operator.
We don't reuse the shared flag endOfAllInput, which works only in MR mode; 
reusing it in Spark mode would make handling the returnStatus signals in 
OutputConsumeIterator very tricky and complex. Instead, we add a flag to the 
affected operators that indicates the end of their own input. This makes life 
easier and the code more intuitive; the sketch below illustrates the resulting 
flush pattern.
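
For illustration, a hedged sketch of that flush pattern; buffer(), hasBuffered(), flushBuffered() and isEndOfInput() are hypothetical helper names, not the actual patch:
{code}
// Hedged sketch: a buffering operator flushes once ITS OWN input ends, instead
// of testing the plan-wide endOfAllInput flag. Helper names are hypothetical;
// processInput() and POStatus are the usual Pig pipeline signals.
public Result getNextTuple() throws ExecException {
    Result inp = processInput();
    while (inp.returnStatus == POStatus.STATUS_OK) {
        buffer((Tuple) inp.result);   // accumulate records for the pending result
        inp = processInput();
    }
    // Per-operator flag, set by the Spark iterator only when THIS operator's
    // input iterator is exhausted (never by a predecessor).
    if (isEndOfInput() && hasBuffered()) {
        return flushBuffered();       // constitute the last buffered tuple(s)
    }
    return inp;                       // EOP or error propagates unchanged
}
{code}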

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> --
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Xianda Ke
>Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators

2016-04-18 Thread Xianda Ke (JIRA)
Xianda Ke created PIG-4876:
--

 Summary: OutputConsumeIterator can't handle the last buffered 
tuples for some Operators
 Key: PIG-4876
 URL: https://issues.apache.org/jira/browse/PIG-4876
 Project: Pig
  Issue Type: Sub-task
Reporter: Xianda Ke
Assignee: Xianda Ke


Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
input records to constitute the result tuples. The last result tuples are 
buffered in the operator.  These Operators need a flag to indicate the end of 
input, so that they can flush and constitute their last tuples.
Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
buffered tuples in MR mode.  But it does not work with OutputConsumeIterator in 
Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

