[
https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624415#comment-15624415
]
liyunzhang_intel commented on PIG-5051:
---------------------------------------
[~xuefuz]: please checked in PIG-5051.patch, thanks
> Initialize PigContants.TASK_INDEX in spark mode correctly
> ---------------------------------------------------------
>
> Key: PIG-5051
> URL: https://issues.apache.org/jira/browse/PIG-5051
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4920_6_5051.patch, PIG-5051.patch
>
>
> in MR, we initialize PigContants.TASK_INDEX in
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup
>
> {code}
> protected void setup(Context context) throws IOException,
> InterruptedException {
> ...
> context.getConfiguration().set(PigConstants.TASK_INDEX,
> Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
> ...
> }
> {code}
> But spark does not provide funtion like PigGenericMapReduce.Reduce#setup to
> initialize PigContants.TASK_INDEX when job starts. We need find a solution to
> initialize PigContants.TASK_INDEX correctly.
> After this jira is fixed. The behavior of TestBuiltin#testUniqueID in spark
> mode will be same with what in mr.
> Now we divide two cases in TestBuiltin#testUniqueID
> {code}
> @Test
> public void testUniqueID() throws Exception {
> ...
> if (!Util.isSparkExecType(cluster.getExecType())) {
> assertEquals("0-0", iter.next().get(1));
> assertEquals("0-1", iter.next().get(1));
> assertEquals("0-2", iter.next().get(1));
> assertEquals("0-3", iter.next().get(1));
> assertEquals("0-4", iter.next().get(1));
> assertEquals("1-0", iter.next().get(1));
> assertEquals("1-1", iter.next().get(1));
> assertEquals("1-2", iter.next().get(1));
> assertEquals("1-3", iter.next().get(1));
> assertEquals("1-4", iter.next().get(1));
> } else {
> // because we set PigConstants.TASK_INDEX as 0 in
> // ForEachConverter#ForEachFunction#initializeJobConf
> // UniqueID.exec() will output like 0-*
> // the behavior of spark will be same with mr until PIG-5051 is
> fixed.
> assertEquals(iter.next().get(1), "0-0");
> assertEquals(iter.next().get(1), "0-1");
> assertEquals(iter.next().get(1), "0-2");
> assertEquals(iter.next().get(1), "0-3");
> assertEquals(iter.next().get(1), "0-4");
> assertEquals(iter.next().get(1), "0-0");
> assertEquals(iter.next().get(1), "0-1");
> assertEquals(iter.next().get(1), "0-2");
> assertEquals(iter.next().get(1), "0-3");
> assertEquals(iter.next().get(1), "0-4");
> }
> ...
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)