Hi all:
Any one can help see this issue?  If it is a bug, I will file  a jira for it . 
If pig code does not deal with this kind of script, please tell me.




Kelly Zhang/Zhang,Liyun
Best Regards


From: Rohini Palaniswamy [mailto:[email protected]]
Sent: Thursday, May 21, 2015 5:59 AM
To: Zhang, Liyun; [email protected]
Cc: Mohit Sabharwal; [email protected]; Xuefu Zhang
Subject: Re: A problem about implicit POSPLIT found in mr mode

Liyun,
    I was out and could not reply soon. Please send questions like this to dev@ 
so that other committers can also look into and answer.

  There is no bug here and behavior is as expected.

 C = group B by id;

D = foreach C {

E = order B by f desc;

F = E.f;

generate group, myudfs.AccumulativeSumBag(F);

};

There is no secondary key optimization applied in this mapreduce plan at all.  
Usually order by (POSort) above would usually be removed and replaced with 
secondary key sort. But in this case, the output of group by is just stored 
into HDFS as there is a split. Then on the map phase of the join, it is loaded 
twice (once for D and once for G) and the inner foreach of those processed and 
then joined in the reduce. since the UDF is executing in the map it is not run 
as accumulator, but run as normal EvalFunc and exec() is called.

 In this case it would have been better if the foreach statements where 
executed in the reduce of the group by and two different outputs stored.  But 
the MultiQueryOptimizer is not applied if secondary key optimization is 
possible.

MultiQueryOptimizer.java
if (successor.getUseSecondaryKey()) {
                log.debug("Splittee " + successor.getOperatorKey().getId()
                        + " uses secondary key, do not merge it");
                continue;
            }

In this case, both the splits have same secondary key as they both do order B 
by f desc;  (which is not the case most of the time) and is possible to apply 
MultiQueryOptimizer. But currently MultiQueryOptimizer does not have that 
intelligence to check and merge into one plan if all of them have same 
secondary key.

Regards,
Rohini


On Tue, May 19, 2015 at 1:53 AM, Zhang, Liyun 
<[email protected]<mailto:[email protected]>> wrote:
Hi Rohini:
   I found a problem when executing following script in mr mode:


testAccumulator.join.pig



REGISTER /home/zly/prj/oss/kellyzly/pig/bin/myudfs.jar;

A = load './testAccumulator.txt' as (id:int,f);

B = foreach A generate id, f, id as t;

C = group B by id;

D = foreach C {

E = order B by f desc;

F = E.f;

generate group, myudfs.AccumulativeSumBag(F);

};

G = foreach C {

E = order B by f desc;

F = E.f;

generate group, myudfs.AccumulativeSumBag(F);

};

H = join D by group, G by group;

store H into 'testAccumulator.join.out';

explain H

cat myudfs/AccumulativeSumBag.java:
package myudfs;

import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.Accumulator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
* This class is for testing of accumulator udfs
*
*/
public class AccumulativeSumBag extends EvalFunc<String> implements 
Accumulator<String>
{

    StringBuffer sb;

    public AccumulativeSumBag() {
    }

    public void accumulate(Tuple tuple) throws IOException {
        DataBag databag = (DataBag)tuple.get(0);
        if(databag == null)
            return;

        if (sb == null) {
            sb = new StringBuffer();
        }

        Iterator<Tuple> iterator = databag.iterator();
        while(iterator.hasNext()) {
            Tuple t = iterator.next();
            if (t.size()>1 && t.get(1) == null) {
                continue;
            }

            sb.append(t.toString());
        }
    }

    public String getValue() {
        if (sb != null && sb.length()>0) {
            return sb.toString();
        }
        return null;
    }

    public void cleanup() {
        sb = null;
    }

    public String exec(Tuple tuple) throws IOException {
        throw new IOException("exec() should not be called");
    }
}

the error message is:
ava.lang.Exception: org.apache.pig.backend.executionengine.ExecException: ERROR 
0: Exception while executing (Name: H: Local Rearrange[tuple]{int}(fals     e) 
- scope-117 Operator Key: scope-117): 
org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error 
from UDF: myudfs.AccumulativeSum     Bag [exec() should not be called]
5619         at 
org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
5620         at 
org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
5621 Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: 
Exception while executing (Name: H: Local Rearrange[tuple]{int}(false) - scope  
   -117 Operator Key: scope-117): 
