Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Converter that takes a POStore and stores its content.
+ */
+@SuppressWarnings({ "serial" })
+public class StoreConverter implements
+        RDDConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
+
+    private static final Log LOG = LogFactory.getLog(StoreConverter.class);
+
+    private JobConf jobConf = null;
+    public StoreConverter(JobConf jobConf) {
+        this.jobConf = jobConf;
+    }
+
+    @Override
+    public RDD<Tuple2<Text, Tuple>> convert(List<RDD<Tuple>> predecessors,
+            POStore op) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, op, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+
+        SparkPigStatusReporter.getInstance().createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP,
+                SparkStatsUtil.getCounterName(op));
+
+        // convert back to KV pairs
+        JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(
+                buildFromTupleFunction(op));
+
+        PairRDDFunctions<Text, Tuple> pairRDDFunctions = new PairRDDFunctions<Text, Tuple>(
+                rddPairs.rdd(), SparkUtil.getManifest(Text.class),
+                SparkUtil.getManifest(Tuple.class), null);
+
+        POStore poStore = configureStorer(jobConf, op);
+
+        if ("true".equalsIgnoreCase(jobConf
+                .get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+            Job storeJob = new Job(jobConf);
+            LazyOutputFormat.setOutputFormatClass(storeJob,
+                    PigOutputFormat.class);
+            jobConf = (JobConf) storeJob.getConfiguration();
+            jobConf.setOutputKeyClass(Text.class);
+            jobConf.setOutputValueClass(Tuple.class);
+            String fileName = poStore.getSFile().getFileName();
+            Path filePath = new Path(fileName);
+            FileOutputFormat.setOutputPath(jobConf,filePath);
+            pairRDDFunctions.saveAsNewAPIHadoopDataset(jobConf);
+        } else {
+            pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile()
+                    .getFileName(), Text.class, Tuple.class,
+                    PigOutputFormat.class, jobConf);
+        }
+
+        RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd();
+        if (LOG.isDebugEnabled())
+            LOG.debug("RDD lineage: " + retRdd.toDebugString());
+        return retRdd;
+    }
+
+
+    private static POStore configureStorer(JobConf jobConf,
+            PhysicalOperator op) throws IOException {
+        ArrayList<POStore> storeLocations = Lists.newArrayList();
+        POStore poStore = (POStore) op;
+        storeLocations.add(poStore);
+        StoreFuncInterface sFunc = poStore.getStoreFunc();
+        sFunc.setStoreLocation(poStore.getSFile().getFileName(),
+                new org.apache.hadoop.mapreduce.Job(jobConf));
+        poStore.setInputs(null);
+        poStore.setParentPlan(null);
+
+        jobConf.set(JobControlCompiler.PIG_MAP_STORES,
+                ObjectSerializer.serialize(Lists.newArrayList()));
+        jobConf.set(JobControlCompiler.PIG_REDUCE_STORES,
+                ObjectSerializer.serialize(storeLocations));
+        return poStore;
+    }
+
+    private static class FromTupleFunction implements
+            Function<Tuple, Tuple2<Text, Tuple>> {
+
+        private static Text EMPTY_TEXT = new Text();
+        private String counterGroupName;
+        private String counterName;
+        private SparkCounters sparkCounters;
+        private boolean disableCounter;
+
+
+        public Tuple2<Text, Tuple> call(Tuple v1) {
+            if (sparkCounters != null && disableCounter == false) {
+                sparkCounters.increment(counterGroupName, counterName, 1L);
+            }
+            return new Tuple2<Text, Tuple>(EMPTY_TEXT, v1);
+        }
+
+        public void setCounterGroupName(String counterGroupName) {
+            this.counterGroupName = counterGroupName;
+        }
+
+        public void setCounterName(String counterName) {
+            this.counterName = counterName;
+        }
+
+        public void setSparkCounters(SparkCounters sparkCounter) {
+            this.sparkCounters = sparkCounter;
+        }
+
+        public void setDisableCounter(boolean disableCounter) {
+            this.disableCounter = disableCounter;
+        }
+    }
+
+    private FromTupleFunction buildFromTupleFunction(POStore op) {
+        FromTupleFunction ftf = new FromTupleFunction();
+        boolean disableCounter = op.disableCounter();
+        if (!op.isTmpStore() && !disableCounter) {
+            ftf.setDisableCounter(disableCounter);
+            ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP);
+            ftf.setCounterName(SparkStatsUtil.getCounterName(op));
+            SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
+            ftf.setSparkCounters(counterReporter.getCounters());
+        }
+        return ftf;
+    }
+}
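For context, the converter above boils down to the standard Spark-on-Hadoop save pattern: map each record to a (key, value) pair and hand the pair RDD to a Hadoop OutputFormat. The following self-contained sketch shows that pattern with the plain Java API; the class name, output path and the use of TextOutputFormat are illustrative only (the converter itself writes through PigOutputFormat with an empty Text key).

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class SaveAsHadoopFileSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "save-sketch");
            JavaRDD<String> records = sc.parallelize(Arrays.asList("a", "b", "c"));

            // Pair each record with an empty key, mirroring StoreConverter's EMPTY_TEXT key.
            JavaPairRDD<Text, Text> pairs = records.mapToPair(
                    r -> new Tuple2<Text, Text>(new Text(), new Text(r)));

            // Write through a new-API Hadoop OutputFormat, as saveAsNewAPIHadoopFile does above.
            pairs.saveAsNewAPIHadoopFile("/tmp/save-sketch-out", Text.class, Text.class,
                    TextOutputFormat.class, sc.hadoopConfiguration());
            sc.stop();
        }
    }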

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+public class StreamConverter implements
+        RDDConverter<Tuple, Tuple, POStream> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POStream poStream) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        StreamFunction streamFunction = new StreamFunction(poStream);
+        return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+    }
+
+    private static class StreamFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+        private POStream poStream;
+
+        private StreamFunction(POStream poStream) {
+            this.poStream = poStream;
+        }
+
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+            return new Iterable<Tuple>() {
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poStream.setInputs(null);
+                            poStream.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            Result result = poStream.getNextTuple();
+                            return result;
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poStream.setFetchable(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}
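StreamFunction above is a partition-at-a-time operation: the whole input iterator of a partition is wrapped and fed through POStream. A minimal, stand-alone mapPartitions sketch in the same Spark 1.x Java style (where FlatMapFunction.call returns an Iterable) looks like the following; the doubling logic and class name are purely illustrative.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class MapPartitionsSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "map-partitions-sketch");
            JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

            // Process one whole partition per call, as StreamFunction does for POStream.
            JavaRDD<Integer> doubled = numbers.mapPartitions(
                    new FlatMapFunction<Iterator<Integer>, Integer>() {
                        @Override
                        public Iterable<Integer> call(Iterator<Integer> it) {
                            List<Integer> out = new ArrayList<Integer>();
                            while (it.hasNext()) {
                                out.add(it.next() * 2);
                            }
                            return out;
                        }
                    });

            System.out.println(doubled.collect());
            sc.stop();
        }
    }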

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.List;
+
+import scala.collection.JavaConversions;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.SparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.rdd.UnionRDD;
+
+public class UnionConverter implements RDDConverter<Tuple, Tuple, POUnion> {
+
+    private final SparkContext sc;
+
+    public UnionConverter(SparkContext sc) {
+        this.sc = sc;
+    }
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POUnion physicalOperator) throws IOException {
+        SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
+                physicalOperator, 0);
+        UnionRDD<Tuple> unionRDD = new UnionRDD<Tuple>(sc,
+                JavaConversions.asScalaBuffer(predecessors),
+                SparkUtil.getManifest(Tuple.class));
+        return unionRDD;
+    }
+
+}
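UnionConverter builds a UnionRDD over all predecessor RDDs in one step. At the Java API level the same result is obtained by chaining union() calls; a small sketch (data and names are illustrative only):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public class UnionSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "union-sketch");
            JavaRDD<Integer> left = sc.parallelize(Arrays.asList(1, 2, 3));
            JavaRDD<Integer> right = sc.parallelize(Arrays.asList(4, 5));

            // Concatenates the two inputs without deduplication, just like POUnion.
            JavaRDD<Integer> all = left.union(right);

            System.out.println(all.collect()); // [1, 2, 3, 4, 5]
            sc.stop();
        }
    }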

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java Mon May 29 15:00:39 2017
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.hadoop.util.RunJar;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.RunJarSecurityManager;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+
+/**
+ * NativeSparkOperator: runs a user-supplied native job jar through RunJar, the equivalent of invoking "hadoop jar".
+ */
+public class NativeSparkOperator extends SparkOperator {
+    private static final long serialVersionUID = 1L;
+    private static int countJobs = 0;
+    private String nativeSparkJar;
+    private String[] params;
+    private String jobId;
+
+    public NativeSparkOperator(OperatorKey k, String sparkJar, String[] parameters) {
+        super(k);
+        nativeSparkJar = sparkJar;
+        params = parameters;
+        jobId = sparkJar + "_" + getJobNumber();
+    }
+
+    private static int getJobNumber() {
+        countJobs++;
+        return countJobs;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void runJob() throws JobCreationException {
+        RunJarSecurityManager secMan = new RunJarSecurityManager();
+        try {
+            RunJar.main(getNativeMRParams());
+            SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+        } catch (SecurityException se) {   //java.lang.reflect.InvocationTargetException
+            if (secMan.getExitInvoked()) {
+                if (secMan.getExitCode() != 0) {
+                    JobCreationException e = new JobCreationException("Native job returned with non-zero return code");
+                    SparkStatsUtil.addFailedNativeJobStats(PigStats.get(), this, e);
+                } else {
+                    SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+                }
+            }
+        } catch (Throwable t) {
+            JobCreationException e = new JobCreationException(
+                    "Cannot run native spark job " + t.getMessage(), t);
+            SparkStatsUtil.addFailedNativeJobStats(PigStats.get(), this, e);
+            throw e;
+        } finally {
+            secMan.retire();
+        }
+    }
+
+    private String[] getNativeMRParams() {
+        String[] paramArr = new String[params.length + 1];
+        paramArr[0] = nativeSparkJar;
+        for (int i = 0; i < params.length; i++) {
+            paramArr[i + 1] = params[i];
+        }
+        return paramArr;
+    }
+
+    public String getCommandString() {
+        StringBuilder sb = new StringBuilder("hadoop jar ");
+        sb.append(nativeSparkJar);
+        for (String pr : params) {
+            sb.append(" ");
+            sb.append(pr);
+        }
+        return sb.toString();
+    }
+}
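getNativeMRParams() simply prepends the jar path to the user parameters before handing them to RunJar.main. The copy loop can equivalently be written with System.arraycopy; a small self-contained sketch with hypothetical names:

    import java.util.Arrays;

    public class RunJarArgsSketch {
        // Equivalent to getNativeMRParams() above: jar path first, then the user parameters.
        static String[] buildRunJarArgs(String jarPath, String[] params) {
            String[] args = new String[params.length + 1];
            args[0] = jarPath;
            System.arraycopy(params, 0, args, 1, params.length);
            return args;
        }

        public static void main(String[] args) {
            String[] built = buildRunJarArgs("wordcount.jar", new String[]{"in", "out"});
            System.out.println(Arrays.toString(built)); // [wordcount.jar, in, out]
        }
    }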

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+
+/**
+ * POGlobalRearrange for Spark mode
+ */
+public class POGlobalRearrangeSpark extends POGlobalRearrange {
+    // Use secondary key
+    private boolean useSecondaryKey;
+    // Sort order for secondary keys;
+    private boolean[] secondarySortOrder;
+
+    public POGlobalRearrangeSpark(POGlobalRearrange copy)
+            throws ExecException {
+        super(copy);
+    }
+
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public boolean[] getSecondarySortOrder() {
+        return secondarySortOrder;
+    }
+
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+        this.secondarySortOrder = secondarySortOrder;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Collapses POLocalRearrange, POGlobalRearrange and POPackage into POJoinGroupSpark to reduce unnecessary map operations in a join/group.
+ */
+public class POJoinGroupSpark extends PhysicalOperator {
+    private List<POLocalRearrange> lraOps;
+    private POGlobalRearrangeSpark glaOp;
+    private POPackage pkgOp;
+    private List<PhysicalOperator> predecessors;
+
+    public POJoinGroupSpark(List<POLocalRearrange> lraOps, POGlobalRearrangeSpark glaOp, POPackage pkgOp){
+        super(glaOp.getOperatorKey());
+        this.lraOps = lraOps;
+        this.glaOp = glaOp;
+        this.pkgOp = pkgOp;
+    }
+
+    public List<POLocalRearrange> getLROps() {
+        return lraOps;
+    }
+
+    public POGlobalRearrangeSpark getGROp() {
+        return glaOp;
+    }
+
+    public POPackage getPkgOp() {
+        return pkgOp;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POJoinGroupSpark"+ "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+
+    public void setPredecessors(List<PhysicalOperator> predecessors) {
+        this.predecessors = predecessors;
+    }
+
+    public List<PhysicalOperator> getPredecessors() {
+        return predecessors;
+    }
+}
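The collapsed local rearrange / global rearrange / package pipeline that POJoinGroupSpark represents is ultimately executed as a single keyed shuffle. As a rough analogy at the Spark Java API level, grouping two keyed inputs is one cogroup call; the data and class name below are illustrative only.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class CoGroupSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "cogroup-sketch");
            JavaPairRDD<String, Integer> left = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 2)));
            JavaPairRDD<String, String> right = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("a", "x"), new Tuple2<String, String>("c", "y")));

            // One shuffle groups both inputs by key; no separate rearrange/package steps are needed.
            System.out.println(left.cogroup(right).collect());
            sc.stop();
        }
    }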

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POPoissonSampleSpark extends POPoissonSample {
+    private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class);
+    // Only for Spark
+    private transient boolean endOfInput = false;
+
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+
+    public void setEndOfInput(boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
+    public POPoissonSampleSpark(OperatorKey k, int rp, int sr, float hp, long tm) {
+        super(k, rp, sr, hp, tm);
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        if (!initialized) {
+            numRowsSampled = 0;
+            avgTupleMemSz = 0;
+            rowNum = 0;
+            skipInterval = -1;
+            if (totalMemory == null) {
+                // Initialize in backend to get memory of task
+                totalMemory = Runtime.getRuntime().maxMemory();
+            }
+            long availRedMem = (long) (totalMemory * heapPerc);
+            memToSkipPerSample = availRedMem/sampleRate;
+            initialized = true;
+        }
+        if (numRowSplTupleReturned) {
+            // row num special row has been returned after all inputs
+            // were read, nothing more to read
+            return RESULT_EOP;
+        }
+
+        Result res;
+        res = processInput();
+
+        // if we have reached the end of the input, pick a record and return
+        if (this.isEndOfInput()) {
+            // if we have skipped enough records and the last record is OK
+            if ( numSkipped == skipInterval
+                    && res.returnStatus == POStatus.STATUS_OK) {
+                return createNumRowTuple((Tuple) res.result);
+            } else if (newSample != null) {
+                return createNumRowTuple((Tuple) newSample.result);
+            }
+        }
+
+        // just return to read next record from input
+        if (res.returnStatus == POStatus.STATUS_NULL) {
+            return new Result(POStatus.STATUS_NULL, null);
+        } else if (res.returnStatus == POStatus.STATUS_EOP
+                    || res.returnStatus == POStatus.STATUS_ERR) {
+            return res;
+        }
+
+        // got an 'OK' record
+        rowNum++;
+
+        if (numSkipped < skipInterval) {
+            numSkipped++;
+
+            // skip this tuple, and continue to read from input
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        // pick this record as sampled
+        newSample = res;
+        numSkipped = 0;
+        Result pickedSample = newSample;
+        updateSkipInterval((Tuple) pickedSample.result);
+
+        if( LOG.isDebugEnabled()) {
+            LOG.debug("pickedSample:");
+            if (pickedSample.result != null) {
+                for (int i = 0; i < ((Tuple) pickedSample.result).size(); i++) {
+                    LOG.debug("the " + i + " ele:" + ((Tuple) pickedSample.result).get(i));
+                }
+            }
+        }
+        return pickedSample;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "PoissonSampleSpark - " + mKey.toString();
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * ReduceBy operator that maps to Spark's ReduceBy.
+ * Extends ForEach and adds packager, secondary sort and partitioner support.
+ */
+public class POReduceBySpark extends POForEach {
+    private String customPartitioner;
+    protected POLocalRearrange lr;
+    protected POPackage pkg;
+
+    public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened, POPackage
+            pkg, POLocalRearrange lr){
+        super(k, rp, inp, isToBeFlattened);
+        this.pkg = pkg;
+        this.lr = lr;
+        this.addOriginalLocation(lr.getAlias(), lr.getOriginalLocations());
+    }
+
+    public POPackage getPKGOp() {
+        return pkg;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "Reduce By" + "(" + getFlatStr() + ")" + "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    protected String getFlatStr() {
+        if(isToBeFlattenedArray ==null) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (Boolean b : isToBeFlattenedArray) {
+            sb.append(b);
+            sb.append(',');
+        }
+        if(sb.length()>0){
+            sb.deleteCharAt(sb.length()-1);
+        }
+        return sb.toString();
+    }
+
+    // Use secondary key
+    private boolean useSecondaryKey;
+    // Sort order for secondary keys;
+    private boolean[] secondarySortOrder;
+
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public boolean[] getSecondarySortOrder() {
+        return secondarySortOrder;
+    }
+
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+        this.secondarySortOrder = secondarySortOrder;
+    }
+
+    public String getCustomPartitioner() {
+        return customPartitioner;
+    }
+
+    public void setCustomPartitioner(String customPartitioner) {
+        this.customPartitioner = customPartitioner;
+    }
+
+    public POLocalRearrange getLROp() {
+        return lr;
+    }
+
+}
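The point of POReduceBySpark is to let algebraic aggregations run as a keyed reduce, where values for the same key can be pre-combined before the shuffle. A minimal sketch of that behaviour with the plain Spark Java API (a word-count style sum; names and data are illustrative only):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class ReduceByKeySketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "reduce-by-sketch");
            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("a", 2),
                    new Tuple2<String, Integer>("b", 5)));

            // Values for the same key are combined within each partition before the shuffle.
            JavaPairRDD<String, Integer> sums = pairs.reduceByKey((x, y) -> x + y);

            System.out.println(sums.collect()); // [(a,3), (b,5)] in some order
            sc.stop();
        }
    }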

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POSampleSortSpark extends POSort {
+
+    public POSampleSortSpark(POSort sort){
+        super(sort.getOperatorKey(), sort.getRequestedParallelism(), null, sort.getSortPlans(), sort.getMAscCols(), sort
+                .getMSortFunc());
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POSparkSort" + "["
+                + DataType.findTypeName(resultType) + "]" + "("
+                + (super.getMSortFunc() != null ? super.getMSortFunc().getFuncSpec() : "") + ")"
+                + " - " + mKey.toString();
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java Mon May 29 15:00:39 2017
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+import java.util.List;
+
+/**
+ * A visitor to optimize plans that determines if a vertex plan can run in
+ * accumulative mode.
+ */
+public class AccumulatorOptimizer extends SparkOpPlanVisitor {
+
+    public AccumulatorOptimizer(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOperator) throws
+            VisitorException {
+        PhysicalPlan plan = sparkOperator.physicalPlan;
+        List<PhysicalOperator> pos = plan.getRoots();
+        if (pos == null || pos.size() == 0) {
+            return;
+        }
+
+        List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(plan,
+                POGlobalRearrange.class);
+
+        for (POGlobalRearrange glr : glrs) {
+            List<PhysicalOperator> successors = plan.getSuccessors(glr);
+            AccumulatorOptimizerUtil.addAccumulator(plan, successors);
+        }
+    }
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Mon May 29 15:00:39 2017
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+import com.google.common.collect.Maps;
+
+/**
+ * This class goes through the physical plan and replaces GlobalRearrange with ReduceBy where there are algebraic operations.
+ * where there are algebraic operations.
+ */
+public class CombinerOptimizer extends SparkOpPlanVisitor {
+
+    private static Log LOG = LogFactory.getLog(CombinerOptimizer.class);
+
+    public CombinerOptimizer(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        try {
+            addCombiner(sparkOp.physicalPlan);
+        } catch (Exception e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    // Checks for algebraic operations and, if they exist,
+    // replaces global rearrange (cogroup) with reduceBy as follows:
+    // Input:
+    // foreach (using algebraicOp)
+    //   -> packager
+    //      -> globalRearrange
+    //          -> localRearrange
+    // Output:
+    // foreach (using algebraicOp.Final)
+    //   -> reduceBy (uses algebraicOp.Intermediate)
+    //         -> foreach (using algebraicOp.Initial)
+    //             -> CombinerRearrange
+    private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
+
+        List<PhysicalOperator> leaves = phyPlan.getLeaves();
+        if (leaves == null || leaves.size() != 1) {
+            return;
+        }
+
+        // Ensure there is grouping.
+        List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(phyPlan, POGlobalRearrange.class);
+        if (glrs == null || glrs.size() == 0) {
+            return;
+        }
+        for (POGlobalRearrange glr : glrs) {
+            List<PhysicalOperator> glrSuccessors = phyPlan.getSuccessors(glr);
+            if (glrSuccessors == null || glrSuccessors.isEmpty()) {
+                continue;
+            }
+
+            if (!(glrSuccessors.get(0) instanceof POPackage)) {
+                continue;
+            }
+            POPackage poPackage = (POPackage) glrSuccessors.get(0);
+
+            List<PhysicalOperator> poPackageSuccessors = phyPlan.getSuccessors(poPackage);
+            if (poPackageSuccessors == null || poPackageSuccessors.size() != 1) {
+                continue;
+            }
+            PhysicalOperator successor = poPackageSuccessors.get(0);
+
+            // Retaining the original successor to be used later in modifying the plan.
+            PhysicalOperator packageSuccessor = successor;
+
+            if (successor instanceof POLimit) {
+                // POLimit is acceptable, as long as it has a single foreach as
+                // successor
+                List<PhysicalOperator> limitSucs = phyPlan.getSuccessors(successor);
+                if (limitSucs != null && limitSucs.size() == 1 &&
+                        limitSucs.get(0) instanceof POForEach) {
+                    // the code below will now further examine the foreach
+                    successor = limitSucs.get(0);
+                }
+            }
+            if (successor instanceof POForEach) {
+                POForEach foreach = (POForEach) successor;
+                List<PhysicalOperator> foreachSuccessors = phyPlan.getSuccessors(foreach);
+                // multi-query
+                if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
+                    continue;
+                }
+                // Clone foreach so it can be modified to a post-reduce foreach.
+                POForEach postReduceFE = foreach.clone();
+                List<PhysicalPlan> feInners = postReduceFE.getInputPlans();
+
+                // find algebraic operators and also check if the foreach statement
+                // is suitable for combiner use
+                List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = CombinerOptimizerUtil.findAlgebraicOps
+                        (feInners);
+                if (algebraicOps == null || algebraicOps.size() == 0) {
+                    // the plan is not combinable or there is nothing to combine
+                    // we're done
+                    continue;
+                }
+                try {
+                    List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
+                    // Exclude co-group from optimization
+                    if (glrPredecessors == null || glrPredecessors.size() != 1) {
+                        continue;
+                    }
+
+                    if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
+                        continue;
+                    }
+
+                    POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
+
+                    LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
+
+                    // Trim the global rearrange and the preceding package.
+                    convertToMapSideForEach(phyPlan, poPackage);
+
+                    // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
+                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+                        if (!(op2plan.first instanceof PODistinct)) {
+                            continue;
+                        }
+                        CombinerOptimizerUtil.DistinctPatcher distinctPatcher
+                                = new CombinerOptimizerUtil.DistinctPatcher(op2plan.second);
+                        distinctPatcher.visit();
+                        if (distinctPatcher.getDistinct() == null) {
+                            int errCode = 2073;
+                            String msg = "Problem with replacing distinct operator with distinct built-in function.";
+                            throw new PlanException(msg, errCode, PigException.BUG);
+                        }
+                        op2plan.first = distinctPatcher.getDistinct();
+                    }
+
+                    // create new map foreach -
+                    POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
+                            .getKeyType());
+                    Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
+                    Integer pos = 1;
+                    // create plan for each algebraic udf and add as inner plan in map-foreach
+                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+                        PhysicalPlan udfPlan = CombinerOptimizerUtil.createPlanWithPredecessors(op2plan.first,
+                                op2plan.second);
+                        mfe.addInputPlan(udfPlan, false);
+                        op2newpos.put(op2plan.first, pos++);
+                    }
+                    CombinerOptimizerUtil.changeFunc(mfe, POUserFunc.INITIAL);
+
+                    // since we will only be creating SingleTupleBag as input to
+                    // the map foreach, we should flag the POProjects in the map
+                    // foreach inner plans to also use SingleTupleBag
+                    for (PhysicalPlan mpl : mfe.getInputPlans()) {
+                        try {
+                            new CombinerOptimizerUtil.fixMapProjects(mpl).visit();
+                        } catch (VisitorException e) {
+                            int errCode = 2089;
+                            String msg = "Unable to flag project operator to use single tuple bag.";
+                            throw new PlanException(msg, errCode, PigException.BUG, e);
+                        }
+                    }
+
+                    // create new combine foreach
+                    POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
+                            .getKeyType());
+                    // add algebraic functions with appropriate projection
+                    CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
+
+                    // we have modified the foreach inner plans - so set them again
+                    // for the foreach so that foreach can do any re-initialization
+                    // around them.
+                    mfe.setInputPlans(mfe.getInputPlans());
+                    cfe.setInputPlans(cfe.getInputPlans());
+
+                    // tell POCombinerPackage which fields need to be projected and which
+                    // placed in bags. The first field is a simple project; the rest need to go
+                    // into bags
+                    int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
+                    boolean[] bags = new boolean[numFields];
+                    bags[0] = false;
+                    for (int i = 1; i < numFields; i++) {
+                        bags[i] = true;
+                    }
+
+                    // Use the POCombiner package in the combine plan
+                    // as it needs to act differently than the regular
+                    // package operator.
+                    CombinerPackager pkgr = new CombinerPackager(poPackage.getPkgr(), bags);
+                    POPackage combinePack = poPackage.clone();
+                    combinePack.setPkgr(pkgr);
+
+                    // A specialized local rearrange operator will replace
+                    // the normal local rearrange in the map plan.
+                    POLocalRearrange newRearrange = CombinerOptimizerUtil.getNewRearrange(rearrange);
+                    POPreCombinerLocalRearrange combinerLocalRearrange = CombinerOptimizerUtil.getPreCombinerLR
+                            (rearrange);
+                    phyPlan.replace(rearrange, combinerLocalRearrange);
+
+                    // Create a reduceBy operator.
+                    POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), combinerLocalRearrange
+                            .getRequestedParallelism(), cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack,
+                            newRearrange);
+                    reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
+                    fixReduceSideFE(postReduceFE, algebraicOps);
+                    CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
+                    updatePackager(reduceOperator, newRearrange);
+
+                    // Add the new operators
+                    phyPlan.add(reduceOperator);
+                    phyPlan.add(mfe);
+                    // Connect the new operators as follows:
+                    // reduceBy (using algebraicOp.Intermediate)
+                    //      -> foreach (using algebraicOp.Initial)
+                     phyPlan.connect(mfe, reduceOperator);
+
+                    // Insert the reduce stage between combiner rearrange and its successor.
+                    phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
+                    phyPlan.connect(reduceOperator, packageSuccessor);
+                    phyPlan.connect(combinerLocalRearrange, mfe);
+
+                    // Replace foreach with post reduce foreach
+                    phyPlan.add(postReduceFE);
+                    phyPlan.replace(foreach, postReduceFE);
+                } catch (Exception e) {
+                    int errCode = 2018;
+                    String msg = "Internal error. Unable to introduce the combiner for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
+                }
+            }
+        }
+    }
+
+    // Modifies the input plans of the post reduce foreach to match the output of reduce stage.
+    private void fixReduceSideFE(POForEach postReduceFE, List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps)
+            throws ExecException, PlanException {
+        int i=1;
+        for (Pair<PhysicalOperator, PhysicalPlan> algebraicOp : algebraicOps) {
+            POUserFunc combineUdf = (POUserFunc) algebraicOp.first;
+            PhysicalPlan pplan = algebraicOp.second;
+            combineUdf.setAlgebraicFunction(POUserFunc.FINAL);
+
+            POProject newProj = new POProject(
+                    CombinerOptimizerUtil.createOperatorKey(postReduceFE.getOperatorKey().getScope()),
+                    1, i
+            );
+            newProj.setResultType(DataType.BAG);
+
+            PhysicalOperator udfInput = 
pplan.getPredecessors(combineUdf).get(0);
+            pplan.disconnect(udfInput, combineUdf);
+            pplan.add(newProj);
+            pplan.connect(newProj, combineUdf);
+            i++;
+        }
+        postReduceFE.setResultType(DataType.TUPLE);
+    }
+
+    // Removes the global rearrange and POPackage so the foreach runs map-side (before the reduce).
+    private void convertToMapSideForEach(PhysicalPlan physicalPlan, POPackage 
poPackage)
+            throws PlanException {
+        LinkedList<PhysicalOperator> operatorsToRemove = new LinkedList<>();
+        for (PhysicalOperator physicalOperator : 
physicalPlan.getPredecessors(poPackage)) {
+            if (physicalOperator instanceof POGlobalRearrangeSpark) {
+                operatorsToRemove.add(physicalOperator);
+                break;
+            }
+        }
+        // Remove global rearranges preceding POPackage
+        for (PhysicalOperator po : operatorsToRemove) {
+            physicalPlan.removeAndReconnect(po);
+        }
+        // Remove POPackage itself.
+        physicalPlan.removeAndReconnect(poPackage);
+    }
+
+    // Updates the ReduceBy operator's packager with information from the local rearrange.
+    private void updatePackager(POReduceBySpark reduceOperator, 
POLocalRearrange lrearrange) throws OptimizerException {
+        Packager pkgr = reduceOperator.getPKGOp().getPkgr();
+        // annotate the packager with information from the local rearrange
+        // update the keyInfo information if already present in the POPackage
+        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = 
pkgr.getKeyInfo();
+        if (keyInfo == null)
+            keyInfo = new HashMap<>();
+
+        if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+            // something is wrong - we should not be getting key info
+            // for the same index from two different Local Rearranges
+            int errCode = 2087;
+            String msg = "Unexpected problem during optimization." +
+                    " Found index:" + lrearrange.getIndex() +
+                    " in multiple LocalRearrange operators.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+
+        }
+        keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+                new Pair<Boolean, Map<Integer, Integer>>(
+                        lrearrange.isProjectStar(), 
lrearrange.getProjectedColsMap()));
+        pkgr.setKeyInfo(keyInfo);
+        pkgr.setKeyTuple(lrearrange.isKeyTuple());
+        pkgr.setKeyCompound(lrearrange.isKeyCompound());
+    }
+
+    /**
+     * Look for an algebraic POUserFunc that is the single leaf of an input plan.
+     *
+     * @param pplan physical plan
+     * @return the algebraic (combinable) POUserFunc if the plan's only leaf is one,
+     * otherwise null
+     */
+    private static POUserFunc getAlgebraicSuccessor(PhysicalPlan pplan) {
+        // check if the plan ends in a UDF
+        List<PhysicalOperator> leaves = pplan.getLeaves();
+        if (leaves == null || leaves.size() != 1) {
+            return null;
+        }
+
+        PhysicalOperator succ = leaves.get(0);
+        if (succ instanceof POUserFunc && ((POUserFunc) succ).combinable()) {
+            return (POUserFunc) succ;
+        }
+
+        // some other operator ? can't combine
+        return null;
+    }
+}
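
For context, and not part of the committed file above: the rewrite only fires when getAlgebraicSuccessor finds a combinable POUserFunc, i.e. the backing UDF implements org.apache.pig.Algebraic. A minimal COUNT-style UDF of that shape is sketched below; the class and package names are hypothetical, and real UDFs (such as the builtin COUNT) handle nulls and accumulation more carefully.

    package example;   // hypothetical

    import java.io.IOException;

    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // COUNT-like algebraic UDF: Initial emits partial counts, Intermediate and
    // Final sum them. The optimizer above places Initial in the map-side foreach,
    // Intermediate in POReduceBySpark and Final in the post-reduce foreach.
    public class SketchCount extends EvalFunc<Long> implements Algebraic {
        private static final TupleFactory TF = TupleFactory.getInstance();

        @Override
        public Long exec(Tuple input) throws IOException {
            return count(input);
        }

        @Override public String getInitial()  { return Initial.class.getName(); }
        @Override public String getIntermed() { return Intermediate.class.getName(); }
        @Override public String getFinal()    { return Final.class.getName(); }

        // Counts the tuples in the bag held in the first field of the input.
        private static long count(Tuple input) throws ExecException {
            return ((DataBag) input.get(0)).size();
        }

        // Sums the partial counts produced by an earlier stage.
        private static long sum(Tuple input) throws ExecException {
            long total = 0;
            for (Tuple t : (DataBag) input.get(0)) {
                total += (Long) t.get(0);
            }
            return total;
        }

        public static class Initial extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                return TF.newTuple(count(input));
            }
        }

        public static class Intermediate extends EvalFunc<Tuple> {
            @Override
            public Tuple exec(Tuple input) throws IOException {
                return TF.newTuple(sum(input));
            }
        }

        public static class Final extends EvalFunc<Long> {
            @Override
            public Long exec(Tuple input) throws IOException {
                return sum(input);
            }
        }
    }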

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * Collapses LocalRearrange, GlobalRearrange and Package into a single POJoinGroupSpark
+ * to avoid unnecessary map operations when optimizing join/group. See PIG-4797 for details.
+ */
+public class JoinGroupOptimizerSpark extends SparkOpPlanVisitor {
+    private static final Log LOG = 
LogFactory.getLog(JoinGroupOptimizerSpark.class);
+
+    public JoinGroupOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new DependencyOrderWalker<SparkOperator, 
SparkOperPlan>(plan, true));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        if (sparkOp.physicalPlan != null) {
+            GlobalRearrangeDiscover glrDiscover = new 
GlobalRearrangeDiscover(sparkOp.physicalPlan);
+            glrDiscover.visit();
+            List<PhysicalPlan> plans = glrDiscover.getPlansWithJoinAndGroup();
+            handlePlans(plans, sparkOp);
+        }
+
+    }
+
+    private void handlePlans(List<PhysicalPlan> plans, SparkOperator sparkOp) 
throws VisitorException {
+        for(int i=0;i<plans.size();i++){
+            PhysicalPlan planWithJoinAndGroup = plans.get(i);
+            POGlobalRearrangeSpark glrSpark = 
PlanHelper.getPhysicalOperators(planWithJoinAndGroup,POGlobalRearrangeSpark.class).get(0);
+            if (verifyJoinOrGroupCase(plans.get(i), glrSpark)) {
+                try {
+                    restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp);
+                } catch (PlanException e) {
+                    throw new VisitorException("JoinGroupOptimizerSpark#restructSparkOp fails: ", e);
+                }
+            }
+        }
+    }
+
+    static class GlobalRearrangeDiscover extends PhyPlanVisitor {
+        private List<PhysicalPlan> plansWithJoinAndGroup = new 
ArrayList<PhysicalPlan>();
+        public GlobalRearrangeDiscover(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+        }
+
+        @Override
+        public void visitGlobalRearrange(POGlobalRearrange glr) throws 
VisitorException {
+            //If there is a POSplit we need to traverse POSplit.getPlans(), so use mCurrentWalker.getPlan()
+            PhysicalPlan currentPlan = this.mCurrentWalker.getPlan();
+            if( currentPlan != null) {
+                plansWithJoinAndGroup.add(currentPlan);
+            }
+        }
+
+        public List<PhysicalPlan> getPlansWithJoinAndGroup() {
+            return plansWithJoinAndGroup;
+        }
+    }
+
+    //Collapse LocalRearrange, GlobalRearrange and POPackage into POJoinGroupSpark
+    private void restructSparkOp(PhysicalPlan plan,POGlobalRearrangeSpark 
glaOp, SparkOperator sparkOp) throws PlanException {
+
+        List<PhysicalOperator> predes = plan.getPredecessors(glaOp);
+        if (predes != null) {
+            List<POLocalRearrange> lraOps = new ArrayList<POLocalRearrange>();
+            List<PhysicalOperator> allPredsOfLRA = new 
ArrayList<PhysicalOperator>();
+
+            //Get the predecessors of POJoinGroupSpark in the correct order after this optimization.
+            //For other physical operators we normally use OperatorPlan#getPredecessors(op) to collect
+            //the predecessors and then sort them (see JobGraphBuilder#getPredecessors): in the common
+            //case an operator with a smaller OperatorKey must be executed before one with a bigger
+            //OperatorKey. That is not suitable for POJoinGroupSpark. An example:
+            //original:
+            //  POLoad(scope-1)                           POLoad(scope-2)
+            //        |                                         |
+            //  POForEach(scope-3)                              |
+            //        |                                         |
+            //  POLocalRearrange(scope-4)            POLocalRearrange(scope-5)
+            //                  \                         /
+            //               POGlobalRearrange(scope-6)
+            //                          |
+            //                  POPackage(scope-7)
+            //after this optimization:
+            //  POLoad(scope-1)                           POLoad(scope-2)
+            //        |                                        /
+            //  POForEach(scope-3)                            /
+            //                  \                            /
+            //                   POJoinGroupSpark(scope-8)
+
+            //The predecessors of POJoinGroupSpark(scope-8) must be POForEach(scope-3) and then
+            //POLoad(scope-2), matching the order of POLocalRearrange(scope-4) and
+            //POLocalRearrange(scope-5), whereas OperatorPlan#getPredecessors(op) followed by a sort
+            //would yield POLoad(scope-2) before POForEach(scope-3).
+            Collections.sort(predes);
+            for (PhysicalOperator lra : predes) {
+                lraOps.add((POLocalRearrange) lra);
+                List<PhysicalOperator> predOfLRAList = 
plan.getPredecessors(lra);
+                if( predOfLRAList != null && predOfLRAList.size() ==1) {
+                    PhysicalOperator predOfLRA = predOfLRAList.get(0);
+                    plan.disconnect(predOfLRA, lra);
+                    allPredsOfLRA.add(predOfLRA);
+                }
+            }
+
+            POPackage pkgOp = (POPackage) plan.getSuccessors(glaOp).get(0);
+            PhysicalOperator pkgSuccessor = plan.getSuccessors(pkgOp).get(0);
+            POJoinGroupSpark joinSpark = new POJoinGroupSpark(lraOps, glaOp, 
pkgOp);
+            if(allPredsOfLRA.size()>0) {
+                joinSpark.setPredecessors(allPredsOfLRA);
+            }
+            plan.add(joinSpark);
+
+            for (PhysicalOperator predOfLRA : allPredsOfLRA) {
+                plan.connect(predOfLRA, joinSpark);
+            }
+
+            plan.disconnect(pkgOp, pkgSuccessor);
+            plan.connect(joinSpark, pkgSuccessor);
+            for (POLocalRearrange lra : lraOps) {
+                replaceMultiqueryMapping(sparkOp, lra, joinSpark);
+                plan.remove(lra);
+            }
+            plan.remove(glaOp);
+            plan.remove(pkgOp);
+        }
+    }
+
+    private void replaceMultiqueryMapping(SparkOperator sparkOperator, 
PhysicalOperator from, PhysicalOperator to) {
+        MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionItems = 
sparkOperator.getMultiQueryOptimizeConnectionItem();
+        if 
(multiQueryOptimizeConnectionItems.containsKey(from.getOperatorKey())) {
+            List<OperatorKey> value = 
multiQueryOptimizeConnectionItems.get(from.getOperatorKey());
+            multiQueryOptimizeConnectionItems.removeKey(from.getOperatorKey());
+            multiQueryOptimizeConnectionItems.put(to.getOperatorKey(), value);
+        }
+    }
+
+    private boolean verifyJoinOrGroupCase(PhysicalPlan plan, 
POGlobalRearrangeSpark glaOp) {
+        List<PhysicalOperator> lraOps = plan.getPredecessors(glaOp);
+        List<PhysicalOperator> pkgOps = plan.getSuccessors(glaOp);
+        boolean isAllPredecessorLRA = isAllPredecessorLRA(lraOps);
+        boolean isSuccessorPKG = isSuccessorPKG(pkgOps);
+        return isAllPredecessorLRA && isSuccessorPKG;
+    }
+
+    private boolean isSuccessorPKG(List<PhysicalOperator> pkgOps) {
+        return pkgOps != null && pkgOps.size() == 1
+                && pkgOps.get(0) instanceof POPackage;
+    }
+
+    private boolean isAllPredecessorLRA(List<PhysicalOperator> lraOps) {
+        boolean result = true;
+        if (lraOps != null) {
+            for (PhysicalOperator lraOp : lraOps) {
+                if (!(lraOp instanceof POLocalRearrange)) {
+                    result = false;
+                    break;
+                }
+            }
+        } else {
+            result = false;
+        }
+
+        return result;
+    }
+}
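
A note on the ordering assumption in restructSparkOp above, not part of the committed file: predecessor order is recovered by sorting on OperatorKey, which compares the scope and then the numeric id, so operators created earlier compare smaller and are scheduled first (see JobGraphBuilder). A tiny, hypothetical check of that assumption:

    package example;   // hypothetical

    import org.apache.pig.impl.plan.OperatorKey;

    public class OperatorKeyOrderCheck {
        public static void main(String[] args) {
            // Within one scope, keys created earlier get smaller ids and compare smaller.
            OperatorKey earlier = new OperatorKey("scope", 4);
            OperatorKey later = new OperatorKey("scope", 5);
            System.out.println(earlier.compareTo(later) < 0);   // expected: true
        }
    }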

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+
+/**
+ * MultiQueryOptimizer for Spark
+ */
+public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
+
+    private static final Log LOG = 
LogFactory.getLog(MultiQueryOptimizerSpark.class);
+
+    private String scope;
+    private NodeIdGenerator nig;
+
+    public MultiQueryOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<SparkOperator, 
SparkOperPlan>(plan));
+        nig = NodeIdGenerator.getGenerator();
+        List<SparkOperator> roots = plan.getRoots();
+        scope = roots.get(0).getOperatorKey().getScope();
+    }
+
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        try {
+            if (!sparkOp.isSplitter()) {
+                return;
+            }
+
+            List<SparkOperator> splittees = getPlan().getSuccessors(sparkOp);
+
+            if (splittees == null) {
+                return;
+            }
+
+            //If a splittee has more than one predecessor, do not apply multiquery optimization
+            //@see TestMultiQueryBasic#testMultiQueryWithFJ_2
+            for (SparkOperator splittee : splittees) {
+                if (getPlan().getPredecessors(splittee).size() > 1) {
+                    return;
+                }
+            }
+
+            if (splittees.size() == 1) {
+                // We don't need a POSplit here; we can merge the splittee into the splitter
+                SparkOperator spliter = sparkOp;
+                SparkOperator singleSplitee = splittees.get(0);
+                List<PhysicalOperator> roots = 
singleSplitee.physicalPlan.getRoots();
+                List<PhysicalOperator> rootCopys = new 
ArrayList<PhysicalOperator>(roots);
+                //Sort the roots by OperatorKey.
+                //For the first root, merge the splitter's physical plan into the splittee;
+                //for the remaining roots, merge a clone of the splitter's plan.
+                //A cloned plan contains the same physical operators but with larger OperatorKeys,
+                //so its operators are executed after those with smaller OperatorKeys
+                //(see JobGraphBuilder.sortPredecessorRDDs()).
+                Collections.sort(rootCopys);
+                List<PhysicalPlan> spliterPhysicalPlan = 
getPhysicalPlans(spliter.physicalPlan, rootCopys.size());
+                int i = 0;
+                for (PhysicalOperator root : rootCopys) {
+                    if (root instanceof POLoad) {
+                        POLoad load = (POLoad) root;
+                        PhysicalPlan plClone = spliterPhysicalPlan.get(i);
+                        POStore store = (POStore) plClone.getLeaves().get(0);
+                        if 
(load.getLFile().getFileName().equals(store.getSFile().getFileName())) {
+                            plClone.remove(store);
+                            PhysicalOperator succOfload = 
singleSplitee.physicalPlan.getSuccessors(load).get(0);
+                            singleSplitee.physicalPlan.remove(load);
+                            mergePlanAWithPlanB(singleSplitee.physicalPlan, 
plClone, succOfload);
+                            i++;
+                        }
+                    }
+                }
+
+                addSubPlanPropertiesToParent(singleSplitee, spliter);
+                removeSpliter(getPlan(), spliter, singleSplitee);
+            } else {
+                //If there is more than one splittee, create a POSplit, merge the physical plans
+                //of all splittees into the POSplit's plan, and remove the splittees.
+                List<PhysicalOperator> firstNodeLeaves = 
sparkOp.physicalPlan.getLeaves();
+                PhysicalOperator firstNodeLeaf = firstNodeLeaves.size() > 0 ? 
firstNodeLeaves.get(0) : null;
+                POStore poStore = null;
+                if (firstNodeLeaf != null && firstNodeLeaf instanceof POStore) 
{
+                    poStore = (POStore) firstNodeLeaf;
+                    PhysicalOperator predOfPoStore = 
sparkOp.physicalPlan.getPredecessors(poStore).get(0);
+                    sparkOp.physicalPlan.remove(poStore); // remove  
unnecessary store
+                    POSplit poSplit = createSplit();
+                    ArrayList<SparkOperator> spliteesCopy = new ArrayList
+                            <SparkOperator>(splittees);
+                    for (SparkOperator splitee : spliteesCopy) {
+                        List<PhysicalOperator> rootsOfSplitee = new 
ArrayList(splitee.physicalPlan.getRoots());
+                        for (int i = 0; i < rootsOfSplitee.size(); i++) {
+                            if (rootsOfSplitee.get(i) instanceof POLoad) {
+                                POLoad poLoad = (POLoad) rootsOfSplitee.get(i);
+                                if 
(poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
+                                    List<PhysicalOperator> successorsOfPoLoad 
= splitee.physicalPlan.getSuccessors(poLoad);
+                                    List<PhysicalOperator> 
successorofPoLoadsCopy = new ArrayList<PhysicalOperator>(successorsOfPoLoad);
+                                    splitee.physicalPlan.remove(poLoad);  // 
remove  unnecessary load
+                                    for (PhysicalOperator successorOfPoLoad : 
successorofPoLoadsCopy) {
+                                        //we store from to relationship in 
SparkOperator#multiQueryOptimizeConnectionMap
+                                        
sparkOp.addMultiQueryOptimizeConnectionItem(successorOfPoLoad.getOperatorKey(), 
predOfPoStore.getOperatorKey());
+                                        LOG.debug(String.format("add 
multiQueryOptimize connection item: to:%s, from:%s for %s",
+                                                successorOfPoLoad.toString(), 
predOfPoStore.getOperatorKey().toString(), splitee.getOperatorKey()));
+                                    }
+                                }
+                            }
+                        }
+                        poSplit.addPlan(splitee.physicalPlan);
+                        addSubPlanPropertiesToParent(sparkOp, splitee);
+                        removeSplittee(getPlan(), sparkOp, splitee);
+                    }
+                    sparkOp.physicalPlan.addAsLeaf(poSplit);
+                }
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    private List<PhysicalPlan> getPhysicalPlans(PhysicalPlan physicalPlan, int 
size) throws OptimizerException {
+        List<PhysicalPlan> ppList = new ArrayList<PhysicalPlan>();
+        try {
+            ppList.add(physicalPlan);
+            for (int i = 1; i < size; i++) {
+                ppList.add(physicalPlan.clone());
+            }
+        } catch (CloneNotSupportedException e) {
+            int errCode = 2127;
+            String msg = "Internal Error: Cloning of plan failed for 
optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+        return ppList;
+    }
+
+    //Merge all operators of planB into planA and connect planB's leaf to the operator "to" in planA
+    private void mergePlanAWithPlanB(PhysicalPlan planA, PhysicalPlan planB, 
PhysicalOperator to) throws PlanException {
+        PhysicalOperator predOfStore = planB.getLeaves().get(0);
+        planA.merge(planB);
+        planA.connect(predOfStore, to);
+    }
+
+    private void removeSpliter(SparkOperPlan plan, SparkOperator spliter, 
SparkOperator splittee) throws PlanException {
+        if (plan.getPredecessors(spliter) != null) {
+            List<SparkOperator> preds = new 
ArrayList(plan.getPredecessors(spliter));
+            plan.disconnect(spliter, splittee);
+            for (SparkOperator pred : preds) {
+                plan.disconnect(pred, spliter);
+                plan.connect(pred, splittee);
+            }
+        }
+        plan.remove(spliter);
+    }
+
+    private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
+                                SparkOperator splittee) throws PlanException {
+        if (plan.getSuccessors(splittee) != null) {
+            List<SparkOperator> succs = new ArrayList();
+            succs.addAll(plan.getSuccessors(splittee));
+            plan.disconnect(splitter, splittee);
+            for (SparkOperator succSparkOperator : succs) {
+                plan.disconnect(splittee, succSparkOperator);
+                plan.connect(splitter, succSparkOperator);
+            }
+        }
+        getPlan().remove(splittee);
+    }
+
+    private POSplit createSplit() {
+        return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+    }
+
+    static public void addSubPlanPropertiesToParent(SparkOperator parentOper, 
SparkOperator subPlanOper) {
+        // Copy only map side properties. For eg: crossKeys.
+        // Do not copy reduce side specific properties. For eg: 
useSecondaryKey, segmentBelow, sortOrder, etc
+        if (subPlanOper.getCrossKeys() != null) {
+            for (String key : subPlanOper.getCrossKeys()) {
+                parentOper.addCrossKey(key);
+            }
+        }
+        parentOper.copyFeatures(subPlanOper, null);
+
+        if (subPlanOper.getRequestedParallelism() > 
parentOper.getRequestedParallelism()) {
+            
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
+        }
+        subPlanOper.setRequestedParallelismByReference(parentOper);
+        parentOper.UDFs.addAll(subPlanOper.UDFs);
+        parentOper.scalars.addAll(subPlanOper.scalars);
+    }
+}
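
For context, and not part of the committed file above: a splitter and its splittee are linked purely by file name — the splitter ends in a POStore to a temporary file and the splittee starts with a POLoad of the same file, which is what the getFileName() comparisons above rely on. A hypothetical sketch of that check, with made-up operator keys and a made-up tmp path:

    package example;   // hypothetical

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
    import org.apache.pig.impl.io.FileSpec;
    import org.apache.pig.impl.plan.OperatorKey;

    public class SplitLinkSketch {
        public static void main(String[] args) throws Exception {
            FileSpec tmp = new FileSpec("/tmp/temp-1/tmp-2", null);   // hypothetical tmp path

            POStore store = new POStore(new OperatorKey("scope", 1)); // splitter's trailing store
            store.setSFile(tmp);

            POLoad load = new POLoad(new OperatorKey("scope", 2));    // splittee's leading load
            load.setLFile(tmp);

            boolean linked = load.getLFile().getFileName()
                    .equals(store.getSFile().getFileName());
            System.out.println("splitter and splittee share the tmp file: " + linked);
        }
    }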

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopFilterRemoverUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons splits will always produce filters that pass
+ * everything through unchanged. This optimizer removes these.
+ * <p/>
+ * The condition we look for is a POFilter with a constant boolean
+ * (true) expression as its plan.
+ */
+public class NoopFilterRemover extends SparkOpPlanVisitor {
+    private Log log = LogFactory.getLog(NoopFilterRemover.class);
+
+    public NoopFilterRemover(SparkOperPlan plan) {
+        super(plan, new DependencyOrderWalker<SparkOperator, 
SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        List<POFilter> filters = PlanHelper.getPhysicalOperators(sparkOp
+                .physicalPlan, POFilter.class);
+        for (POFilter filter : filters) {
+            PhysicalPlan filterPlan = filter.getPlan();
+            if (filterPlan.size() == 1) {
+                PhysicalOperator fp = filterPlan.getRoots().get(0);
+                if (fp instanceof ConstantExpression) {
+                    ConstantExpression exp = (ConstantExpression) fp;
+                    Object value = exp.getValue();
+                    if (value instanceof Boolean) {
+                        Boolean filterValue = (Boolean) value;
+                        if (filterValue) {
+                            NoopFilterRemoverUtil.removeFilter(filter, 
sparkOp.physicalPlan);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
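
For context, and not part of the committed file above: the shape this visitor removes is a POFilter whose condition plan is a single ConstantExpression holding Boolean.TRUE. A hypothetical construction of exactly that shape, mirroring the checks in visitSparkOp:

    package example;   // hypothetical

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
    import org.apache.pig.impl.plan.OperatorKey;

    public class NoopFilterSketch {
        public static void main(String[] args) {
            // Condition plan: a single constant "true" expression.
            ConstantExpression alwaysTrue = new ConstantExpression(new OperatorKey("scope", 1));
            alwaysTrue.setValue(Boolean.TRUE);

            PhysicalPlan condition = new PhysicalPlan();
            condition.add(alwaysTrue);

            POFilter filter = new POFilter(new OperatorKey("scope", 2));
            filter.setPlan(condition);

            // The same shape visitSparkOp() checks before removing the filter.
            boolean noop = filter.getPlan().size() == 1
                    && filter.getPlan().getRoots().get(0) instanceof ConstantExpression
                    && Boolean.TRUE.equals(((ConstantExpression) filter.getPlan()
                            .getRoots().get(0)).getValue());
            System.out.println("no-op filter: " + noop);
        }
    }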

