[ 
https://issues.apache.org/jira/browse/PIG-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208067#comment-15208067
 ] 

Xianda Ke commented on PIG-4848:
--------------------------------

In MR mode, the flag is already set to true internally for a merge join:
{code}
MRCompiler.visitMergeJoin() {
     //...
     curMROp.noCombineSmallSplits();
     //...
}
{code}
and
{code}
JobControlCompiler.getJob() {
    //..
    if (!mro.combineSmallSplits()
            || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))
        conf.setBoolean("pig.noSplitCombination", true);
    //..
}
{code}
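
The condition above can be restated as a small standalone sketch. The class and method names here (SplitCombinationFlag, shouldDisableCombination) are hypothetical and not part of Pig. Only the property key pig.splitCombination and its default of "true" are taken from the snippet.

```java
import java.util.Properties;

public class SplitCombinationFlag {
    /**
     * Hypothetical helper, not Pig code: returns true when split combination
     * should be turned off, i.e. when pig.noSplitCombination would be set.
     */
    public static boolean shouldDisableCombination(boolean combineSmallSplits, Properties props) {
        // Either the operator opted out (as a merge join does), or the user
        // explicitly set pig.splitCombination=false.
        return !combineSmallSplits
                || "false".equals(props.getProperty("pig.splitCombination", "true"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Merge join case: the operator called noCombineSmallSplits().
        System.out.println(shouldDisableCombination(false, props));
        // Ordinary operator with default properties.
        System.out.println(shouldDisableCombination(true, props));
    }
}
```

So for a merge join the flag ends up set regardless of the user-level property.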

However, this no longer works in MR mode. The output is still out of order 
because Hadoop sorts Pig's input splits again by size when the job is submitted.
{code:title=org.apache.hadoop.mapreduce.JobSubmitter.java}
writeNewSplits() {
    List<InputSplit> splits = input.getSplits(job);
    //...
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
}
{code}
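
To illustrate the effect of that sort, here is a minimal standalone sketch that does not use Hadoop at all. FakeSplit and SplitOrderDemo are hypothetical stand-ins, and the byte sizes are assumed values for the three input files from the issue description. The comparator imitates a largest-first ordering like the one described in the comment above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SplitOrderDemo {
    /** Hypothetical stand-in for an InputSplit: a file name plus its length in bytes. */
    public static final class FakeSplit {
        final String name;
        final long length;

        public FakeSplit(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    /** Sorts a copy of the splits largest-first and returns the resulting name order. */
    public static List<String> orderAfterSizeSort(List<FakeSplit> splits) {
        FakeSplit[] array = splits.toArray(new FakeSplit[0]);
        // Largest split first. Arrays.sort on objects is stable, so splits of
        // equal size keep their original relative order.
        Arrays.sort(array, Comparator.comparingLong((FakeSplit s) -> s.length).reversed());
        List<String> names = new ArrayList<>();
        for (FakeSplit s : array) {
            names.add(s.name);
        }
        return names;
    }

    /** The three files from the issue description, with assumed sizes in bytes. */
    public static List<FakeSplit> sampleSplits() {
        return Arrays.asList(
                new FakeSplit("input1", 4),
                new FakeSplit("input2", 4),
                new FakeSplit("input3", 6));
    }

    public static void main(String[] args) {
        // prints [input3, input1, input2]
        System.out.println(orderAfterSizeSort(sampleSplits()));
    }
}
```

The largest file moves to the front, which matches the out-of-order result reported in the issue, where the (33,33,33,33) row comes first.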

In Spark mode, there is no such sorting. If we set pig.noSplitCombination=true 
internally, it should work.

> pig.noSplitCombination=true should always be set internally for a merge join
> ----------------------------------------------------------------------------
>
>                 Key: PIG-4848
>                 URL: https://issues.apache.org/jira/browse/PIG-4848
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Xianda Ke
>            Assignee: Xianda Ke
>             Fix For: spark-branch
>
>
> In Spark mode, for a merge join, the flag is NOT set to true internally. The 
> input splits will be in the order of file size. The output is out of order.
> Scenario:
> cat input1
> {code}
> 1     1
> {code}
> cat input2
> {code}
> 2     2
> {code}
> cat input3
> {code}
> 33    33
> {code}
> A = LOAD 'input*' as (a:int, b:int);
> B = LOAD 'input*' as (a:int, b:int);
> C = JOIN A BY $0, B BY $0 USING 'merge';
> DUMP C;
> expected result:
> {code}
> (1,1,1,1)
> (2,2,2,2)
> (33,33,33,33)
> {code}
> actual result:
> {code}
> (33,33,33,33)
> (1,1,1,1)
> (2,2,2,2)
> {code}
> In MR mode, the flag was set to true internally for a merge join (see 
> PIG-2773). However, it doesn't work now. The output is still out of order, 
> because the splits will be ordered again by hadoop-client. In Spark mode, we 
> can solve this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