org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error 
from UDF: myudfs.AccumulativeSumBag [exec(     ) should not be called]
5622         at 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
5623         at 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNextTuple(POLocalRearrange.java:291)
5624         at 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion.getNextTuple(POUnion.java:167)
5625         at 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:279)
5626         at 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:274)
5627         at 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
5628         at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
5629         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
5630         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
5631         at 
org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
5632         at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
5633         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
5634         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
5635         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
5636         at java.lang.Thread.run(Thread.java:744)

Following is the physical plan and mr plan.
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---H: Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - 
scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28
            |       |               |   |
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           |---C: Split - scope-19    // here an implicit Split is 
generated
            |               |
            |               |---C: Package(Packager)[tuple]{int} - scope-16
            |                   |
            |                   |---C: Global Rearrange[tuple] - scope-15
            |                       |
            |                       |---C: Local Rearrange[tuple]{int}(false) - 
scope-17
            |                           |   |
            |                           |   Project[int][0] - scope-18
            |                           |
            |                           |---B: New For 
Each(false,false,false)[bag] - scope-14
            |                               |   |
            |                               |   Project[int][0] - scope-7
           |                               |   |
            |                               |   Project[bytearray][1] - scope-9
            |                               |   |
            |                               |   
POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
            |                               |   |
            |                               |   |---Project[int][0] - scope-11
            |                               |
            |                               |---A: New For 
Each(false,false)[bag] - scope-6
            |                                   |   |
            |                                   |   Cast[int] - scope-2
            |                                   |   |
            |                                   |   |---Project[bytearray][0] - 
scope-1
            |                                   |   |
            |                                   |   Project[bytearray][1] - 
scope-4
            |                                   |
            |                                   |---A: 
Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage<http://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage>)
 - scope-0
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - 
scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        |---C: Split - scope-19
                            |
                            |---C: Package(Packager)[tuple]{int} - scope-16
                                |
                                |---C: Global Rearrange[tuple] - scope-15
                                    |
                                    |---C: Local Rearrange[tuple]{int}(false) - 
scope-17
                                        |   |
                                        |   Project[int][0] - scope-18
                                        |
                                        |---B: New For 
Each(false,false,false)[bag] - scope-14
                                            |   |
                                            |   Project[int][0] - scope-7
                                            |   |
                                            |   Project[bytearray][1] - scope-9
                                            |   |
                                            |   
POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
                                            |   |
                                            |   |---Project[int][0] - scope-11
                                            |
                                            |---A: New For 
Each(false,false)[bag] - scope-6
                                                |   |
                                                |   Cast[int] - scope-2
                                                |   |
                                                |   |---Project[bytearray][0] - 
scope-1
                                                |   |
                                                |   Project[bytearray][1] - 
scope-4
                                                |
                                                |---A: 
Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage<http://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage>)
 - scope-0

#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-58
Map Plan
C: Local Rearrange[tuple]{int}(false) - scope-17
|   |
|   Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
    |   |
    |   Project[int][0] - scope-7
    |   |
    |   Project[bytearray][1] - scope-9
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
    |   |
    |   |---Project[int][0] - scope-11
    |
    |---A: New For Each(false,false)[bag] - scope-6
        |   |
        |   Cast[int] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Project[bytearray][1] - scope-4
        |
        |---A: 
Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage<http://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage>)
 - scope-0--------
Reduce Plan
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage<http://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage>)
 - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16--------
Global sort: false
----------------

