[jira] [Updated] (PIG-5051) Initialize PigContants.TASK_INDEX in spark mode correctly
[ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuefu Zhang updated PIG-5051:
-----------------------------
    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

Committed to spark branch. Thanks, Liyun!

> Initialize PigContants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
>                 Key: PIG-5051
>                 URL: https://issues.apache.org/jira/browse/PIG-5051
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
>
>
> In MR, we initialize PigConstants.TASK_INDEX in
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
> {code}
> protected void setup(Context context) throws IOException, InterruptedException {
>     ...
>     context.getConfiguration().set(PigConstants.TASK_INDEX,
>             Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
>     ...
> }
> {code}
> But Spark does not provide a function like PigGenericMapReduce.Reduce#setup to
> initialize PigConstants.TASK_INDEX when a job starts. We need to find a solution
> to initialize PigConstants.TASK_INDEX correctly.
> After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in Spark
> mode will be the same as in MR.
> For now, TestBuiltin#testUniqueID handles the two cases separately:
> {code}
> @Test
> public void testUniqueID() throws Exception {
>     ...
>     if (!Util.isSparkExecType(cluster.getExecType())) {
>         assertEquals("0-0", iter.next().get(1));
>         assertEquals("0-1", iter.next().get(1));
>         assertEquals("0-2", iter.next().get(1));
>         assertEquals("0-3", iter.next().get(1));
>         assertEquals("0-4", iter.next().get(1));
>         assertEquals("1-0", iter.next().get(1));
>         assertEquals("1-1", iter.next().get(1));
>         assertEquals("1-2", iter.next().get(1));
>         assertEquals("1-3", iter.next().get(1));
>         assertEquals("1-4", iter.next().get(1));
>     } else {
>         // because we set PigConstants.TASK_INDEX as 0 in
>         // ForEachConverter#ForEachFunction#initializeJobConf
>         // UniqueID.exec() will output like 0-*
>         // the behavior of spark will be same with mr until PIG-5051 is fixed.
>         assertEquals(iter.next().get(1), "0-0");
>         assertEquals(iter.next().get(1), "0-1");
>         assertEquals(iter.next().get(1), "0-2");
>         assertEquals(iter.next().get(1), "0-3");
>         assertEquals(iter.next().get(1), "0-4");
>         assertEquals(iter.next().get(1), "0-0");
>         assertEquals(iter.next().get(1), "0-1");
>         assertEquals(iter.next().get(1), "0-2");
>         assertEquals(iter.next().get(1), "0-3");
>         assertEquals(iter.next().get(1), "0-4");
>     }
>     ...
> }
> {code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
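
As a rough illustration of why the task index matters for the assertions quoted above, the following sketch models ids of the form <task index>-<sequence>. It is a simplified stand-in, not Pig's UniqueID implementation: the class UniqueIdSketch, the method ids, and the perTaskIndex flag are hypothetical names introduced here. With a per-task index, two tasks of five records each produce 0-0 through 0-4 and then 1-0 through 1-4; with the index fixed at 0, both tasks produce 0-0 through 0-4.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how a "<taskIndex>-<sequence>" id depends on the task index.
// This is NOT Pig's UniqueID implementation; all names here are hypothetical.
class UniqueIdSketch {

    // Emits recordsPerTask ids for each task. When perTaskIndex is false, every
    // task sees index 0, which models a task index that is hard-coded to "0".
    static List<String> ids(int tasks, int recordsPerTask, boolean perTaskIndex) {
        List<String> out = new ArrayList<>();
        for (int task = 0; task < tasks; task++) {
            int taskIndex = perTaskIndex ? task : 0;
            // The sequence number restarts at 0 for every task.
            for (long seq = 0; seq < recordsPerTask; seq++) {
                out.add(taskIndex + "-" + seq);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Task index differs per task (the MR-style expectation in the test).
        System.out.println(ids(2, 5, true));
        // Task index fixed at 0 (the Spark-mode expectation before the fix).
        System.out.println(ids(2, 5, false));
    }
}
```

The two calls in main correspond to the two branches of the quoted test: the first matches the non-Spark expectations, the second the Spark-mode expectations in the else branch.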
[jira] [Updated] (PIG-5051) Initialize PigContants.TASK_INDEX in spark mode correctly
[ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
----------------------------------
    Status: Patch Available  (was: Open)
[jira] [Updated] (PIG-5051) Initialize PigContants.TASK_INDEX in spark mode correctly
[ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
----------------------------------
    Attachment: PIG-4920_6_5051.patch

[~kexianda]: PIG-4920_6.patch does not seem to have been checked in, although its status is Resolved. Please review PIG-4920_6_5051.patch first. I will contact [~xuefuz] to recover the spark branch.

> Initialize PigContants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
>                 Key: PIG-5051
>                 URL: https://issues.apache.org/jira/browse/PIG-5051
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
[jira] [Updated] (PIG-5051) Initialize PigContants.TASK_INDEX in spark mode correctly
[ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
----------------------------------
    Attachment: PIG-5051.patch

[~kexianda]: Previously we set PigConstants.TASK_INDEX in SparkUtil#newJobConf, always with the value "0". Spark provides an [api|https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/TaskContext.html#partitionId()] to get the partition ID of a task, which is similar to the task ID in MapReduce.

Modifications in PIG-5051.patch:
1. Set the value of PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX) from the Spark API TaskContext.get().partitionId() in LoadConverter.java.
2. Remove the previous code that set PigConstants.TASK_INDEX in SparkUtil.java.
3. Modify the Spark-mode code in TestBuiltin#testUniqueID, because the value of PigConstants.TASK_INDEX used in UniqueID.java now differs between Spark tasks.

> Initialize PigContants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
>                 Key: PIG-5051
>                 URL: https://issues.apache.org/jira/browse/PIG-5051
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5051.patch
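
The first modification listed in the message above can be sketched without a Spark dependency as follows. This is a model under stated assumptions, not the code in PIG-5051.patch: TaskIndexSketch, processPartition, the CONF thread-local map, and the "task.index" key are hypothetical stand-ins, and the partitionId parameter plays the role of the value that TaskContext.get().partitionId() returns inside a Spark task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models "set the task index per partition before processing its records".
// All names are hypothetical; no Pig or Spark classes are used.
class TaskIndexSketch {

    // Hypothetical key standing in for PigConstants.TASK_INDEX.
    static final String TASK_INDEX = "task.index";

    // Per-thread configuration, standing in for the job configuration a task reads.
    static final ThreadLocal<Map<String, String>> CONF =
            ThreadLocal.withInitial(HashMap::new);

    // Processes one partition. partitionId stands in for the value that
    // TaskContext.get().partitionId() would return inside a Spark task.
    static List<String> processPartition(int partitionId, List<String> records) {
        // Initialize the task index once, before any record is processed.
        CONF.get().put(TASK_INDEX, Integer.toString(partitionId));

        List<String> out = new ArrayList<>();
        long seq = 0;
        for (String record : records) {
            // Append "<taskIndex>-<sequence>" to each record.
            out.add(record + "\t" + CONF.get().get(TASK_INDEX) + "-" + seq++);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(processPartition(0, Arrays.asList("a", "b")));
        System.out.println(processPartition(1, Arrays.asList("c")));
    }
}
```

Writing the index at the start of each partition, rather than once when the job configuration is built, is what lets different tasks observe different values.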
[jira] [Updated] (PIG-5051) Initialize PigContants.TASK_INDEX in spark mode correctly
[ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
----------------------------------
    Description: 
In MR, we initialize PigConstants.TASK_INDEX in
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
{code}
protected void setup(Context context) throws IOException, InterruptedException {
    ...
    context.getConfiguration().set(PigConstants.TASK_INDEX,
            Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}
{code}
But Spark does not provide a function like PigGenericMapReduce.Reduce#setup to initialize PigConstants.TASK_INDEX when a job starts. We need to find a solution to initialize PigConstants.TASK_INDEX correctly.
After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in Spark mode will be the same as in MR.
For now, TestBuiltin#testUniqueID handles the two cases separately:
{code}
@Test
public void testUniqueID() throws Exception {
    ...
    if (!Util.isSparkExecType(cluster.getExecType())) {
        assertEquals("0-0", iter.next().get(1));
        assertEquals("0-1", iter.next().get(1));
        assertEquals("0-2", iter.next().get(1));
        assertEquals("0-3", iter.next().get(1));
        assertEquals("0-4", iter.next().get(1));
        assertEquals("1-0", iter.next().get(1));
        assertEquals("1-1", iter.next().get(1));
        assertEquals("1-2", iter.next().get(1));
        assertEquals("1-3", iter.next().get(1));
        assertEquals("1-4", iter.next().get(1));
    } else {
        // because we set PigConstants.TASK_INDEX as 0 in
        // ForEachConverter#ForEachFunction#initializeJobConf
        // UniqueID.exec() will output like 0-*
        // the behavior of spark will be same with mr until PIG-5051 is fixed.
        assertEquals(iter.next().get(1), "0-0");
        assertEquals(iter.next().get(1), "0-1");
        assertEquals(iter.next().get(1), "0-2");
        assertEquals(iter.next().get(1), "0-3");
        assertEquals(iter.next().get(1), "0-4");
        assertEquals(iter.next().get(1), "0-0");
        assertEquals(iter.next().get(1), "0-1");
        assertEquals(iter.next().get(1), "0-2");
        assertEquals(iter.next().get(1), "0-3");
        assertEquals(iter.next().get(1), "0-4");
    }
    ...
}
{code}

  was:
In MR, we initialize PigConstants.TASK_INDEX in
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
{code}
protected void setup(Context context) throws IOException, InterruptedException {
    ...
    context.getConfiguration().set(PigConstants.TASK_INDEX,
            Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}
{code}
But Spark does not provide a function like PigGenericMapReduce.Reduce#setup to initialize PigConstants.TASK_INDEX when a job starts. We need to find a solution to initialize PigConstants.TASK_INDEX correctly.


> Initialize PigContants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
>                 Key: PIG-5051
>                 URL: https://issues.apache.org/jira/browse/PIG-5051
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>             Fix For: spark-branch
>
>
> In MR, we initialize PigConstants.TASK_INDEX in
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
> {code}
> protected void setup(Context context) throws IOException, InterruptedException {
>     ...
>     context.getConfiguration().set(PigConstants.TASK_INDEX,
>             Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
>     ...
> }
> {code}
> But Spark does not provide a function like PigGenericMapReduce.Reduce#setup to
> initialize PigConstants.TASK_INDEX when a job starts. We need to find a solution
> to initialize PigConstants.TASK_INDEX correctly.
> After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in Spark
> mode will be the same as in MR.
> For now, TestBuiltin#testUniqueID handles the two cases separately:
> {code}
> @Test
> public void testUniqueID() throws Exception {
>     ...
>     if (!Util.isSparkExecType(cluster.getExecType())) {
>         assertEquals("0-0", iter.next().get(1));
>         assertEquals("0-1", iter.next().get(1));
>         assertEquals("0-2", iter.next().get(1));
>         assertEquals("0-3", iter.next().get(1));
>         assertEquals("0-4", iter.next().get(1));
>         assertEquals("1-0", iter.next().get(1));
>         assertEquals("1-1", iter.next().get(1));
>         assertEquals("1-2", iter.next().get(1));
>         assertEquals("1-3", iter.next().get(1));
>         assertEquals("1-4", iter.next().get(1));
>     } else {
>         // because we set PigConstants.TASK_INDE