Hi all
I found that in
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler#compile,
lines 384~433 deal with the situation where a load has predecessors.
Can anyone give me a sample script in which a load has predecessors?
else if (predecessors != null && predecessors.size() > 0) {
// When processing an entire script (multiquery), we can
// get into a situation where a load has
// predecessors. This means that it depends on some store
// earlier in the plan. We need to take that dependency
// and connect the respective MR operators, while at the
// same time removing the connection between the Physical
// operators. That way the jobs will run in the right
// order.
if (op instanceof POLoad) {
if (predecessors.size() != 1) {
int errCode = 2125;
String msg = "Expected at most one predecessor of load. Got
"+predecessors.size();
throw new PlanException(msg, errCode, PigException.BUG);
}
PhysicalOperator p = predecessors.get(0);
MapReduceOper oper = null;
if(p instanceof POStore || p instanceof PONative){
oper = phyToMROpMap.get(p);
}else{
int errCode = 2126;
String msg = "Predecessor of load should be a store or
mapreduce operator. Got "+p.getClass();
throw new PlanException(msg, errCode, PigException.BUG);
}
// Need new operator
curMROp = getMROp();
curMROp.mapPlan.add(op);
MRPlan.add(curMROp);
plan.disconnect(op, p);
MRPlan.connect(oper, curMROp);
phyToMROpMap.put(op, curMROp);
return;
}
Collections.sort(predecessors);
compiledInputs = new MapReduceOper[predecessors.size()];
int i = -1;
for (PhysicalOperator pred : predecessors) {
if(pred instanceof POSplit &&
splitsSeen.containsKey(pred.getOperatorKey())){
compiledInputs[++i] =
startNew(((POSplit)pred).getSplitStore(),
splitsSeen.get(pred.getOperatorKey()));
continue;
}
compile(pred);
compiledInputs[++i] = curMROp;
}
} else {
Best Regards
Zhang,Liyun