MapReduce node scope-64
Map Plan
Union[tuple] - scope-65
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
|   |   |
|   |   Project[int][0] - scope-51
|   |
|   |---D: New For Each(false,false)[bag] - scope-32
|       |   |
|       |   Project[int][0] - scope-22
|       |   |
|       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
|       |   |
|       |   |---RelationToExpressionProject[bag][*] - scope-24
|       |       |
|       |       |---F: New For Each(false)[bag] - scope-31
|       |           |   |
|       |           |   Project[bytearray][1] - scope-29
|       |           |
|       |           |---E: POSort[bag]() - scope-28   // POSort should be 
deleted in  SecondaryKeyOptimizerUtil.java#applySecondaryKeySort . but it is 
not deleted because the POSplit(scope-19 in physical plan ) makes group and 
foreach in different operators
|       |               |   |
|       |               |   Project[bytearray][1] - scope-27
|       |               |
|       |               |---Project[bag][1] - scope-26
|       |
|       
|---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage<http://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage>)
 - scope-60
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
    |   |
    |   Project[int][0] - scope-53
    |
    |---G: New For Each(false,false)[bag] - scope-45
        |   |
        |   Project[int][0] - scope-35
        |   |
        |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
        |   |
        |   |---RelationToExpressionProject[bag][*] - scope-37
        |       |
        |       |---F: New For Each(false)[bag] - scope-44
        |           |   |
        |           |   Project[bytearray][1] - scope-42
        |           |
        |           |---E: POSort[bag]() - scope-41
        |               |   |
        |               |   Project[bytearray][1] - scope-40
        |               |
        |               |---Project[bag][1] - scope-39
        |
        
|---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage<http://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage>)
 - scope-62--------
Reduce Plan
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: Package(JoinPackager(true,true))[tuple]{int} - scope-49--------
Global sort: false
----------------

The reason why it fails is POSort is not deleted when secondary key sort is 
enables(POSort should be deleted in 
SecondaryKeyOptimizerUtil.java#applySecondaryKeySort). If POSplit is not 
deleted, the value “foundUDF “ in AccumulatorOptimizerUtil#addAccumulator is 
false and po_foreach.setAccumulative will not be called. This causes “Caught 
error from UDF: myudfs.AccumulativeSum     Bag [exec() should not be called]”. 
Because an implicit POSplit is generated, when poSplit is encounted in mr plan, 
a new mr Operator is generated(POSplit(scope-19) splits the physical plan into 
MapReduceNode scope-58  and MapReduceNode scope-64) . So 
SecondaryKeyOptimizerUtil.java#applySecondaryKeySort does not work.

AccumulatorOptimizerUtil#addAccumulator
  public static void addAccumulator(PhysicalPlan plan) {
        // See if this is a map-reduce job
        List<PhysicalOperator> pos = plan.getRoots();
        if (pos == null || pos.size() == 0) {
            return;
        }

       // See if this is a POPackage
        PhysicalOperator po_package = pos.get(0);
        if (!po_package.getClass().equals(POPackage.class)) {
            return;
        }

        Packager pkgr = ((POPackage) po_package).getPkgr();
        // Check that this is a standard package, not a subclass
        if (!pkgr.getClass().equals(Packager.class)) {
            return;
        }

        // if POPackage is for distinct, just return
        if (pkgr.isDistinct()) {
            return;
        }

        // if any input to POPackage is inner, just return
        boolean[] isInner = pkgr.getInner();
        for (boolean b: isInner) {
            if (b) {
                return;
            }
        }

        List<PhysicalOperator> l = plan.getSuccessors(po_package);
        // there should be only one POForEach
        if (l == null || l.size() == 0 || l.size() > 1) {
            return;
        }

        PhysicalOperator po_foreach = l.get(0);
        if (!(po_foreach instanceof POForEach)) {
            return;
        }

        boolean foundUDF = false;
        List<PhysicalPlan> list = ((POForEach)po_foreach).getInputPlans();
        for (PhysicalPlan p: list) {
            PhysicalOperator po = p.getLeaves().get(0);

            // only expression operators are allowed
            if (!(po instanceof ExpressionOperator)) {
                return;
            }

            if (((ExpressionOperator)po).containUDF()) {
                foundUDF = true;
            }

            if (!check(po)) {
                return;
            }
        }

        if (foundUDF) {
            // if all tests are passed, reducer can run in accumulative mode
            LOG.info("Reducer is to run in accumulative mode.");
            po_package.setAccumulative();
            po_foreach.setAccumulative();
        }
    }


My question: is it a bug  or pig does not deal with this kind of script case 
when implicit posplit is generated when secondary key optimization enables ?



Kelly Zhang/Zhang,Liyun
Best Regards


Reply via email to