Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Mon May 29 15:00:39 2017
@@ -0,0 +1,1565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+
+
+/**
+ * The compiler that compiles a given physical plan into a DAG of Spark
+ * operators.
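+ *
+ * <p>Minimal usage sketch (variable names are illustrative; the production
+ * call site is expected to be the Spark launcher):
+ * <pre>
+ *   SparkCompiler compiler = new SparkCompiler(physicalPlan, pigContext);
+ *   compiler.compile();
+ *   compiler.connectSoftLink();
+ *   SparkOperPlan sparkPlan = compiler.getSparkPlan();
+ * </pre>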
+ */
+public class SparkCompiler extends PhyPlanVisitor {
+    private static final Log LOG = LogFactory.getLog(SparkCompiler.class);
+
+    private PigContext pigContext;
+    private Properties pigProperties;
+
+    // The physicalPlan that is being compiled
+    private PhysicalPlan physicalPlan;
+
+    // The physicalPlan of Spark Operators
+    private SparkOperPlan sparkPlan;
+
+    private SparkOperator curSparkOp;
+
+    private String scope;
+
+    private SparkOperator[] compiledInputs = null;
+
+    private Map<OperatorKey, SparkOperator> splitsSeen;
+
+    private NodeIdGenerator nig;
+
+    private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap;
+    private UDFFinder udfFinder;
+
+    public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) {
+        super(physicalPlan,
+                new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                        physicalPlan));
+        this.physicalPlan = physicalPlan;
+        this.pigContext = pigContext;
+        this.pigProperties = pigContext.getProperties();
+        this.sparkPlan = new SparkOperPlan();
+        this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>();
+        this.udfFinder = new UDFFinder();
+        this.nig = NodeIdGenerator.getGenerator();
+        this.splitsSeen = new HashMap<OperatorKey, SparkOperator>();
+
+    }
+
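+    /**
+     * Compiles the whole physical plan: collects the POStore and PONative
+     * leaves, sorts them by operator id and compiles the sub-plan ending in
+     * each of them into Spark operators of the Spark plan.
+     */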
+    public void compile() throws IOException, PlanException, VisitorException {
+        List<PhysicalOperator> roots = physicalPlan.getRoots();
+        if ((roots == null) || (roots.size() <= 0)) {
+            int errCode = 2053;
+            String msg = "Internal error. Did not find roots in the physical plan.";
+            throw new SparkCompilerException(msg, errCode, PigException.BUG);
+        }
+        scope = roots.get(0).getOperatorKey().getScope();
+        List<PhysicalOperator> leaves = physicalPlan.getLeaves();
+
+        if (!pigContext.inIllustrator) {
+            for (PhysicalOperator op : leaves) {
+                if (!(op instanceof POStore)) {
+                    int errCode = 2025;
+                    String msg = "Expected leaf of the plan to "
+                            + "always be POStore. Found "
+                            + op.getClass().getSimpleName();
+                    throw new SparkCompilerException(msg, errCode,
+                            PigException.BUG);
+                }
+            }
+        }
+
+        // get all stores and nativeSpark operators, sort them by operator id,
+        // and compile their plans
+        List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan,
+                POStore.class);
+        List<PONative> nativeSparks = PlanHelper.getPhysicalOperators(
+                physicalPlan, PONative.class);
+        List<PhysicalOperator> ops;
+        if (!pigContext.inIllustrator) {
+            ops = new ArrayList<PhysicalOperator>(stores.size()
+                    + nativeSparks.size());
+            ops.addAll(stores);
+        } else {
+            ops = new ArrayList<PhysicalOperator>(leaves.size()
+                    + nativeSparks.size());
+            ops.addAll(leaves);
+        }
+        ops.addAll(nativeSparks);
+        Collections.sort(ops);
+
+        for (PhysicalOperator op : ops) {
+            if (LOG.isDebugEnabled())
+                LOG.debug("Starting compile of leaf-level operator " + op);
+            compile(op);
+        }
+    }
+
+    /**
+     * Compiles the physical plan below op into a Spark operator and stores it
+     * in curSparkOp.
+     * 
+     * @param op
+     * @throws IOException
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    private void compile(PhysicalOperator op) throws IOException,
+            PlanException, VisitorException {
+        SparkOperator[] prevCompInp = compiledInputs;
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("Compiling physical operator " + op +
+                ". Current spark operator is " + curSparkOp);
+
+        List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
+        if (op instanceof PONative) {
+            // the predecessor (store) has already been processed
+            // don't process it again
+        } else if (predecessors != null && predecessors.size() > 0) {
+            // When processing an entire script (multiquery), we can
+            // get into a situation where a load has
+            // predecessors. This means that it depends on some store
+            // earlier in the physicalPlan. We need to take that dependency
+            // and connect the respective Spark operators, while at the
+            // same time removing the connection between the Physical
+            // operators. That way the jobs will run in the right
+            // order.
+            if (op instanceof POLoad) {
+
+                if (predecessors.size() != 1) {
+                    int errCode = 2125;
+                    String msg = "Expected at most one predecessor of load. Got "
+                            + predecessors.size();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                PhysicalOperator p = predecessors.get(0);
+                SparkOperator oper = null;
+                if (p instanceof POStore || p instanceof PONative) {
+                    oper = phyToSparkOpMap.get(p);
+                } else {
+                    int errCode = 2126;
+                    String msg = "Predecessor of load should be a store or spark operator. Got "
+                            + p.getClass();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                // Need new operator
+                curSparkOp = getSparkOp();
+                curSparkOp.add(op);
+                sparkPlan.add(curSparkOp);
+                physicalPlan.disconnect(op, p);
+                sparkPlan.connect(oper, curSparkOp);
+                phyToSparkOpMap.put(op, curSparkOp);
+                return;
+            }
+
+            Collections.sort(predecessors);
+            compiledInputs = new SparkOperator[predecessors.size()];
+            int i = -1;
+            for (PhysicalOperator pred : predecessors) {
+                if (pred instanceof POSplit
+                        && splitsSeen.containsKey(pred.getOperatorKey())) {
+                    POSplit split = (POSplit) pred;
+                    compiledInputs[++i] = startNew(
+                            split.getSplitStore(),
+                            splitsSeen.get(pred.getOperatorKey()), null);
+                    continue;
+                }
+                compile(pred);
+                compiledInputs[++i] = curSparkOp;
+            }
+        } else {
+            // No predecessors. Mostly a load. But this is where
+            // we start. We create a new sparkOp and add its first
+            // operator op. Also this should be added to the sparkPlan.
+            curSparkOp = getSparkOp();
+            curSparkOp.add(op);
+            if (op != null && op instanceof POLoad) {
+                if (((POLoad) op).getLFile() != null
+                        && ((POLoad) op).getLFile().getFuncSpec() != null)
+                    curSparkOp.UDFs.add(((POLoad) op).getLFile().getFuncSpec()
+                            .toString());
+            }
+            sparkPlan.add(curSparkOp);
+            phyToSparkOpMap.put(op, curSparkOp);
+            return;
+        }
+        op.visit(this);
+        compiledInputs = prevCompInp;
+    }
+
+    private SparkOperator getSparkOp() {
+        SparkOperator op = new SparkOperator(OperatorKey.genOpKey(scope));
+        if (LOG.isDebugEnabled())
+            LOG.debug("Created new Spark operator " + op);
+        return op;
+    }
+
+    public SparkOperPlan getSparkPlan() {
+        return sparkPlan;
+    }
+
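+    /**
+     * Mirrors the soft links of the physical plan (e.g. scalar dependencies)
+     * as edges between the corresponding Spark operators, so that dependent
+     * jobs are executed in the right order.
+     */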
+    public void connectSoftLink() throws PlanException, IOException {
+        for (PhysicalOperator op : physicalPlan) {
+            if (physicalPlan.getSoftLinkPredecessors(op) != null) {
+                for (PhysicalOperator pred : physicalPlan
+                        .getSoftLinkPredecessors(op)) {
+                    SparkOperator from = phyToSparkOpMap.get(pred);
+                    SparkOperator to = phyToSparkOpMap.get(op);
+                    if (from == to)
+                        continue;
+                    if (sparkPlan.getPredecessors(to) == null
+                            || !sparkPlan.getPredecessors(to).contains(from)) {
+                        sparkPlan.connect(from, to);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param fSpec
+     * @param old
+     * @param operatorKey if operatorKey is not null, it is assigned to the POLoad of the new SparkOperator;
+     *                    otherwise a new operatorKey is generated for the POLoad. See PIG-5212 for details.
+     * @return
+     * @throws PlanException
+     */
+    private SparkOperator startNew(FileSpec fSpec, SparkOperator old, OperatorKey operatorKey) throws PlanException {
+        POLoad ld = getLoad(operatorKey);
+        ld.setLFile(fSpec);
+        SparkOperator ret = getSparkOp();
+        ret.add(ld);
+        sparkPlan.add(ret);
+        sparkPlan.connect(old, ret);
+        return ret;
+    }
+
+    private POLoad getLoad(OperatorKey operatorKey) {
+        POLoad ld = null;
+        if (operatorKey != null) {
+            ld = new POLoad(operatorKey);
+        } else {
+            ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        }
+        ld.setPc(pigContext);
+        ld.setIsTmpLoad(true);
+        return ld;
+    }
+        
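+    /**
+     * A split is compiled by materializing the split input into a temporary
+     * store (the split store) and starting a new Spark operator that loads it
+     * back, so that every consumer of the split reads the materialized output.
+     */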
+    @Override
+    public void visitSplit(POSplit op) throws VisitorException {
+        try {
+            List<PhysicalOperator> preds = this.physicalPlan.getPredecessors(op);
+            OperatorKey predOperatorKey = null;
+            if (preds != null && preds.size() > 0) {
+                predOperatorKey = preds.get(0).getOperatorKey();
+            }
+            FileSpec fSpec = op.getSplitStore();
+            SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec);
+            sparkOp.setSplitter(true);
+            splitsSeen.put(op.getOperatorKey(), sparkOp);
+            curSparkOp = startNew(fSpec, sparkOp, predOperatorKey);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    public void visitDistinct(PODistinct op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
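+    /**
+     * Terminates the single compiled input with a temporary store writing to
+     * fSpec and returns that Spark operator.
+     */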
+    private SparkOperator endSingleInputPlanWithStr(FileSpec fSpec)
+            throws PlanException {
+        if (compiledInputs.length > 1) {
+            int errCode = 2023;
+            String msg = "Received a multi-input plan when expecting only a single-input one.";
+            throw new PlanException(msg, errCode, PigException.BUG);
+        }
+        SparkOperator sparkOp = compiledInputs[0]; // Load
+        POStore str = getStore();
+        str.setSFile(fSpec);
+        sparkOp.physicalPlan.addAsLeaf(str);
+        return sparkOp;
+    }
+
+    private POStore getStore() {
+        POStore st = new POStore(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        // mark store as tmp store. These could be removed by the
+        // optimizer, because it wasn't the user requesting it.
+        st.setIsTmpStore(true);
+        return st;
+    }
+
+    @Override
+    public void visitLoad(POLoad op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitNative(PONative op) throws VisitorException {
+        try {
+            SparkOperator nativesparkOpper = getNativeSparkOp(
+                    op.getNativeMRjar(), op.getParams());
+            nativesparkOpper.markNative();
+            sparkPlan.add(nativesparkOpper);
+            sparkPlan.connect(curSparkOp, nativesparkOpper);
+            phyToSparkOpMap.put(op, nativesparkOpper);
+            curSparkOp = nativesparkOpper;
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    private NativeSparkOperator getNativeSparkOp(String sparkJar,
+            String[] parameters) {
+        return new NativeSparkOperator(new OperatorKey(scope,
+                nig.getNextNodeId(scope)), sparkJar, parameters);
+    }
+
+    @Override
+    public void visitStore(POStore op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+            if (op.getSFile() != null && op.getSFile().getFuncSpec() != null)
+                curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitFilter(POFilter op) throws VisitorException {
+        try {
+            addToPlan(op);
+            processUDFs(op.getPlan());
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCross(POCross op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitStream(POStream op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitSort(POSort op) throws VisitorException {
+        try {
+            addToPlan(op);
+            POSort sort = op;
+            long limit = sort.getLimit();
+            if (limit!=-1) {
+                POLimit pLimit2 = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+                pLimit2.setLimit(limit);
+                curSparkOp.physicalPlan.addAsLeaf(pLimit2);
+                curSparkOp.markLimitAfterSort();
+            }
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitLimit(POLimit op) throws VisitorException {
+        try {
+            addToPlan(op);
+            curSparkOp.markLimit();
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitLocalRearrange(POLocalRearrange op)
+            throws VisitorException {
+        try {
+            addToPlan(op);
+            List<PhysicalPlan> plans = op.getPlans();
+            if (plans != null)
+                for (PhysicalPlan ep : plans)
+                    processUDFs(ep);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCollectedGroup(POCollectedGroup op)
+            throws VisitorException {
+        List<PhysicalOperator> roots = curSparkOp.physicalPlan.getRoots();
+        if (roots.size() != 1) {
+            int errCode = 2171;
+            String errMsg = "Expected one but found more than one root physical operator in physical plan.";
+            throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+        }
+
+        PhysicalOperator phyOp = roots.get(0);
+        if (!(phyOp instanceof POLoad)) {
+            int errCode = 2172;
+            String errMsg = "Expected physical operator at root to be POLoad. Found : "
+                    + phyOp.getClass().getCanonicalName();
+            throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+        }
+
+        LoadFunc loadFunc = ((POLoad) phyOp).getLoadFunc();
+        try {
+            if (!(CollectableLoadFunc.class.isAssignableFrom(loadFunc
+                    .getClass()))) {
+                int errCode = 2249;
+                throw new SparkCompilerException(
+                        "While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.",
+                        errCode);
+            }
+            ((CollectableLoadFunc) loadFunc).ensureAllKeyInstancesInSameSplit();
+        } catch (SparkCompilerException e) {
+            throw (e);
+        } catch (IOException e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitPOForEach(POForEach op) throws VisitorException {
+        try {
+            addToPlan(op);
+            List<PhysicalPlan> plans = op.getInputPlans();
+            if (plans != null) {
+                for (PhysicalPlan ep : plans) {
+                    processUDFs(ep);
+                }
+            }
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCounter(POCounter op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitRank(PORank op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
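+    /**
+     * Replaces the generic POGlobalRearrange with a POGlobalRearrangeSpark,
+     * recording cross keys and any custom partitioner on the current Spark
+     * operator.
+     */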
+    @Override
+    public void visitGlobalRearrange(POGlobalRearrange op)
+            throws VisitorException {
+        try {
+            POGlobalRearrangeSpark glbOp = new POGlobalRearrangeSpark(op);
+            addToPlan(glbOp);
+            if (op.isCross()) {
+                curSparkOp.addCrossKey(op.getOperatorKey().toString());
+            }
+
+            curSparkOp.customPartitioner = op.getCustomPartitioner();
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
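+    /**
+     * Adds the POPackage to the plan and marks the current Spark operator as
+     * a regular join, group-by or cogroup depending on the package type and
+     * the number of inputs.
+     */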
+    @Override
+    public void visitPackage(POPackage op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+            if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
+                curSparkOp.markRegularJoin();
+            } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) {
+                if (op.getNumInps() == 1) {
+                    curSparkOp.markGroupBy();
+                } else if (op.getNumInps() > 1) {
+                    curSparkOp.markCogroup();
+                }
+            }
+
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitUnion(POUnion op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+            curSparkOp.markUnion();
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /**
+     * Currently a regular join is used to replace skewedJoin.
+     * Skewed join currently works with two-table inner join.
+     * For more info about Pig skewed join, see https://wiki.apache.org/pig/PigSkewedJoinSpec
+     *
+     * @param op
+     * @throws VisitorException
+     */
+    @Override
+    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+        try {
+            Random r = new Random();
+            String pigKeyDistFile = "pig.keyDistFile" + r.nextInt();
+            // firstly, build sample job
+            SparkOperator sampleSparkOp = getSkewedJoinSampleJob(op);
+
+            buildBroadcastForSkewedJoin(sampleSparkOp, pigKeyDistFile);
+
+            sampleSparkOp.markSampler();
+            sparkPlan.add(sampleSparkOp);
+
+            // secondly, build the join job.
+            addToPlan(op);
+            curSparkOp.setSkewedJoinPartitionFile(pigKeyDistFile);
+
+            // do sampling job before join job
+            sparkPlan.connect(sampleSparkOp, curSparkOp);
+
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " +
+                    op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
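+    /**
+     * Fragment-replicate join: each non-fragment input gets a POBroadcastSpark
+     * leaf so it can be shipped as a broadcast variable, and the POFRJoin is
+     * replaced by a POFRJoinSpark added to the fragment's Spark operator.
+     */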
+    @Override
+    public void visitFRJoin(POFRJoin op) throws VisitorException {
+        try {
+            curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment()));
+            for (int i = 0; i < compiledInputs.length; i++) {
+                SparkOperator sparkOperator = compiledInputs[i];
+                if (curSparkOp.equals(sparkOperator)) {
+                    continue;
+                }
+
+                OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope));
+                POBroadcastSpark poBroadcastSpark = new POBroadcastSpark(broadcastKey);
+                poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString());
+
+                sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark);
+            }
+
+            POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op);
+            addToPlan(poFRJoinSpark);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
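+    /**
+     * Merge join compilation: if the right loader implements IndexableLoadFunc
+     * it is used directly, otherwise the right POLoad is replaced by a
+     * MergeJoinIndexer job whose index output is read back through
+     * DefaultIndexableLoader; the indexing operator is connected before the
+     * join operator so that it runs first.
+     */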
+    @Override
+    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+        try {
+            if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+                int errCode=1101;
+                throw new SparkCompilerException("Merge Join must have exactly two inputs. Found : " + compiledInputs.length, errCode);
+            }
+
+            curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0));
+            SparkOperator rightSparkOp;
+            if(curSparkOp.equals(compiledInputs[0])) {
+                rightSparkOp = compiledInputs[1];
+            } else {
+                rightSparkOp = compiledInputs[0];
+            }
+
+            PhysicalPlan rightPipelinePlan;
+            PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan;
+            if (rightPhyPlan.getRoots().size() != 1) {
+                int errCode = 2171;
+                String errMsg = "Expected one but found more than one root physical operator in physical plan.";
+                throw new SparkCompilerException(errMsg, errCode);
+            }
+            PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0);
+            if (!(rightPhyLoader instanceof POLoad)) {
+                int errCode = 2172;
+                String errMsg = "Expected physical operator at root to be POLoad. Found : " + rightPhyLoader.getClass().getCanonicalName();
+                throw new SparkCompilerException(errMsg, errCode);
+            }
+            if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) {
+                // Load - Join case.
+                rightPipelinePlan = null;
+            } else { // We got something on right side. Yank it and set it as inner plan of right input.
+                rightPipelinePlan = rightPhyPlan.clone();
+                PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
+                rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
+                rightPipelinePlan.remove(root);
+                rightPhyPlan.trimBelow(rightPhyLoader);
+            }
+
+            joinOp.setupRightPipeline(rightPipelinePlan);
+            rightSparkOp.setRequestedParallelism(1); // for indexing job
+
+            POLoad rightLoader = (POLoad) rightSparkOp.physicalPlan.getRoots().get(0);
+            joinOp.setSignature(rightLoader.getSignature());
+            LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+
+            if (IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
+                joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+                joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+                curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString());
+
+                // we don't need the right rightSparkOp since
+                // the right loader is an IndexableLoadFunc which can handle the index itself
+                sparkPlan.remove(rightSparkOp);
+                if(rightSparkOp == compiledInputs[0]) {
+                    compiledInputs[0] = null;
+                } else if(rightSparkOp == compiledInputs[1]) {
+                    compiledInputs[1] = null;
+                }
+
+                // validate that the join keys in merge join are only
+                // simple column projections or '*' and not expressions - expressions
+                // cannot be handled when the index is built by the storage layer on the sorted
+                // data when the sorted data (and corresponding index) is written.
+                // So merge join will be restricted to not have expressions as join keys
+                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+                for(int i = 0; i < numInputs; i++) {
+                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+                    for (PhysicalPlan keyPlan : keyPlans) {
+                        for(PhysicalOperator op : keyPlan) {
+                            if(!(op instanceof POProject)) {
+                                int errCode = 1106;
+                                String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
+                                        rightLoader.getLFile().getFuncSpec() + " as the loader";
+                                throw new SparkCompilerException(errMsg, errCode, PigException.INPUT);
+                            }
+                        }
+                    }
+                }
+
+            } else {
+                // Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While
+                // this feature would be useful, the current implementation of DefaultIndexableLoader
+                // is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index
+                // for each call. Some refactoring of this class is required - and then the check below could be removed.
+                if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+                    int errCode = 1104;
+                    String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
+                            "The specified loader " + rightLoadFunc + " doesn't implement it";
+                    throw new SparkCompilerException(errMsg, errCode);
+                }
+
+                // Replace POLoad with  indexer.
+                if (!(OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))) {
+                    int errCode = 1104;
+                    String errMsg = "Right input of merge-join must implement " +
+                            "OrderedLoadFunc interface. The specified loader "
+                            + rightLoadFunc + " doesn't implement it";
+                    throw new SparkCompilerException(errMsg, errCode);
+                }
+
+                String[] indexerArgs = new String[6];
+                List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+                FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
+
+                indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+                indexerArgs[1] = ObjectSerializer.serialize((Serializable) rightInpPlans);
+                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+                indexerArgs[3] = rightLoader.getSignature();
+                indexerArgs[4] = rightLoader.getOperatorKey().scope;
+                indexerArgs[5] = Boolean.toString(true);
+
+                FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),
+                        new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+                rightLoader.setLFile(lFile);
+
+                // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
+                rightSparkOp.useTypedComparator(true);
+                POStore idxStore = getStore();
+                FileSpec idxStrFile = getTempFileSpec();
+                idxStore.setSFile(idxStrFile);
+                rightSparkOp.physicalPlan.addAsLeaf(idxStore);
+                rightSparkOp.markIndexer();
+
+                curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString());
+
+                // We want to ensure indexing job runs prior to actual join job.
+                // So, connect them in order.
+                sparkPlan.connect(rightSparkOp, curSparkOp);
+
+                // set up the DefaultIndexableLoader for the join operator
+                String[] defaultIndexableLoaderArgs = new String[5];
+                defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[1] = idxStrFile.getFileName();
+                defaultIndexableLoaderArgs[2] = idxStrFile.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+                defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
+                joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+                joinOp.setIndexFile(idxStrFile.getFileName());
+            }
+
+            curSparkOp.physicalPlan.addAsLeaf(joinOp);
+            phyToSparkOpMap.put(joinOp, curSparkOp);
+
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator "
+                    + joinOp.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    private void processUDFs(PhysicalPlan plan) throws VisitorException {
+        if (plan != null) {
+            // Process Scalars (UDF with referencedOperators)
+            ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
+            scalarPhyFinder.visit();
+            curSparkOp.scalars.addAll(scalarPhyFinder.getScalars());
+
+            // Process UDFs
+            udfFinder.setPlan(plan);
+            udfFinder.visit();
+            curSparkOp.UDFs.addAll(udfFinder.getUDFs());
+        }
+    }
+
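+    /**
+     * Appends op as the new leaf of the current Spark operator, first merging
+     * the compiled inputs into a single operator when there is more than one.
+     */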
+    private void addToPlan(PhysicalOperator op) throws PlanException,
+            IOException {
+        SparkOperator sparkOp = null;
+        if (compiledInputs.length == 1) {
+            sparkOp = compiledInputs[0];
+        } else {
+            sparkOp = merge(compiledInputs);
+        }
+        sparkOp.physicalPlan.addAsLeaf(op);
+        curSparkOp = sparkOp;
+    }
+
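+    /**
+     * Merges several compiled Spark operators into one: their physical plans
+     * are merged, their predecessors are reconnected to the merged operator,
+     * and parallelism, UDFs, scalars and cross keys are carried over before
+     * the original operators are removed from the Spark plan.
+     */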
+    private SparkOperator merge(SparkOperator[] compiledInputs)
+            throws PlanException {
+        SparkOperator ret = getSparkOp();
+        sparkPlan.add(ret);
+
+        Set<SparkOperator> toBeConnected = new HashSet<SparkOperator>();
+        List<SparkOperator> toBeRemoved = new ArrayList<SparkOperator>();
+
+        List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
+
+        for (SparkOperator sparkOp : compiledInputs) {
+            if (LOG.isDebugEnabled())
+                LOG.debug("Merging Spark operator " + sparkOp);
+            toBeRemoved.add(sparkOp);
+            toBeMerged.add(sparkOp.physicalPlan);
+            List<SparkOperator> predecessors = sparkPlan
+                    .getPredecessors(sparkOp);
+            if (predecessors != null) {
+                for (SparkOperator predecessorSparkOp : predecessors) {
+                    toBeConnected.add(predecessorSparkOp);
+                }
+            }
+        }
+        merge(ret.physicalPlan, toBeMerged);
+
+        Iterator<SparkOperator> it = toBeConnected.iterator();
+        while (it.hasNext())
+            sparkPlan.connect(it.next(), ret);
+        for (SparkOperator removeSparkOp : toBeRemoved) {
+            if (removeSparkOp.requestedParallelism > ret.requestedParallelism)
+                ret.requestedParallelism = removeSparkOp.requestedParallelism;
+            for (String udf : removeSparkOp.UDFs) {
+                if (!ret.UDFs.contains(udf))
+                    ret.UDFs.add(udf);
+            }
+            // We also need to change scalar marking
+            for (PhysicalOperator physOp : removeSparkOp.scalars) {
+                if (!ret.scalars.contains(physOp)) {
+                    ret.scalars.add(physOp);
+                }
+            }
+            
+            if(removeSparkOp.getCrossKeys()!=null){
+                for(String crossKey: removeSparkOp.getCrossKeys())
+                   ret.addCrossKey(crossKey);
+            }
+            
+            
+            Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>();
+            for (Map.Entry<PhysicalOperator, SparkOperator> entry : phyToSparkOpMap
+                    .entrySet()) {
+                if (entry.getValue() == removeSparkOp) {
+                    opsToChange.add(entry.getKey());
+                }
+            }
+            for (PhysicalOperator op : opsToChange) {
+                phyToSparkOpMap.put(op, ret);
+            }
+
+            sparkPlan.remove(removeSparkOp);
+        }
+        return ret;
+    }
+
+    /**
+     * Merges a list of plans into a single plan.
+     * 
+     * @param <O>
+     * @param <E>
+     * @param finPlan
+     *            - Final Plan into which the list of plans is merged
+     * @param plans
+     *            - list of plans to be merged
+     * @throws PlanException
+     */
+    private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(
+            E finPlan, List<E> plans) throws PlanException {
+        for (E e : plans) {
+            finPlan.merge(e);
+        }
+        Collections.sort(finPlan.getLeaves());
+    }
+
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+        if (compiledInputs.length < 2) {
+            int errCode = 2251;
+            String errMsg = "Merge Cogroup works on two or more relations. " +
+                    "To use map-side group-by on a single relation, use the 'collected' qualifier.";
+            throw new SparkCompilerException(errMsg, errCode);
+        }
+
+        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length - 1);
+        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length - 1);
+        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length - 1);
+
+        try {
+            poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
+
+            // Iterate through all the SparkOperators, disconnect the side SparkOperators from
+            // the Spark plan and collect all the information needed in different lists.
+
+            for (int i = 0; i < compiledInputs.length; i++) {
+                SparkOperator sparkOper = compiledInputs[i];
+                PhysicalPlan plan = sparkOper.physicalPlan;
+                if (plan.getRoots().size() != 1) {
+                    int errCode = 2171;
+                    String errMsg = "Expected one but found more than one root physical operator in physical plan.";
+                    throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+                }
+
+                PhysicalOperator rootPOOp = plan.getRoots().get(0);
+                if (!(rootPOOp instanceof POLoad)) {
+                    int errCode = 2172;
+                    String errMsg = "Expected physical operator at root to be POLoad. Found : " + rootPOOp.getClass().getCanonicalName();
+                    throw new SparkCompilerException(errMsg, errCode);
+                }
+
+                POLoad sideLoader = (POLoad) rootPOOp;
+                FileSpec loadFileSpec = sideLoader.getLFile();
+                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+                LoadFunc loadfunc = sideLoader.getLoadFunc();
+                if (i == 0) {
+
+                    if (!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+                        int errCode = 2252;
+                        throw new SparkCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
+                    }
+
+                    ((CollectableLoadFunc) loadfunc).ensureAllKeyInstancesInSameSplit();
+                    continue;
+                }
+                if (!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+                    int errCode = 2253;
+                    throw new SparkCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
+                }
+
+                funcSpecs.add(funcSpec);
+                fileSpecs.add(loadFileSpec.getFileName());
+                loaderSigns.add(sideLoader.getSignature());
+                sparkPlan.remove(sparkOper);
+            }
+
+            poCoGrp.setSideLoadFuncs(funcSpecs);
+            poCoGrp.setSideFileSpecs(fileSpecs);
+            poCoGrp.setLoaderSignatures(loaderSigns);
+
+            // Use spark operator of base relation for the cogroup operation.
+            SparkOperator baseSparkOp = phyToSparkOpMap.get(poCoGrp.getInputs().get(0));
+
+            // Create a spark operator to generate index file for tuples from leftmost relation
+            SparkOperator indexerSparkOp = getSparkOp();
+            FileSpec idxFileSpec = getIndexingJob(indexerSparkOp, baseSparkOp, poCoGrp.getLRInnerPlansOf(0));
+            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+
+            baseSparkOp.physicalPlan.addAsLeaf(poCoGrp);
+            for (FuncSpec funcSpec : funcSpecs)
+                baseSparkOp.UDFs.add(funcSpec.toString());
+
+            sparkPlan.add(indexerSparkOp);
+            sparkPlan.connect(indexerSparkOp, baseSparkOp);
+            phyToSparkOpMap.put(poCoGrp, baseSparkOp);
+            curSparkOp = baseSparkOp;
+        } catch (ExecException e) {
+            throw new SparkCompilerException(e.getDetailedMessage(), e.getErrorCode(), e.getErrorSource(), e);
+        } catch (SparkCompilerException mrce) {
+            throw (mrce);
+        } catch (CloneNotSupportedException e) {
+            throw new SparkCompilerException(e);
+        } catch (PlanException e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        } catch (IOException e) {
+            int errCode = 3000;
+            String errMsg = "IOException caught while compiling POMergeCoGroup";
+            throw new SparkCompilerException(errMsg, errCode, e);
+        }
+    }
+
+    // Sets up the indexing job for single-stage cogroups.
+    private FileSpec getIndexingJob(SparkOperator indexerSparkOp,
+                                    final SparkOperator baseSparkOp, final List<PhysicalPlan> mapperLRInnerPlans)
+            throws SparkCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
+
+        // First replace loader with  MergeJoinIndexer.
+        PhysicalPlan baseMapPlan = baseSparkOp.physicalPlan;
+        POLoad baseLoader = (POLoad) baseMapPlan.getRoots().get(0);
+        FileSpec origLoaderFileSpec = baseLoader.getLFile();
+        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+        LoadFunc loadFunc = baseLoader.getLoadFunc();
+
+        if (!(OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))) {
+            int errCode = 1104;
+            String errMsg = "Base relation of merge-coGroup must implement " +
+                    "OrderedLoadFunc interface. The specified loader "
+                    + funcSpec + " doesn't implement it";
+            throw new SparkCompilerException(errMsg, errCode);
+        }
+
+        String[] indexerArgs = new String[6];
+        indexerArgs[0] = funcSpec.toString();
+        indexerArgs[1] = ObjectSerializer.serialize((Serializable) mapperLRInnerPlans);
+        indexerArgs[3] = baseLoader.getSignature();
+        indexerArgs[4] = baseLoader.getOperatorKey().scope;
+        indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
+        PhysicalPlan phyPlan;
+        if (baseMapPlan.getSuccessors(baseLoader) == null
+                || baseMapPlan.getSuccessors(baseLoader).isEmpty()) {
+            // Load-Load-Cogroup case.
+            phyPlan = null;
+        } else { // We got something. Yank it and set it as inner plan.
+            phyPlan = baseMapPlan.clone();
+            PhysicalOperator root = phyPlan.getRoots().get(0);
+            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+            phyPlan.remove(root);
+
+        }
+        indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+        POLoad idxJobLoader = getLoad(null);
+        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+        indexerSparkOp.physicalPlan.add(idxJobLoader);
+        indexerSparkOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
+
+        // Loader of sparkOp will return a tuple of form -
+        // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
+        // Create a spark node to retrieve index file by MergeJoinIndexer
+        SparkUtil.createIndexerSparkNode(indexerSparkOp, scope, nig);
+
+        POStore st = getStore();
+        FileSpec strFile = getTempFileSpec();
+        st.setSFile(strFile);
+        indexerSparkOp.physicalPlan.addAsLeaf(st);
+
+        return strFile;
+    }
+
+    /**
+     * Returns a temporary DFS Path
+     *
+     * @return
+     * @throws IOException
+     */
+    private FileSpec getTempFileSpec() throws IOException {
+        return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
+                new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
+    }
+
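+    /**
+     * Helper visitor that records the result type of a visited POProject;
+     * used to determine the key type of a sort plan.
+     */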
+    private static class FindKeyTypeVisitor extends PhyPlanVisitor {
+
+        byte keyType = DataType.UNKNOWN;
+
+        FindKeyTypeVisitor(PhysicalPlan plan) {
+            super(plan,
+                    new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitProject(POProject p) throws VisitorException {
+            keyType = p.getResultType();
+        }
+    }
+
+
+    /**
+     * Builds a POPoissonSampleSpark operator for SkewedJoin's sampling job.
+     */
+    private void addSampleOperatorForSkewedJoin(SparkOperator sampleSparkOp)
+            throws PlanException {
+        Configuration conf = ConfigurationUtil.toConfiguration(pigProperties);
+        int sampleRate = conf.getInt(
+                PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE,
+                POPoissonSampleSpark.DEFAULT_SAMPLE_RATE);
+        float heapPerc = conf.getFloat(
+                PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
+                PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+        long totalMemory = conf.getLong(
+                PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1);
+
+        POPoissonSampleSpark poSample = new POPoissonSampleSpark(
+                new OperatorKey(scope, nig.getNextNodeId(scope)), -1,
+                sampleRate, heapPerc, totalMemory);
+
+        sampleSparkOp.physicalPlan.addAsLeaf(poSample);
+    }
+
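+    /**
+     * Builds the Spark operator performing the actual sort: it loads the data
+     * from lFile (connected after the quantile job), adds a POLocalRearrange
+     * built from the sort plans to extract the keys, marks the operator as a
+     * global sort, appends the POSort and, when a limit is set, a trailing
+     * POLimit.
+     */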
+    private SparkOperator getSortJob(
+            POSort sort,
+            SparkOperator quantJob,
+            FileSpec lFile,
+            FileSpec quantFile,
+            int rp, Pair<POProject, Byte>[] fields) throws PlanException {
+        SparkOperator sparkOper = startNew(lFile, quantJob, null);
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+        byte keyType = DataType.UNKNOWN;
+        if (fields == null) {
+            // This is project *
+            PhysicalPlan ep = new PhysicalPlan();
+            POProject prj = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            prj.setStar(true);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.TUPLE);
+            ep.add(prj);
+            eps1.add(ep);
+        } else {
+            /*
+            for (int i : fields) {
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,
+                    nig.getNextNodeId(scope)));
+                prj.setColumn(i);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.BYTEARRAY);
+                ep.add(prj);
+                eps1.add(ep);
+            }
+            */
+            // Attach the sort plans to the local rearrange to get the
+            // projection.
+            eps1.addAll(sort.getSortPlans());
+
+            // Visit the first sort plan to figure out our key type.  We only
+            // have to visit the first because if we have more than one plan,
+            // then the key type will be tuple.
+            try {
+                FindKeyTypeVisitor fktv =
+                        new FindKeyTypeVisitor(sort.getSortPlans().get(0));
+                fktv.visit();
+                keyType = fktv.keyType;
+            } catch (VisitorException ve) {
+                int errCode = 2035;
+                String msg = "Internal error. Could not compute key type of sort operator.";
+                throw new PlanException(msg, errCode, PigException.BUG, ve);
+            }
+        }
+
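+        // Rearrange on the sort key; with more than one sort column the key becomes a tuple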
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            int errCode = 2058;
+            String msg = "Unable to set index on newly created POLocalRearrange.";
+            throw new PlanException(msg, errCode, PigException.BUG, e);
+        }
+        lr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE :
+                keyType);
+        lr.setPlans(eps1);
+        lr.setResultType(DataType.TUPLE);
+        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+        sparkOper.physicalPlan.addAsLeaf(lr);
+
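+        // Mark this operator as the global sort stage and record the reduce-side key type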
+        sparkOper.setGlobalSort(true);
+        pigContext.getProperties().setProperty("pig.reduce.keytype", Byte.toString(lr.getKeyType()));
+        sparkOper.requestedParallelism = rp;
+        sparkOper.physicalPlan.addAsLeaf(sort);
+
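+        // If the sort carries a LIMIT, apply it right after the sort and flag the operator as limit-after-sort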
+        long limit = sort.getLimit();
+        if (limit != -1) {
+            POLimit pLimit2 = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            pLimit2.setLimit(limit);
+            sparkOper.physicalPlan.addAsLeaf(pLimit2);
+            sparkOper.markLimitAfterSort();
+        }
+
+        return sparkOper;
+    }
+
+    /**
+     * Create a sampling job to collect statistics by sampling an input file. The sequence of operations is as
+     * follows:
+     * <li>Transform input sample tuples into another tuple.</li>
+     * <li>Add an extra field &quot;all&quot; into the tuple </li>
+     * <li>Package all tuples into one bag </li>
+     * <li>Add constant field for number of reducers. </li>
+     * <li>Sort the bag </li>
+     * <li>Invoke UDF with the number of reducers and the sorted bag.</li>
+     * <li>Data generated by UDF is stored into a file.</li>
+     *
+     * @param sort           the POSort operator used to sort the bag
+     * @param sampleOperator current sampling job
+     * @param transformPlans plans used to transform the sampled tuples (may be null)
+     * @param rp             configured parallelism
+     * @param udfClassName   the class name of UDF
+     * @param udfArgs        the arguments of UDF
+     * @return the sampling SparkOperator
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    @SuppressWarnings("deprecation")
+    private SparkOperator getSamplingJob(POSort sort, SparkOperator sampleOperator,
+                                         List<PhysicalPlan> transformPlans,
+                                         int rp,
+                                         String udfClassName, String[] udfArgs)
+            throws PlanException, VisitorException, ExecException {
+        addSampleOperatorForSkewedJoin(sampleOperator);
+        List<Boolean> flat1 = new ArrayList<Boolean>();
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+        // if transform plans are not specified, project the columns of sorting keys
+        if (transformPlans == null) {
+            Pair<POProject, Byte>[] sortProjs = null;
+            sortProjs = getSortCols(sort.getSortPlans());
+            // Set up the projections of the key columns
+            if (sortProjs == null) {
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,
+                        nig.getNextNodeId(scope)));
+                prj.setStar(true);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.TUPLE);
+                ep.add(prj);
+                eps1.add(ep);
+                flat1.add(false);
+            } else {
+                for (Pair<POProject, Byte> sortProj : sortProjs) {
+                    // Check for proj being null; getSortCols uses null for a non-POProject
+                    // operator. Since Order by does not allow expression operators,
+                    // it should never be set to null here.
+                    if (sortProj == null) {
+                        int errCode = 2174;
+                        String msg = "Internal exception. Could not create a sampler job";
+                        throw new SparkCompilerException(msg, errCode, PigException.BUG);
+                    }
+                    PhysicalPlan ep = new PhysicalPlan();
+                    POProject prj;
+                    try {
+                        prj = sortProj.first.clone();
+                    } catch (CloneNotSupportedException e) {
+                        //should not get here
+                        throw new AssertionError(
+                                "Error cloning project caught exception" + e
+                        );
+                    }
+                    ep.add(prj);
+                    eps1.add(ep);
+                    flat1.add(false);
+                }
+            }
+        } else {
+            for (int i = 0; i < transformPlans.size(); i++) {
+                eps1.add(transformPlans.get(i));
+                flat1.add(i == transformPlans.size() - 1);
+            }
+        }
+        // This foreach will pick the sort key columns from the sampler output
+        POForEach nfe1 = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, eps1, flat1);
+        sampleOperator.physicalPlan.addAsLeaf(nfe1);
+
+        //sort the sample
+        POSampleSortSpark poSparkSampleSort = new POSampleSortSpark(sort);
+        sampleOperator.physicalPlan.addAsLeaf(poSparkSampleSort);
+
+        // for the foreach: project the bag of sorted samples (column 1)
+        PhysicalPlan fe2Plan = new PhysicalPlan();
+        POProject topPrj = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        topPrj.setColumn(1);
+        topPrj.setResultType(DataType.BAG);
+        topPrj.setOverloaded(true);
+        fe2Plan.add(topPrj);
+
+
+        // The plan which will have a constant representing the
+        // degree of parallelism for the final order-by job.
+        // This will either come from an "order by parallel x" in the script
+        // or will be the default number of reducers for the cluster if
+        // "parallel x" is not used in the script.
+        PhysicalPlan rpep = new PhysicalPlan();
+        ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        rpce.setRequestedParallelism(rp);
+
+        // We temporarily set it to rp and will adjust it at runtime, because the final degree of parallelism
+        // is unknown until we are ready to submit it. See PIG-2779.
+        rpce.setValue(rp);
+
+        rpce.setResultType(DataType.INTEGER);
+        rpep.add(rpce);
+
+        List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
+        genEps.add(rpep);
+        genEps.add(fe2Plan);
+
+        List<Boolean> flattened2 = new ArrayList<Boolean>();
+        flattened2.add(false);
+        flattened2.add(false);
+
+        POForEach nfe2 = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, genEps, flattened2);
+        sampleOperator.physicalPlan.addAsLeaf(nfe2);
+
+        // Let's connect the output from the foreach containing
+        // number of quantiles and the sorted bag of samples to
+        // another foreach with the FindQuantiles udf. The input
+        // to the FindQuantiles udf is a project(*) which takes the
+        // foreach input and gives it to the udf
+        PhysicalPlan ep4 = new PhysicalPlan();
+        POProject prjStar4 = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        prjStar4.setResultType(DataType.TUPLE);
+        prjStar4.setStar(true);
+        ep4.add(prjStar4);
+
+        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+        ufInps.add(prjStar4);
+
+        POUserFunc uf = new POUserFunc(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, ufInps,
+                new FuncSpec(udfClassName, udfArgs));
+        ep4.add(uf);
+        ep4.connect(prjStar4, uf);
+
+        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
+        ep4s.add(ep4);
+        List<Boolean> flattened3 = new ArrayList<Boolean>();
+        flattened3.add(false);
+        POForEach nfe3 = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, ep4s, flattened3);
+
+        sampleOperator.physicalPlan.addAsLeaf(nfe3);
+
+        sampleOperator.requestedParallelism = 1;
+        sampleOperator.markSampler();
+        return sampleOperator;
+    }
+
+    private Pair<POProject, Byte>[] getSortCols(List<PhysicalPlan> plans) throws PlanException, ExecException {
+        if (plans != null) {
+            @SuppressWarnings("unchecked")
+            Pair<POProject, Byte>[] ret = new Pair[plans.size()];
+            int i = -1;
+            for (PhysicalPlan plan : plans) {
+                PhysicalOperator op = plan.getLeaves().get(0);
+                POProject proj;
+                if (op instanceof POProject) {
+                    if (((POProject) op).isStar()) return null;
+                    proj = (POProject) op;
+                } else {
+                    proj = null;
+                }
+                byte type = op.getResultType();
+                ret[++i] = new Pair<POProject, Byte>(proj, type);
+            }
+            return ret;
+        }
+        int errCode = 2026;
+        String msg = "No expression plan found in POSort.";
+        throw new PlanException(msg, errCode, PigException.BUG);
+    }
+
+    /**
+     * Add a POBroadcastSpark operator to broadcast the key distribution for SkewedJoin's sampling job
+     * @param sampleSparkOp the sampling Spark operator
+     * @param pigKeyDistFile name of the broadcast variable holding the key distribution
+     * @throws PlanException
+     */
+    private void buildBroadcastForSkewedJoin(SparkOperator sampleSparkOp, String pigKeyDistFile) throws PlanException {
+
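+        // Broadcast the key distribution produced by the sampling job under the given variable name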
+        POBroadcastSpark poBroadcast = new POBroadcastSpark(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        poBroadcast.setBroadcastedVariableName(pigKeyDistFile);
+        sampleSparkOp.physicalPlan.addAsLeaf(poBroadcast);
+    }
+
+    /**
+     * Create Sampling job for skewed join.
+     */
+    private SparkOperator getSkewedJoinSampleJob(POSkewedJoin skewedJoin) throws PlanException, VisitorException {
+        try {
+            SparkOperator sampleOperator = new SparkOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            sampleOperator.physicalPlan = compiledInputs[0].physicalPlan.clone();
+            MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewedJoin.getJoinPlans();
+
+            List<PhysicalOperator> l = physicalPlan.getPredecessors(skewedJoin);
+            List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+            List<Boolean> ascCol = new ArrayList<Boolean>();
+            for (int i = 0; i < groups.size(); i++) {
+                ascCol.add(false);
+            }
+
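+            // Sort the sampled tuples on the join key expressions of the first join input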
+            POSort sort = new POSort(skewedJoin.getOperatorKey(), skewedJoin.getRequestedParallelism(), null, groups,
+                    ascCol, null);
+
+            // set up transform plan to get keys and memory size of input tuples
+            // it first adds all the plans to get key columns,
+            List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+            transformPlans.addAll(groups);
+
+            // then it adds a column for memory size
+            POProject prjStar = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            prjStar.setResultType(DataType.TUPLE);
+            prjStar.setStar(true);
+
+            List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+            ufInps.add(prjStar);
+
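+            // GetMemNumRows estimates the in-memory size of each sampled tuple, used to compute the key distribution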
+            PhysicalPlan ep = new PhysicalPlan();
+            POUserFunc uf = new POUserFunc(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, ufInps,
+                    new FuncSpec(GetMemNumRows.class.getName(), (String[]) null));
+            uf.setResultType(DataType.TUPLE);
+            ep.add(uf);
+            ep.add(prjStar);
+            ep.connect(prjStar, uf);
+
+            transformPlans.add(ep);
+            // pass configurations to the User Function
+            String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
+                    String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
+            String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
+
+            return getSamplingJob(sort, sampleOperator, transformPlans, skewedJoin.getRequestedParallelism(),
+                    PartitionSkewedKeys.class.getName(), new String[]{per, mc});
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " +
+                    skewedJoin.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * An exception thrown by the SparkCompiler when it fails to compile a physical plan into a Spark plan.
+ */
+public class SparkCompilerException extends VisitorException  {
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * Create a new SparkCompilerException with null as the error message.
+     */
+    public SparkCompilerException() {
+        super();
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+     */
+    public SparkCompilerException(String message) {
+        super(message);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified cause.
+     *
+     * @param cause - The cause (which is saved for later retrieval by the 
<link>Throwable.getCause()</link> method) indicating the source of this 
exception. A null value is permitted, and indicates that the cause is 
nonexistent or unknown.
+     */
+    public SparkCompilerException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message and 
cause.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param cause - The cause (which is saved for later retrieval by the 
<link>Throwable.getCause()</link> method) indicating the source of this 
exception. A null value is permitted, and indicates that the cause is 
nonexistent or unknown.
+     */
+    public SparkCompilerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message and error code.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     */
+    public SparkCompilerException(String message, int errCode) {
+        super(message, errCode);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param cause - The cause (which is saved for later retrieval by the 
<link>Throwable.getCause()</link> method) indicating the source of this 
exception. A null value is permitted, and indicates that the cause is 
nonexistent or unknown. 
+     */
+    public SparkCompilerException(String message, int errCode, Throwable 
cause) {
+        super(message, errCode, cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code and error source.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc) {
+        super(message, errCode, errSrc);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source
+     * @param cause - The cause (which is saved for later retrieval by the 
<link>Throwable.getCause()</link> method) indicating the source of this 
exception. A null value is permitted, and indicates that the cause is 
nonexistent or unknown. 
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               Throwable cause) {
+        super(message, errCode, errSrc, cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code and retry flag.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param retry - If the exception is retriable or not
+     */
+    public SparkCompilerException(String message, int errCode, boolean retry) {
+        super(message, errCode, retry);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source and retry flag.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     * @param retry - If the exception is retriable or not
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               boolean retry) {
+        super(message, errCode, errSrc, retry);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source, retry flag and detailed message for the developer.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     * @param retry - If the exception is retriable or not
+     * @param detailedMsg - The detailed message shown to the developer 
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               boolean retry, String detailedMsg) {
+        super(message, errCode, errSrc, retry, detailedMsg);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source, retry flag, detailed message for the developer and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval 
by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     * @param retry - If the exception is retriable or not
+     * @param detailedMsg - The detailed message shown to the developer 
+     * @param cause - The cause (which is saved for later retrieval by the 
<link>Throwable.getCause()</link> method) indicating the source of this 
exception. A null value is permitted, and indicates that the cause is 
nonexistent or unknown.
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               boolean retry, String detailedMsg, Throwable 
cause) {
+        super(message, errCode, errSrc, retry, detailedMsg, cause);
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor for the SparkOperPlan class
+ */
+public class SparkOpPlanVisitor extends
+        PlanVisitor<SparkOperator, SparkOperPlan> {
+
+    public SparkOpPlanVisitor(SparkOperPlan plan,
+            PlanWalker<SparkOperator, SparkOperPlan> walker) {
+        super(plan, walker);
+    }
+
+    public void visitSparkOp(SparkOperator sparkOperator)
+            throws VisitorException {
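+        // Default implementation is a no-op; concrete visitors override this to process each SparkOperator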
+    }
+
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A Plan used to create the physicalPlan of Spark Operators
+ */
+public class SparkOperPlan extends OperatorPlan<SparkOperator> {
+
+    @Override
+    public String toString() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        SparkPrinter printer = new SparkPrinter(ps, this);
+        printer.setVerbose(true);
+        try {
+            printer.visit();
+        } catch (VisitorException e) {
+            throw new RuntimeException(
+                    "Unable to get String representation of plan:" + e, e);
+        }
+        return baos.toString();
+    }
+}

