[jira] [Updated] (PIG-5051) Initialize PigConstants.TASK_INDEX in spark mode correctly

2016-11-03 Thread Xuefu Zhang (JIRA)

 [ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuefu Zhang updated PIG-5051:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Committed to spark branch. Thanks, Liyun!

> Initialize PigConstants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
>
>
> In MR mode, we initialize PigConstants.TASK_INDEX in
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
> {code}
> protected void setup(Context context) throws IOException, InterruptedException {
>     ...
>     // expose the task id to UDFs through the job configuration
>     context.getConfiguration().set(PigConstants.TASK_INDEX,
>             Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
>     ...
> }
> {code}
> But Spark does not provide a hook like PigGenericMapReduce.Reduce#setup in which
> PigConstants.TASK_INDEX could be initialized when a job starts, so we need to find a way to
> initialize PigConstants.TASK_INDEX correctly.
> After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in Spark mode will be the
> same as in MR mode.
> For now, TestBuiltin#testUniqueID is split into two cases (a sketch of the UniqueID logic
> behind these expectations follows the code block):
> {code}
> @Test
> public void testUniqueID() throws Exception {
>     ...
>     if (!Util.isSparkExecType(cluster.getExecType())) {
>         assertEquals("0-0", iter.next().get(1));
>         assertEquals("0-1", iter.next().get(1));
>         assertEquals("0-2", iter.next().get(1));
>         assertEquals("0-3", iter.next().get(1));
>         assertEquals("0-4", iter.next().get(1));
>         assertEquals("1-0", iter.next().get(1));
>         assertEquals("1-1", iter.next().get(1));
>         assertEquals("1-2", iter.next().get(1));
>         assertEquals("1-3", iter.next().get(1));
>         assertEquals("1-4", iter.next().get(1));
>     } else {
>         // Because PigConstants.TASK_INDEX is hard-coded to 0 in
>         // ForEachConverter#ForEachFunction#initializeJobConf,
>         // UniqueID.exec() always outputs values of the form "0-*" in Spark mode;
>         // Spark behavior will not match MR until PIG-5051 is fixed.
>         assertEquals(iter.next().get(1), "0-0");
>         assertEquals(iter.next().get(1), "0-1");
>         assertEquals(iter.next().get(1), "0-2");
>         assertEquals(iter.next().get(1), "0-3");
>         assertEquals(iter.next().get(1), "0-4");
>         assertEquals(iter.next().get(1), "0-0");
>         assertEquals(iter.next().get(1), "0-1");
>         assertEquals(iter.next().get(1), "0-2");
>         assertEquals(iter.next().get(1), "0-3");
>         assertEquals(iter.next().get(1), "0-4");
>     }
>     ...
> }
> {code}
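For context on why the test expects values of the form "taskIndex-sequence": UniqueID derives its output from PigConstants.TASK_INDEX plus a per-task counter. A heavily simplified sketch of that logic is below; it is not the actual org.apache.pig.builtin.UniqueID source, and the class name is illustrative only.
{code}
import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.PigConstants;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.data.Tuple;

// Illustrative simplification: read the task index that the backend stored in
// the per-task job configuration and append an incrementing sequence number,
// yielding "0-0", "0-1", ..., "1-0", "1-1", ...
public class UniqueIdSketch extends EvalFunc<String> {
    private long sequence = 0;

    @Override
    public String exec(Tuple input) throws IOException {
        String taskIndex = PigMapReduce.sJobConfInternal.get()
                .get(PigConstants.TASK_INDEX);
        return taskIndex + "-" + sequence++;
    }
}
{code}
With TASK_INDEX stuck at "0" for every Spark task, every task produces the same "0-*" series, which is exactly the else branch above.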



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (PIG-5051) Initialize PigConstants.TASK_INDEX in spark mode correctly

2016-10-27 Thread liyunzhang_intel (JIRA)

 [ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
--
Status: Patch Available  (was: Open)

> Initialize PigConstants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
>Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
>





[jira] [Updated] (PIG-5051) Initialize PigConstants.TASK_INDEX in spark mode correctly

2016-10-27 Thread liyunzhang_intel (JIRA)

 [ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
--
Attachment: PIG-4920_6_5051.patch

[~kexianda]: Since PIG-4920_6.patch does not appear to have been checked in even though that issue is marked resolved, please review PIG-4920_6_5051.patch first. I will contact [~xuefuz] about recovering the spark branch.

> Initialize PigConstants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
>





[jira] [Updated] (PIG-5051) Initialize PigConstants.TASK_INDEX in spark mode correctly

2016-10-27 Thread liyunzhang_intel (JIRA)

 [ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
--
Attachment: PIG-5051.patch

[~kexianda]:
Previously we set PigConstants.TASK_INDEX in SparkUtil#newJobConf and hard-coded its value to "0". Spark provides an [API|https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/TaskContext.html#partitionId()] to get the partition id of the current task, which is similar to the task id in MapReduce.

Modifications in PIG-5051.patch:
1. Set the value of PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX) from the Spark API TaskContext.get().partitionId() in LoadConverter.java (see the sketch below).
2. Remove the previous PigConstants.TASK_INDEX code from SparkUtil.java.
3. Update the Spark-mode branch of TestBuiltin#testUniqueID, since UniqueID.java now sees a different PigConstants.TASK_INDEX value in different Spark tasks.
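For readers without the patch at hand, here is a minimal sketch of the approach in item 1; the helper class and method names are illustrative and not taken from PIG-5051.patch.
{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConstants;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.spark.TaskContext;

// Illustrative helper: when invoked inside a running Spark task (for example
// from the map function that LoadConverter creates), it copies the Spark
// partition id into Pig's per-task configuration so that UDFs such as
// UniqueID see a real task index instead of the hard-coded "0".
public class TaskIndexInitializer {
    public static void initializeTaskIndex() {
        TaskContext ctx = TaskContext.get();  // non-null only inside a Spark task
        if (ctx != null) {
            Configuration conf = PigMapReduce.sJobConfInternal.get();
            conf.set(PigConstants.TASK_INDEX, Integer.toString(ctx.partitionId()));
        }
    }
}
{code}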

> Initialize PigConstants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5051.patch
>





[jira] [Updated] (PIG-5051) Initialize PigConstants.TASK_INDEX in spark mode correctly

2016-10-26 Thread liyunzhang_intel (JIRA)

 [ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
--
Description: 
In MR mode, we initialize PigConstants.TASK_INDEX in
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
{code}
protected void setup(Context context) throws IOException, InterruptedException {
    ...
    // expose the task id to UDFs through the job configuration
    context.getConfiguration().set(PigConstants.TASK_INDEX,
            Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}
{code}
But Spark does not provide a hook like PigGenericMapReduce.Reduce#setup in which
PigConstants.TASK_INDEX could be initialized when a job starts, so we need to find a way to
initialize PigConstants.TASK_INDEX correctly.

After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in Spark mode will be the
same as in MR mode.
For now, TestBuiltin#testUniqueID is split into two cases:
{code}
@Test
public void testUniqueID() throws Exception {
    ...
    if (!Util.isSparkExecType(cluster.getExecType())) {
        assertEquals("0-0", iter.next().get(1));
        assertEquals("0-1", iter.next().get(1));
        assertEquals("0-2", iter.next().get(1));
        assertEquals("0-3", iter.next().get(1));
        assertEquals("0-4", iter.next().get(1));
        assertEquals("1-0", iter.next().get(1));
        assertEquals("1-1", iter.next().get(1));
        assertEquals("1-2", iter.next().get(1));
        assertEquals("1-3", iter.next().get(1));
        assertEquals("1-4", iter.next().get(1));
    } else {
        // Because PigConstants.TASK_INDEX is hard-coded to 0 in
        // ForEachConverter#ForEachFunction#initializeJobConf,
        // UniqueID.exec() always outputs values of the form "0-*" in Spark mode;
        // Spark behavior will not match MR until PIG-5051 is fixed.
        assertEquals(iter.next().get(1), "0-0");
        assertEquals(iter.next().get(1), "0-1");
        assertEquals(iter.next().get(1), "0-2");
        assertEquals(iter.next().get(1), "0-3");
        assertEquals(iter.next().get(1), "0-4");
        assertEquals(iter.next().get(1), "0-0");
        assertEquals(iter.next().get(1), "0-1");
        assertEquals(iter.next().get(1), "0-2");
        assertEquals(iter.next().get(1), "0-3");
        assertEquals(iter.next().get(1), "0-4");
    }
    ...
}
{code}

  was:
In MR mode, we initialize PigConstants.TASK_INDEX in
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
{code}
protected void setup(Context context) throws IOException, InterruptedException {
    ...
    context.getConfiguration().set(PigConstants.TASK_INDEX,
            Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}
{code}
But Spark does not provide a hook like PigGenericMapReduce.Reduce#setup in which
PigConstants.TASK_INDEX could be initialized when a job starts, so we need to find a way to
initialize PigConstants.TASK_INDEX correctly.


> Initialize PigConstants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: liyunzhang_intel
> Fix For: spark-branch
>
>