Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon May 29 15:00:39 2017
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.TransformerException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.PoissonSampleConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SparkSampleSortConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkCounterGroup;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.JobLogger;
+import org.apache.spark.scheduler.StatsReportListener;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Main class that launches pig for Spark
+ */
+public class SparkLauncher extends Launcher {
+
+    private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
+
+    // Our connection to Spark. It needs to be static so that it can be reused
+    // across jobs, because a
+    // new SparkLauncher gets created for each job.
+    private static JavaSparkContext sparkContext = null;
+    private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
+    private String jobGroupID;
+    private PigContext pigContext = null;
+    private JobConf jobConf = null;
+    private String currentDirectoryPath = null;
+    private SparkEngineConf sparkEngineConf = new SparkEngineConf();
+    private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName();
+
+    @Override
+    public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
+                              PigContext pigContext) throws Exception {
+        if (LOG.isDebugEnabled())
+            LOG.debug(physicalPlan);
+        this.pigContext = pigContext;
+        initialize(physicalPlan);
+        SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(sparkplan);
+        }
+        SparkPigStats sparkStats = (SparkPigStats) pigContext
+                .getExecutionEngine().instantiatePigStats();
+        sparkStats.initialize(pigContext, sparkplan, jobConf);
+        PigStats.start(sparkStats);
+
+        startSparkIfNeeded(pigContext);
+
+        jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(),
+                UUID.randomUUID().toString());
+        jobConf.set(MRConfiguration.JOB_ID,jobGroupID);
+
+        sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
+                false);
+        jobMetricsListener.reset();
+
+        this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
+                .normalize().toString()
+                + "/";
+
+        new ParallelismSetter(sparkplan, jobConf).visit();
+
+        prepareSparkCounters(jobConf);
+
+        // Create conversion map, mapping between Pig operators and Spark converters
+        Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
+                = new HashMap<Class<? extends PhysicalOperator>, RDDConverter>();
+        convertMap.put(POLoad.class, new LoadConverter(pigContext,
+                physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf));
+        convertMap.put(POStore.class, new StoreConverter(jobConf));
+        convertMap.put(POForEach.class, new ForEachConverter(jobConf));
+        convertMap.put(POFilter.class, new FilterConverter());
+        convertMap.put(POPackage.class, new PackageConverter());
+        convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
+        convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
+        convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter());
+        convertMap.put(POLimit.class, new LimitConverter());
+        convertMap.put(PODistinct.class, new DistinctConverter());
+        convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
+        convertMap.put(POSort.class, new SortConverter());
+        convertMap.put(POSplit.class, new SplitConverter());
+        convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
+        convertMap.put(POMergeJoin.class, new MergeJoinConverter());
+        convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
+        convertMap.put(POCounter.class, new CounterConverter());
+        convertMap.put(PORank.class, new RankConverter());
+        convertMap.put(POStream.class, new StreamConverter());
+        convertMap.put(POFRJoinSpark.class, new FRJoinConverter());
+        convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
+        convertMap.put(POReduceBySpark.class, new ReduceByConverter());
+        convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
+        convertMap.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext));
+        convertMap.put(POSampleSortSpark.class, new SparkSampleSortConverter());
+        convertMap.put(POPoissonSampleSpark.class, new PoissonSampleConverter());
+        //Print SPARK plan before launching if needed
+        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+        if (conf.getBoolean(PigConfiguration.PIG_PRINT_EXEC_PLAN, false)) {
+            LOG.info(sparkplan);
+        }
+        uploadResources(sparkplan);
+
+        new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
+        cleanUpSparkJob(sparkStats);
+        sparkStats.finish();
+        resetUDFContext();
+        return sparkStats;
+    }
+
+    private void resetUDFContext() {
+        UDFContext.getUDFContext().addJobConf(null);
+    }
+
+    private void uploadResources(SparkOperPlan sparkPlan) throws IOException {
+        addFilesToSparkJob(sparkPlan);
+        addJarsToSparkJob(sparkPlan);
+    }
+
+    private void optimize(SparkOperPlan plan, PigContext pigContext) throws IOException {
+
+        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+
+        // Should be the first optimizer as it introduces new operators to the plan.
+        boolean noCombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false);
+        if (!pigContext.inIllustrator && !noCombiner)  {
+            CombinerOptimizer combinerOptimizer = new CombinerOptimizer(plan);
+            combinerOptimizer.visit();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("After combiner optimization:");
+                LOG.debug(plan);
+            }
+        }
+
+        boolean noSecondaryKey = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, false);
+        if (!pigContext.inIllustrator && !noSecondaryKey) {
+            SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
+            skOptimizer.visit();
+        }
+
+        boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true);
+        if (isAccum) {
+            AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+            accum.visit();
+        }
+
+        // removes the filter(constant(true)) operators introduced by
+        // splits.
+        NoopFilterRemover fRem = new NoopFilterRemover(plan);
+        fRem.visit();
+
+        boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Before multiquery optimization:");
+            LOG.debug(plan);
+        }
+
+        if (isMultiQuery) {
+            // reduces the number of SparkOpers in the Spark plan generated
+            // by multi-query (multi-store) script.
+            MultiQueryOptimizerSpark mqOptimizer = new MultiQueryOptimizerSpark(plan);
+            mqOptimizer.visit();
+        }
+
+        //JoinGroupOptimizerSpark collapses LRA+GLA+PKG into POJoinGroupSpark, while CombinerOptimizer
+        //collapses GLA+PKG into a ReduceBy. If JoinGroupOptimizerSpark ran first, the Spark plan would be
+        //changed and no longer suitable for CombinerOptimizer. For more detail see PIG-4797.
+        JoinGroupOptimizerSpark joinOptimizer = new JoinGroupOptimizerSpark(plan);
+        joinOptimizer.visit();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("After multiquery optimization:");
+            LOG.debug(plan);
+        }
+    }
+
+    private void cleanUpSparkJob(SparkPigStats sparkStats) throws ExecException {
+        LOG.info("Clean up Spark Job");
+        boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+                .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+        if (isLocal) {
+            String shipFiles = pigContext.getProperties().getProperty(
+                    "pig.streaming.ship.files");
+            if (shipFiles != null) {
+                for (String file : shipFiles.split(",")) {
+                    File shipFile = new File(file);
+                    File deleteFile = new File(currentDirectoryPath + "/"
+                            + shipFile.getName());
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("Delete ship file result: %b",
+                                deleteFile.delete()));
+                    }
+                }
+            }
+            String cacheFiles = pigContext.getProperties().getProperty(
+                    "pig.streaming.cache.files");
+            if (cacheFiles != null) {
+                for (String file : cacheFiles.split(",")) {
+                    String fileName = extractFileName(file.trim());
+                    File deleteFile = new File(currentDirectoryPath + "/"
+                            + fileName);
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("Delete cache file result: %b",
+                                deleteFile.delete()));
+                    }
+                }
+            }
+        }
+
+        // run cleanup for all of the stores
+        for (OutputStats output : sparkStats.getOutputStats()) {
+            POStore store = output.getPOStore();
+            try {
+                if (!output.isSuccessful()) {
+                    store.getStoreFunc().cleanupOnFailure(
+                            store.getSFile().getFileName(),
+                            Job.getInstance(output.getConf()));
+                } else {
+                    store.getStoreFunc().cleanupOnSuccess(
+                            store.getSFile().getFileName(),
+                            Job.getInstance(output.getConf()));
+                }
+            } catch (IOException e) {
+                throw new ExecException(e);
+            } catch (AbstractMethodError nsme) {
+                // Just swallow it.  This means we're running against an
+                // older instance of a StoreFunc that doesn't implement
+                // this method.
+            }
+        }
+    }
+
+    private void addFilesToSparkJob(SparkOperPlan sparkPlan) throws IOException {
+        LOG.info("Add files Spark Job");
+        String shipFiles = pigContext.getProperties().getProperty(
+                "pig.streaming.ship.files");
+        shipFiles(shipFiles);
+        String cacheFiles = pigContext.getProperties().getProperty(
+                "pig.streaming.cache.files");
+        cacheFiles(cacheFiles);
+        addUdfResourcesToSparkJob(sparkPlan);
+    }
+
+    private void addUdfResourcesToSparkJob(SparkOperPlan sparkPlan) throws IOException {
+        SparkPOUserFuncVisitor sparkPOUserFuncVisitor = new SparkPOUserFuncVisitor(sparkPlan);
+        sparkPOUserFuncVisitor.visit();
+        Joiner joiner = Joiner.on(",");
+        String shipFiles = joiner.join(sparkPOUserFuncVisitor.getShipFiles());
+        shipFiles(shipFiles);
+        String cacheFiles = joiner.join(sparkPOUserFuncVisitor.getCacheFiles());
+        cacheFiles(cacheFiles);
+    }
+
+    private void shipFiles(String shipFiles)
+            throws IOException {
+        if (shipFiles != null && !shipFiles.isEmpty()) {
+            for (String file : shipFiles.split(",")) {
+                File shipFile = new File(file.trim());
+                if (shipFile.exists()) {
+                    addResourceToSparkJobWorkingDirectory(shipFile,
+                            shipFile.getName(), ResourceType.FILE);
+                }
+            }
+        }
+    }
+
+    private void cacheFiles(String cacheFiles) throws IOException {
+        if (cacheFiles != null && !cacheFiles.isEmpty()) {
+            File tmpFolder = Files.createTempDirectory("cache").toFile();
+            tmpFolder.deleteOnExit();
+            for (String file : cacheFiles.split(",")) {
+                String fileName = extractFileName(file.trim());
+                if( fileName != null) {
+                    String fileUrl = extractFileUrl(file.trim());
+                    if( fileUrl != null) {
+                        Path src = new Path(fileUrl);
+                        File tmpFile = new File(tmpFolder, fileName);
+                        Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
+                        FileSystem fs = tmpFilePath.getFileSystem(jobConf);
+                        //TODO:PIG-5241 Specify the hdfs path directly to spark and avoid the unnecessary download and upload in SparkLauncher.java
+                        fs.copyToLocalFile(src, tmpFilePath);
+                        tmpFile.deleteOnExit();
+                        LOG.info(String.format("CacheFile:%s", fileName));
+                        addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
+                                ResourceType.FILE);
+                    }
+                }
+            }
+        }
+    }
+
+    public static enum ResourceType {
+        JAR,
+        FILE
+    }
+
+
+    private void addJarsToSparkJob(SparkOperPlan sparkPlan) throws IOException {
+        Set<String> allJars = new HashSet<String>();
+        LOG.info("Add default jars to Spark Job");
+        allJars.addAll(JarManager.getDefaultJars());
+        LOG.info("Add extra jars to Spark Job");
+        for (String scriptJar : pigContext.scriptJars) {
+            allJars.add(scriptJar);
+        }
+
+        LOG.info("Add udf jars to Spark Job");
+        UDFJarsFinder udfJarsFinder = new UDFJarsFinder(sparkPlan, pigContext);
+        udfJarsFinder.visit();
+        Set<String> udfJars = udfJarsFinder.getUdfJars();
+        for (String udfJar : udfJars) {
+            allJars.add(udfJar);
+        }
+
+        File scriptUDFJarFile = JarManager.createPigScriptUDFJar(pigContext);
+        if (scriptUDFJarFile != null) {
+            LOG.info("Add script udf jar to Spark job");
+            allJars.add(scriptUDFJarFile.getAbsolutePath().toString());
+        }
+
+        //Upload all jars to spark working directory
+        for (String jar : allJars) {
+            File jarFile = new File(jar);
+            addResourceToSparkJobWorkingDirectory(jarFile, jarFile.getName(),
+                    ResourceType.JAR);
+        }
+    }
+
+    private void addResourceToSparkJobWorkingDirectory(File resourcePath,
+                                                       String resourceName, ResourceType resourceType) throws IOException {
+        if (resourceType == ResourceType.JAR) {
+            LOG.info("Added jar " + resourceName);
+        } else {
+            LOG.info("Added file " + resourceName);
+        }
+        boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+                .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+        if (isLocal) {
+            File localFile = new File(currentDirectoryPath + "/"
+                    + resourceName);
+            if (resourcePath.getAbsolutePath().equals(localFile.getAbsolutePath())
+                    && resourcePath.exists()) {
+                return;
+            }
+            // When multiple threads start SparkLauncher, delete/copy actions should be in a critical section
+            synchronized(SparkLauncher.class) {
+                if (localFile.exists()) {
+                    LOG.info(String.format(
+                            "Jar file %s exists, ready to delete",
+                            localFile.getAbsolutePath()));
+                    localFile.delete();
+                } else {
+                    LOG.info(String.format("Jar file %s not exists,",
+                            localFile.getAbsolutePath()));
+                }
+                Files.copy(Paths.get(new Path(resourcePath.getAbsolutePath()).toString()),
+                    Paths.get(localFile.getAbsolutePath()));
+            }
+        } else {
+            if(resourceType == ResourceType.JAR){
+                sparkContext.addJar(resourcePath.toURI().toURL()
+                        .toExternalForm());
+            }else if( resourceType == ResourceType.FILE){
+                sparkContext.addFile(resourcePath.toURI().toURL()
+                        .toExternalForm());
+            }
+        }
+    }
+
+    private String extractFileName(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1]
+                : null;
+        return fileName;
+    }
+
+    private String extractFileUrl(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0]
+                : null;
+        return fileName;
+    }
+
+    public SparkOperPlan compile(PhysicalPlan physicalPlan,
+                                  PigContext pigContext) throws PlanException, IOException,
+            VisitorException {
+        SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
+                pigContext);
+        sparkCompiler.compile();
+        sparkCompiler.connectSoftLink();
+        SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
+
+        // optimize key - value handling in package
+        SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(
+                sparkPlan);
+        pkgAnnotator.visit();
+
+        optimize(sparkPlan, pigContext);
+        return sparkPlan;
+    }
+
+    /**
+     * Only one SparkContext may be active per JVM (SPARK-2243). When multiple threads start SparkLauncher,
+     * the static member sparkContext should be initialized only once
+     */
+    private static synchronized void startSparkIfNeeded(PigContext pc) throws PigException {
+        if (sparkContext == null) {
+            String master = null;
+            if (pc.getExecType().isLocal()) {
+                master = "local";
+            } else {
+                master = System.getenv("SPARK_MASTER");
+                if (master == null) {
+                    LOG.info("SPARK_MASTER not specified, using \"local\"");
+                    master = "local";
+                }
+            }
+
+            String sparkHome = System.getenv("SPARK_HOME");
+            if (!master.startsWith("local") && !master.equals("yarn-client")) {
+                // Check that we have the Mesos native library and Spark home
+                // are set
+                if (sparkHome == null) {
+                    System.err
+                            .println("You need to set SPARK_HOME to run on a Mesos cluster!");
+                    throw new PigException("SPARK_HOME is not set");
+                }
+            }
+
+            SparkConf sparkConf = new SparkConf();
+            Properties pigCtxtProperties = pc.getProperties();
+
+            sparkConf.setMaster(master);
+            sparkConf.setAppName(pigCtxtProperties.getProperty(PigContext.JOB_NAME,"pig"));
+            // On Spark 1.6, the Netty file server doesn't allow adding the same file with the same name twice
+            // This is a problem for streaming with a script + explicitly shipping the same script (PIG-5134)
+            // The HTTP file server doesn't have this restriction; it overwrites the file if added twice
+            String useNettyFileServer = pigCtxtProperties.getProperty(PigConfiguration.PIG_SPARK_USE_NETTY_FILESERVER, "false");
+            sparkConf.set("spark.rpc.useNettyFileServer", useNettyFileServer);
+
+            if (sparkHome != null && !sparkHome.isEmpty()) {
+                sparkConf.setSparkHome(sparkHome);
+            } else {
+                LOG.warn("SPARK_HOME is not set");
+            }
+
+            //Copy all spark.* properties to SparkConf
+            for (String key : pigCtxtProperties.stringPropertyNames()) {
+                if (key.startsWith("spark.")) {
+                    LOG.debug("Copying key " + key + " with value " +
+                            pigCtxtProperties.getProperty(key) + " to 
SparkConf");
+                    sparkConf.set(key, pigCtxtProperties.getProperty(key));
+                }
+            }
+
+            //see PIG-5200 for why spark.executor.userClassPathFirst needs to be set to true
+            sparkConf.set("spark.executor.userClassPathFirst", "true");
+            checkAndConfigureDynamicAllocation(master, sparkConf);
+
+            sparkContext = new JavaSparkContext(sparkConf);
+            sparkContext.sc().addSparkListener(new StatsReportListener());
+            sparkContext.sc().addSparkListener(new JobLogger());
+            sparkContext.sc().addSparkListener(jobMetricsListener);
+        }
+    }
+
+    private static void checkAndConfigureDynamicAllocation(String master, SparkConf sparkConf) {
+        if (sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+            if (!master.startsWith("yarn")) {
+                LOG.warn("Dynamic allocation is enabled, but " +
+                        "script isn't running on yarn. Ignoring ...");
+            }
+            if (!sparkConf.getBoolean("spark.shuffle.service.enabled", false)) 
{
+                LOG.info("Spark shuffle service is being enabled as dynamic " +
+                        "allocation is enabled");
+                sparkConf.set("spark.shuffle.service.enabled", "true");
+            }
+        }
+    }
+
+    // You can use this in unit tests to stop the SparkContext between tests.
+    static void stopSpark() {
+        if (sparkContext != null) {
+            sparkContext.stop();
+            sparkContext = null;
+        }
+    }
+
+
+    @Override
+    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
+                        String format, boolean verbose) throws IOException {
+        SparkOperPlan sparkPlan = compile(pp, pc);
+        explain(sparkPlan, ps, format, verbose);
+    }
+
+    private void explain(SparkOperPlan sparkPlan, PrintStream ps,
+                         String format, boolean verbose)
+            throws IOException {
+        Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys();
+        List<OperatorKey> operKeyList = new ArrayList<>(allOperKeys.keySet());
+        Collections.sort(operKeyList);
+
+        if (format.equals("text")) {
+            for (OperatorKey operatorKey : operKeyList) {
+                SparkOperator op = sparkPlan.getOperator(operatorKey);
+                ps.print(op.getOperatorKey());
+                List<SparkOperator> successors = sparkPlan.getSuccessors(op);
+                if (successors != null) {
+                    ps.print("->");
+                    for (SparkOperator suc : successors) {
+                        ps.print(suc.getOperatorKey() + " ");
+                    }
+                }
+                ps.println();
+            }
+            SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else if (format.equals("dot")) {
+            ps.println("#--------------------------------------------------");
+            ps.println("# Spark Plan");
+            ps.println("#--------------------------------------------------");
+
+            DotSparkPrinter printer = new DotSparkPrinter(sparkPlan, ps);
+            printer.setVerbose(verbose);
+            printer.dump();
+            ps.println("");
+        } else if (format.equals("xml")) {
+            try {
+                XMLSparkPrinter printer = new XMLSparkPrinter(ps, sparkPlan);
+                printer.visit();
+                printer.closePlan();
+            } catch (ParserConfigurationException e) {
+                e.printStackTrace();
+            } catch (TransformerException e) {
+                e.printStackTrace();
+            }
+        }
+        else {
+            throw new IOException(
+                    "Unsupported explain format. Supported formats are: text, dot, xml");
+        }
+    }
+
+    @Override
+    public void kill() throws BackendException {
+        if (sparkContext != null) {
+            sparkContext.stop();
+            sparkContext = null;
+        }
+    }
+
+    @Override
+    public void killJob(String jobID, Configuration conf)
+            throws BackendException {
+        if (sparkContext != null) {
+            sparkContext.stop();
+            sparkContext = null;
+        }
+    }
+
+    /**
+     * We store the value of udf.import.list in SparkEngineConf#properties.
+     * Later we serialize it in SparkEngineConf#writeObject and deserialize it in SparkEngineConf#readObject.
+     * For more detail see PIG-4920.
+     */
+    private void saveUdfImportList() {
+        String udfImportList = Joiner.on(",").join(PigContext.getPackageImportList());
+        sparkEngineConf.setSparkUdfImportListStr(udfImportList);
+    }
+
+    private void initialize(PhysicalPlan physicalPlan) throws IOException {
+        saveUdfImportList();
+        jobConf = SparkUtil.newJobConf(pigContext, physicalPlan, sparkEngineConf);
+        SchemaTupleBackend.initialize(jobConf, pigContext);
+        Utils.setDefaultTimeZone(jobConf);
+        PigMapReduce.sJobConfInternal.set(jobConf);
+        String parallelism = pigContext.getProperties().getProperty("spark.default.parallelism");
+        if (parallelism != null) {
+            SparkPigContext.get().setDefaultParallelism(Integer.parseInt(parallelism));
+        }
+    }
+
+    /**
+     * Creates new SparkCounters instance for the job, initializes aggregate warning counters if required
+     * @param jobConf
+     * @throws IOException
+     */
+    private static void prepareSparkCounters(JobConf jobConf) throws IOException {
+        SparkPigStatusReporter statusReporter = SparkPigStatusReporter.getInstance();
+        SparkCounters counters = new SparkCounters(sparkContext);
+
+        if ("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))) {
+            SparkCounterGroup pigWarningGroup = new SparkCounterGroup.MapSparkCounterGroup(
+                    PIG_WARNING_FQCN, PIG_WARNING_FQCN,sparkContext
+            );
+            pigWarningGroup.createCounter(PigWarning.SPARK_WARN.name(), new HashMap<String,Long>());
+            pigWarningGroup.createCounter(PigWarning.SPARK_CUSTOM_WARN.name(), new HashMap<String,Long>());
+            counters.getSparkCounterGroups().put(PIG_WARNING_FQCN, pigWarningGroup);
+        }
+        statusReporter.setCounters(counters);
+        jobConf.set("pig.spark.counters", 
ObjectSerializer.serialize(counters));
+    }
+}
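
A minimal, self-contained sketch (for illustration only; not part of this revision) of the property hand-off
that startSparkIfNeeded() above performs: every "spark.*" key in the Pig properties is copied verbatim onto
the SparkConf before the JavaSparkContext is built. The property names and values used here are examples only.

    import java.util.Properties;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkConfHandOffSketch {
        public static void main(String[] args) {
            Properties pigProps = new Properties();
            pigProps.setProperty("spark.executor.memory", "2g");      // example spark.* property, copied over
            pigProps.setProperty("pig.tmpfilecompression", "true");   // not spark.*, ignored by the copy loop

            SparkConf conf = new SparkConf().setMaster("local").setAppName("pig");
            for (String key : pigProps.stringPropertyNames()) {
                if (key.startsWith("spark.")) {
                    conf.set(key, pigProps.getProperty(key));
                }
            }

            JavaSparkContext jsc = new JavaSparkContext(conf);
            System.out.println(jsc.getConf().get("spark.executor.memory")); // prints: 2g
            jsc.stop();
        }
    }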

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java Mon May 29 15:00:39 2017
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.util.Properties;
+
+/**
+ * SparkLocalExecType is the ExecType for local mode in Spark.
+ */
+public class SparkLocalExecType extends SparkExecType {
+
+    private static final long serialVersionUID = 1L;
+    private static final String mode = "SPARK_LOCAL";
+
+    @Override
+    public boolean accepts(Properties properties) {
+        String execTypeSpecified = properties.getProperty("exectype", "")
+                .toUpperCase();
+        if (execTypeSpecified.equals(mode))
+            return true;
+        return false;
+    }
+
+    @Override
+    public boolean isLocal() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        return "SPARK_LOCAL";
+    }
+}
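
A minimal sketch (for illustration only; not part of this revision) of how the "exectype" property drives
accepts() above: the value is upper-cased and compared against the mode name, so "spark_local" selects this
ExecType.

    import java.util.Properties;

    public class ExecTypeSelectionSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("exectype", "spark_local");
            // Mirrors SparkLocalExecType.accepts(): case-insensitive match against "SPARK_LOCAL".
            boolean acceptsSparkLocal = props.getProperty("exectype", "").toUpperCase().equals("SPARK_LOCAL");
            System.out.println(acceptsSparkLocal); // prints: true
        }
    }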

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java Mon May 29 15:00:39 2017
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class SparkPOUserFuncVisitor extends SparkOpPlanVisitor {
+    private Set<String> cacheFiles = new HashSet<>();
+    private Set<String> shipFiles = new HashSet<>();
+
+    public SparkPOUserFuncVisitor(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
+        if(!sparkOperator.physicalPlan.isEmpty()) {
+            UdfCacheShipFilesVisitor udfCacheFileVisitor = new UdfCacheShipFilesVisitor(sparkOperator.physicalPlan);
+            udfCacheFileVisitor.visit();
+            cacheFiles.addAll(udfCacheFileVisitor.getCacheFiles());
+            shipFiles.addAll(udfCacheFileVisitor.getShipFiles());
+        }
+    }
+
+    public Set<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public Set<String> getShipFiles() {
+        return shipFiles;
+    }
+}
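
A usage sketch (for illustration only; not part of this revision) of how SparkLauncher.addUdfResourcesToSparkJob()
above consumes this visitor: walk a compiled SparkOperPlan, then join the collected UDF ship/cache files into the
comma-separated lists handed to shipFiles()/cacheFiles(). The class and method names below are hypothetical.

    import com.google.common.base.Joiner;
    import org.apache.pig.backend.hadoop.executionengine.spark.SparkPOUserFuncVisitor;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
    import org.apache.pig.impl.plan.VisitorException;

    public class UdfResourceListSketch {
        // Returns {shipFileList, cacheFileList} for a compiled Spark plan.
        public static String[] udfResourceLists(SparkOperPlan sparkPlan) throws VisitorException {
            SparkPOUserFuncVisitor visitor = new SparkPOUserFuncVisitor(sparkPlan);
            visitor.visit();
            Joiner joiner = Joiner.on(",");
            return new String[] { joiner.join(visitor.getShipFiles()), joiner.join(visitor.getCacheFiles()) };
        }
    }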

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java Mon May 29 15:00:39 2017
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.rdd.RDD;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Singleton class, similar in spirit to PigContext, holding Spark-specific job state
+ */
+public class SparkPigContext {
+
+    private static SparkPigContext context =  null;
+    private static ThreadLocal<Integer> defaultParallelism = null;
+    private static ConcurrentHashMap<String, Broadcast<List<Tuple>>> 
broadcastedVars = new ConcurrentHashMap() ;
+
+    public static SparkPigContext get(){
+        if( context == null){
+            context = new SparkPigContext();
+        }
+        return context;
+    }
+    public static int getDefaultParallelism() {
+        return defaultParallelism.get();
+    }
+
+
+    public static int getParallelism(List<RDD<Tuple>> predecessors,
+                                     PhysicalOperator physicalOperator) {
+        if (defaultParallelism != null) {
+           return getDefaultParallelism();
+        }
+
+        int parallelism = physicalOperator.getRequestedParallelism();
+        if (parallelism <= 0) {
+            //Spark automatically sets the number of "map" tasks to run on each file according to its size (though
+            // you can control it through optional parameters to SparkContext.textFile, etc), and for distributed
+            //"reduce" operations, such as groupByKey and reduceByKey, it uses the largest parent RDD's number of
+            // partitions.
+            int maxParallism = 0;
+            for (int i = 0; i < predecessors.size(); i++) {
+                int tmpParallelism = predecessors.get(i).getNumPartitions();
+                if (tmpParallelism > maxParallism) {
+                    maxParallism = tmpParallelism;
+                }
+            }
+            parallelism = maxParallism;
+        }
+        return parallelism;
+    }
+
+    public static void setDefaultParallelism(int defaultParallelism) {
+        SparkPigContext.defaultParallelism.set(defaultParallelism);
+    }
+
+    public static ConcurrentHashMap<String, Broadcast<List<Tuple>>> getBroadcastedVars() {
+        return broadcastedVars;
+    }
+}
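
A simplified sketch (for illustration only; not part of this revision) of the fallback logic in getParallelism()
above: an explicit default parallelism wins, then the operator's requested parallelism, and otherwise the largest
partition count among the predecessor RDDs. Plain ints stand in for predecessor.getNumPartitions() here.

    import java.util.Arrays;
    import java.util.List;

    public class ParallelismFallbackSketch {
        static int pickParallelism(Integer defaultParallelism, int requestedParallelism,
                                   List<Integer> predecessorPartitionCounts) {
            if (defaultParallelism != null) {
                return defaultParallelism;                 // spark.default.parallelism was set
            }
            if (requestedParallelism > 0) {
                return requestedParallelism;               // PARALLEL clause on the operator
            }
            int max = 0;                                   // otherwise the largest parent RDD wins
            for (int numPartitions : predecessorPartitionCounts) {
                max = Math.max(max, numPartitions);
            }
            return max;
        }

        public static void main(String[] args) {
            // Two predecessor RDDs with 8 and 12 partitions, nothing requested or configured.
            System.out.println(pickParallelism(null, -1, Arrays.asList(8, 12))); // prints: 12
        }
    }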

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java Mon May 29 15:00:39 2017
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * Record reader for Spark mode - handles SparkPigSplit
+ */
+public class SparkPigRecordReader extends PigRecordReader {
+
+
+    /**
+     * @param inputformat
+     * @param pigSplit
+     * @param loadFunc
+     * @param context
+     * @param limit
+     */
+    public SparkPigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit, LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
+        super(inputformat, pigSplit, loadFunc, context, limit);
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        SparkPigSplit sparkPigSplit = (SparkPigSplit)split;
+        super.initialize(sparkPigSplit.getWrappedPigSplit(), context);
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java Mon May 29 15:00:39 2017
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * Wrapper class for PigSplits in Spark mode
+ *
+ * Spark only counts HDFS bytes read if we provide a FileSplit, so we have to wrap PigSplits and have the wrapper
+ * class extend FileSplit
+ */
+public interface SparkPigSplit extends Writable, Configurable, Serializable {
+
+   InputSplit getWrappedSplit();
+
+   InputSplit getWrappedSplit(int idx);
+
+   SplitLocationInfo[] getLocationInfo() throws IOException;
+
+   long getLength(int idx) throws IOException, InterruptedException;
+
+   int getSplitIndex();
+
+   void setMultiInputs(boolean b);
+
+   boolean isMultiInputs();
+
+   int getNumPaths();
+
+   void setDisableCounter(boolean disableCounter);
+
+   boolean disableCounter();
+
+   void setCurrentIdx(int idx);
+
+   PigSplit getWrappedPigSplit();
+
+   public static class FileSparkPigSplit extends FileSplit implements SparkPigSplit {
+
+       private PigSplit pigSplit;
+
+       /**
+        * Spark executor's deserializer calls this, and we have to instantiate a default wrapped object
+        */
+       public FileSparkPigSplit () {
+           pigSplit = new PigSplit();
+       }
+
+       public FileSparkPigSplit(PigSplit pigSplit) {
+           this.pigSplit = pigSplit;
+       }
+
+       @Override
+       public SplitLocationInfo[] getLocationInfo() throws IOException {
+           return pigSplit.getLocationInfo();
+       }
+
+       @Override
+       public String toString() {
+           return pigSplit.toString();
+       }
+
+       @Override
+       public long getLength() {
+           try {
+               return pigSplit.getLength();
+           } catch (IOException | InterruptedException e) {
+               throw new RuntimeException(e);
+           }
+       }
+
+       @Override
+       public String[] getLocations() throws IOException {
+           try {
+               return pigSplit.getLocations();
+           } catch (InterruptedException e) {
+               throw new RuntimeException(e);
+           }
+       }
+
+       @Override
+       public InputSplit getWrappedSplit() {
+           return pigSplit.getWrappedSplit();
+       }
+
+       @Override
+       public InputSplit getWrappedSplit(int idx) {
+           return pigSplit.getWrappedSplit(idx);
+       }
+
+       @Override
+       public long getLength(int idx) throws IOException, InterruptedException {
+           return pigSplit.getLength(idx);
+       }
+
+       @Override
+       public void readFields(DataInput is) throws IOException {
+           pigSplit.readFields(is);
+       }
+
+       @Override
+       public void write(DataOutput os) throws IOException {
+           pigSplit.write(os);
+       }
+
+       @Override
+       public int getSplitIndex() {
+           return pigSplit.getSplitIndex();
+       }
+
+       @Override
+       public void setMultiInputs(boolean b) {
+           pigSplit.setMultiInputs(b);
+       }
+
+       @Override
+       public boolean isMultiInputs() {
+           return pigSplit.isMultiInputs();
+       }
+
+       @Override
+       public Configuration getConf() {
+           return pigSplit.getConf();
+       }
+
+       @Override
+       public void setConf(Configuration conf) {
+           pigSplit.setConf(conf);
+       }
+
+       @Override
+       public int getNumPaths() {
+           return pigSplit.getNumPaths();
+       }
+
+       @Override
+       public void setDisableCounter(boolean disableCounter) {
+           pigSplit.setDisableCounter(disableCounter);
+       }
+
+       @Override
+       public boolean disableCounter() {
+           return pigSplit.disableCounter();
+       }
+
+       @Override
+       public void setCurrentIdx(int idx) {
+           pigSplit.setCurrentIdx(idx);
+       }
+
+       @Override
+       public PigSplit getWrappedPigSplit() {
+           return this.pigSplit;
+       }
+
+       @Override
+       public Path getPath() {
+           return ((FileSplit)getWrappedPigSplit().getWrappedSplit()).getPath();
+       }
+   }
+
+    public static class GenericSparkPigSplit extends InputSplit implements SparkPigSplit {
+
+        private static final long serialVersionUID = 1L;
+
+        private PigSplit pigSplit;
+
+        /**
+         * Spark executor's deserializer calls this, and we have to instantiate a default wrapped object
+         */
+        public GenericSparkPigSplit() {
+            pigSplit = new PigSplit();
+        }
+
+        public GenericSparkPigSplit(PigSplit pigSplit) {
+            this.pigSplit = pigSplit;
+        }
+
+        @Override
+        public SplitLocationInfo[] getLocationInfo() throws IOException {
+            return pigSplit.getLocationInfo();
+        }
+
+        @Override
+        public String toString() {
+            return pigSplit.toString();
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return pigSplit.getLength();
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return pigSplit.getLocations();
+        }
+
+        @Override
+        public InputSplit getWrappedSplit() {
+            return pigSplit.getWrappedSplit();
+        }
+
+        @Override
+        public InputSplit getWrappedSplit(int idx) {
+            return pigSplit.getWrappedSplit(idx);
+        }
+
+        @Override
+        public long getLength(int idx) throws IOException, InterruptedException {
+            return pigSplit.getLength(idx);
+        }
+
+        @Override
+        public void readFields(DataInput is) throws IOException {
+            pigSplit.readFields(is);
+        }
+
+        @Override
+        public void write(DataOutput os) throws IOException {
+            pigSplit.write(os);
+        }
+
+        @Override
+        public int getSplitIndex() {
+            return pigSplit.getSplitIndex();
+        }
+
+        @Override
+        public void setMultiInputs(boolean b) {
+            pigSplit.setMultiInputs(b);
+        }
+
+        @Override
+        public boolean isMultiInputs() {
+            return pigSplit.isMultiInputs();
+        }
+
+        @Override
+        public Configuration getConf() {
+            return pigSplit.getConf();
+        }
+
+        @Override
+        public void setConf(Configuration conf) {
+            pigSplit.setConf(conf);
+        }
+
+        @Override
+        public int getNumPaths() {
+            return pigSplit.getNumPaths();
+        }
+
+        @Override
+        public void setDisableCounter(boolean disableCounter) {
+            pigSplit.setDisableCounter(disableCounter);
+        }
+
+        @Override
+        public boolean disableCounter() {
+            return pigSplit.disableCounter();
+        }
+
+        @Override
+        public void setCurrentIdx(int idx) {
+            pigSplit.setCurrentIdx(idx);
+        }
+
+        @Override
+        public PigSplit getWrappedPigSplit() {
+            return this.pigSplit;
+        }
+    }
+
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.rdd.RDD;
+
+public class SparkUtil {
+    public static <T> ClassTag<T> getManifest(Class<T> clazz) {
+        return ClassTag$.MODULE$.apply(clazz);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <K, V> ClassTag<Tuple2<K, V>> getTuple2Manifest() {
+        return (ClassTag<Tuple2<K, V>>) (Object) getManifest(Tuple2.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <K, V> ClassTag<Product2<K, V>> getProduct2Manifest() {
+        return (ClassTag<Product2<K, V>>) (Object) getManifest(Product2.class);
+    }
+
+    public static JobConf newJobConf(PigContext pigContext, PhysicalPlan physicalPlan, SparkEngineConf sparkEngineConf) throws
+            IOException {
+        JobConf jobConf = new JobConf(
+                ConfigurationUtil.toConfiguration(pigContext.getProperties()));
+        // Serialize the thread-local variables UDFContext#udfConfs,
+        // UDFContext#clientSysProps and PigContext#packageImportList
+        // inside SparkEngineConf separately.
+        jobConf.set("spark.engine.conf", ObjectSerializer.serialize(sparkEngineConf));
+        // Serialize the PigContext so it's available in the executor thread.
+        jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+        // Serialize the thread-local variable inside PigContext separately.
+        // Although after PIG-4920 we store udf.import.list in SparkEngineConf,
+        // we still need to store it in jobConf because it is used in PigOutputFormat#setupUdfEnvAndStores.
+        jobConf.set("udf.import.list",
+                ObjectSerializer.serialize(PigContext.getPackageImportList()));
+
+        Random rand = new Random();
+        jobConf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, Integer.toString(rand.nextInt()));
+        jobConf.set(PigConstants.LOCAL_CODE_DIR,
+                System.getProperty("java.io.tmpdir"));
+        jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
+
+        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+                physicalPlan, POStore.class);
+        POStore firstStore = stores.getFirst();
+        if (firstStore != null) {
+            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
+                    jobConf);
+        }
+        return jobConf;
+    }
+
+    public static <T> Seq<T> toScalaSeq(List<T> list) {
+        return JavaConversions.asScalaBuffer(list);
+    }
+
+    public static void assertPredecessorSize(List<RDD<Tuple>> predecessors,
+            PhysicalOperator physicalOperator, int size) {
+        if (predecessors.size() != size) {
+            throw new RuntimeException("Should have " + size
+                    + " predecessors for " + physicalOperator.getClass()
+                    + ". Got : " + predecessors.size());
+        }
+    }
+
+    public static void assertPredecessorSizeGreaterThan(
+            List<RDD<Tuple>> predecessors, PhysicalOperator physicalOperator,
+            int size) {
+        if (predecessors.size() <= size) {
+            throw new RuntimeException("Should have greater than" + size
+                    + " predecessors for " + physicalOperator.getClass()
+                    + ". Got : " + predecessors.size());
+        }
+    }
+
+    public static Partitioner getPartitioner(String customPartitioner, int parallelism) {
+        if (customPartitioner == null) {
+            return new HashPartitioner(parallelism);
+        } else {
+            return new MapReducePartitionerWrapper(customPartitioner, parallelism);
+        }
+    }
+
+    // createIndexerSparkNode is a utility to create an indexer Spark node with baseSparkOp
+    public static void createIndexerSparkNode(SparkOperator baseSparkOp, String scope, NodeIdGenerator nig) throws PlanException, ExecException {
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan ep = new PhysicalPlan();
+        POProject prj = new POProject(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        prj.setStar(true);
+        prj.setOverloaded(false);
+        prj.setResultType(DataType.TUPLE);
+        ep.add(prj);
+        eps.add(ep);
+
+        List<Boolean> ascCol = new ArrayList<Boolean>();
+        ascCol.add(true);
+
+        int requestedParallelism = baseSparkOp.requestedParallelism;
+        POSort sort = new POSort(new OperatorKey(scope, nig.getNextNodeId(scope)), requestedParallelism, null, eps, ascCol, null);
+        // POSort is added to sort the index tuples generated by MergeJoinIndexer. For more detail, see PIG-4601.
+        baseSparkOp.physicalPlan.addAsLeaf(sort);
+    }
+
+
+
+}
\ No newline at end of file
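
As background for the partitioner selection in SparkUtil.getPartitioner above: when no custom partitioner is configured, keys are simply hash-partitioned into "parallelism" buckets. Below is a minimal, self-contained sketch of that fallback against the plain Spark Java API; the class name, sample data and local[*] master are illustrative only and are not part of this commit. (The other branch wraps a Hadoop-style custom partitioner class for Spark via MapReducePartitionerWrapper.)

    import java.util.Arrays;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class PartitionerSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("partitioner-sketch").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, Integer>("a", 1),
                    new Tuple2<String, Integer>("b", 2),
                    new Tuple2<String, Integer>("a", 3)));

            // Equivalent of the getPartitioner(null, parallelism) fallback:
            // spread keys by hash code into `parallelism` partitions.
            int parallelism = 4;
            JavaPairRDD<String, Integer> partitioned =
                    pairs.partitionBy(new HashPartitioner(parallelism));

            // Prints 4: one partition per requested degree of parallelism.
            System.out.println(partitioned.partitions().size());
            sc.stop();
        }
    }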

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+
+// Finds the UDF jars that will be shipped with the Spark job to every node.
+public class UDFJarsFinder extends SparkOpPlanVisitor {
+    private PigContext pigContext = null;
+    private Set<String> udfJars = new HashSet<String>();
+
+    public UDFJarsFinder(SparkOperPlan plan, PigContext pigContext) {
+        super(plan, new DependencyOrderWalker(plan));
+        this.pigContext = pigContext;
+    }
+
+    public void visitSparkOp(SparkOperator sparkOp)
+            throws VisitorException {
+        for (String udf : sparkOp.UDFs) {
+            try {
+                Class clazz = this.pigContext.getClassForAlias(udf);
+                if (clazz != null) {
+                    String jar = JarManager.findContainingJar(clazz);
+                    if (jar != null) {
+                        this.udfJars.add(jar);
+                    }
+                }
+            } catch (IOException e) {
+                throw new VisitorException(e);
+            }
+        }
+    }
+
+    public Set<String> getUdfJars() {
+        return this.udfJars;
+    }
+}
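
JarManager.findContainingJar above is Pig's own helper. For a feel for the underlying question (which jar on the classpath provides a given class?), here is a rough, self-contained JDK-only approximation using the class's code source. It is only an illustration with a made-up class name, not the code the visitor actually uses.

    import java.net.URL;
    import java.security.CodeSource;

    public class ContainingJarSketch {

        // Returns the jar (or directory) a class was loaded from, or null for
        // classes provided by the bootstrap class loader (e.g. java.lang.String).
        public static String containingJar(Class<?> clazz) {
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return null;
            }
            URL location = source.getLocation();
            return location.toString();
        }

        public static void main(String[] args) {
            System.out.println(containingJar(ContainingJarSketch.class));
            System.out.println(containingJar(String.class)); // typically null
        }
    }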

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.rdd.RDD;
+
+import java.util.List;
+
+public class BroadcastConverter implements RDDConverter<Tuple, Tuple, POBroadcastSpark> {
+
+    private final JavaSparkContext sc;
+
+    public BroadcastConverter(JavaSparkContext sc) {
+        this.sc = sc;
+    }
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POBroadcastSpark po) {
+        SparkUtil.assertPredecessorSize(predecessors, po, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+
+        // Just collect the predecessor RDD, and broadcast it
+        JavaRDD<Tuple> javaRDD = new JavaRDD<>(rdd, SparkUtil.getManifest(Tuple.class));
+        Broadcast<List<Tuple>> broadcastedRDD = sc.broadcast(javaRDD.collect());
+
+        // Save the broadcast variable to broadcastedVars map, so that this
+        // broadcasted variable can be referenced by the driver client.
+        SparkPigContext.get().getBroadcastedVars().put(po.getBroadcastedVariableName(), broadcastedRDD);
+
+        return rdd;
+    }
+
+}
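
The converter above broadcasts a relation by collecting the predecessor RDD on the driver and registering the broadcast handle so later operators can reference it. Below is a minimal sketch of the same collect-then-broadcast pattern in plain Spark Java code; the sample data, class name and local master are illustrative and not part of the commit.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.broadcast.Broadcast;

    public class BroadcastSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("broadcast-sketch").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // The "small" relation is materialized on the driver ...
            JavaRDD<String> small = sc.parallelize(Arrays.asList("a", "b", "c"));
            final Broadcast<List<String>> keys = sc.broadcast(small.collect());

            // ... and every executor reads the broadcast copy instead of shuffling it.
            JavaRDD<String> big = sc.parallelize(Arrays.asList("a", "x", "c", "a"));
            long matches = big.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String s) {
                    return keys.value().contains(s);
                }
            }).count();

            System.out.println(matches); // 3
            sc.stop();
        }
    }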

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings({"serial"})
+public class CollectedGroupConverter implements RDDConverter<Tuple, Tuple, POCollectedGroup> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POCollectedGroup physicalOperator) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        CollectedGroupFunction collectedGroupFunction
+                = new CollectedGroupFunction(physicalOperator);
+        return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true)
+                .rdd();
+    }
+
+    private static class CollectedGroupFunction
+            implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+
+        private POCollectedGroup poCollectedGroup;
+
+        public long current_val;
+        public boolean proceed;
+
+        private CollectedGroupFunction(POCollectedGroup poCollectedGroup) {
+            this.poCollectedGroup = poCollectedGroup;
+            this.current_val = 0;
+        }
+
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poCollectedGroup.setInputs(null);
+                            poCollectedGroup.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poCollectedGroup.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poCollectedGroup.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}
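
CollectedGroupFunction above streams each partition through POCollectedGroup by wrapping the partition's input iterator instead of materializing it. The following is a minimal sketch of that lazy mapPartitions pattern with the Spark 1.x Java API this commit targets (FlatMapFunction returning an Iterable); the upper-casing transformation, class name and data are illustrative only.

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public class LazyPartitionSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("lazy-partition-sketch").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> in = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

            // Like CollectedGroupFunction, the returned Iterable wraps the partition's
            // input iterator and produces output records on demand, so the partition
            // is never held in memory as a whole.
            JavaRDD<String> out = in.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                @Override
                public Iterable<String> call(final Iterator<String> input) {
                    return new Iterable<String>() {
                        @Override
                        public Iterator<String> iterator() {
                            return new Iterator<String>() {
                                @Override
                                public boolean hasNext() {
                                    return input.hasNext();
                                }

                                @Override
                                public String next() {
                                    return input.next().toUpperCase();
                                }

                                @Override
                                public void remove() {
                                    throw new UnsupportedOperationException();
                                }
                            };
                        }
                    };
                }
            });

            System.out.println(out.collect()); // [A, B, C, D]
            sc.stop();
        }
    }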

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.rdd.RDD;
+
+public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> {
+
+    private static final Log LOG = LogFactory.getLog(CounterConverter.class);
+    
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, 
+            POCounter poCounter) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poCounter, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        CounterConverterFunction f = new CounterConverterFunction(poCounter);
+        JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true);
+//        jRdd = jRdd.cache();
+        return jRdd.rdd();
+    }
+    
+    @SuppressWarnings("serial")
+    private static class CounterConverterFunction implements 
+        Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable {
+
+        private final POCounter poCounter;
+        private long localCount = 1L;
+        private long sparkCount = 0L;
+        
+        private CounterConverterFunction(POCounter poCounter) {
+            this.poCounter = poCounter;
+        }
+        
+        @Override
+        public Iterator<Tuple> call(Integer index, final 
+                Iterator<Tuple> input) {
+            Tuple inp = null;
+            Tuple output = null;
+            long sizeBag = 0L;
+
+            List<Tuple> listOutput = new ArrayList<Tuple>();
+            
+            try {
+                while (input.hasNext()) {
+                    inp = input.next();
+                    output = TupleFactory.getInstance()
+                            .newTuple(inp.getAll().size() + 3);
+                    
+                    for (int i = 0; i < inp.getAll().size(); i++) {
+                        output.set(i + 3, inp.get(i));
+                    }
+                    
+                    if (poCounter.isRowNumber() || poCounter.isDenseRank()) {
+                        output.set(2, getLocalCounter());
+                        incrementSparkCounter();
+                        incrementLocalCounter();
+                    } else if (!poCounter.isDenseRank()) {
+                        int positionBag = inp.getAll().size()-1;
+                        if (inp.getType(positionBag) == DataType.BAG) {
+                            sizeBag = ((org.apache.pig.data.DefaultAbstractBag)
+                                    inp.get(positionBag)).size();
+                        }
+                        
+                        output.set(2, getLocalCounter());
+                        
+                        addToSparkCounter(sizeBag);
+                        addToLocalCounter(sizeBag);
+                    }
+                    
+                    output.set(0, index);
+                    output.set(1, getSparkCounter());
+                    listOutput.add(output);
+                }
+            } catch(ExecException e) {
+                throw new RuntimeException(e);
+            }
+            
+                    
+            return listOutput.iterator();
+        }
+        
+        private long getLocalCounter() {
+            return localCount;
+        }
+        
+        private long incrementLocalCounter() {
+            return localCount++;
+        }
+        
+        private long addToLocalCounter(long amount) {
+            return localCount += amount;
+        }
+        
+        private long getSparkCounter() {
+            return sparkCount;
+        }
+        
+        private long incrementSparkCounter() {
+            return sparkCount++;
+        }
+        
+        private long addToSparkCounter(long amount) {
+            return sparkCount += amount;
+        }
+    }
+}
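
CounterConverterFunction above prepends a partition index and a running counter to every tuple, which is the per-partition half of computing RANK. Below is a minimal sketch of the same mapPartitionsWithIndex tagging in plain Spark Java; the string encoding of the tag, class name and sample data are illustrative, not what POCounter actually emits.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;

    public class RowNumberSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("rownumber-sketch").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> in = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"), 2);

            // Each record is tagged with (partition index, offset inside the partition).
            // A later pass that knows the per-partition counts can turn the pair into
            // the global row number that RANK needs.
            JavaRDD<String> tagged = in.mapPartitionsWithIndex(
                    new Function2<Integer, Iterator<String>, Iterator<String>>() {
                        @Override
                        public Iterator<String> call(Integer partition, Iterator<String> records) {
                            List<String> out = new ArrayList<String>();
                            long offset = 1;
                            while (records.hasNext()) {
                                out.add(partition + ":" + (offset++) + ":" + records.next());
                            }
                            return out.iterator();
                        }
                    }, true);

            System.out.println(tagged.collect());
            sc.stop();
        }
    }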

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.AbstractFunction2;
+
+@SuppressWarnings({ "serial" })
+public class DistinctConverter implements RDDConverter<Tuple, Tuple, PODistinct> {
+    private static final Log LOG = LogFactory.getLog(DistinctConverter.class);
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            PODistinct op) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, op, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+
+        // In DISTINCT operation, the key is the entire tuple.
+        // RDD<Tuple> -> RDD<Tuple2<Tuple, null>>
+        RDD<Tuple2<Tuple, Object>> keyValRDD = rdd.map(new ToKeyValueFunction(),
+                SparkUtil.<Tuple, Object> getTuple2Manifest());
+        PairRDDFunctions<Tuple, Object> pairRDDFunctions
+          = new PairRDDFunctions<Tuple, Object>(keyValRDD,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class), null);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, op);
+        return pairRDDFunctions.reduceByKey(
+                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
+                new MergeValuesFunction())
+                .map(new ToValueFunction(), SparkUtil.getManifest(Tuple.class));
+    }
+
+    /**
+     * Tuple -> Tuple2<Tuple, null>
+     */
+    private static final class ToKeyValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            Tuple key = t;
+            Object value = null;
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+            return out;
+        }
+    }
+
+    /**
+     * No-op
+     */
+    private static final class MergeValuesFunction extends
+            AbstractFunction2<Object, Object, Object> implements Serializable {
+        @Override
+        public Object apply(Object arg0, Object arg1) {
+            return null;
+        }
+    }
+
+    /**
+     * Tuple2<Tuple, null> -> Tuple
+     */
+    private static final class ToValueFunction extends
+            AbstractFunction1<Tuple2<Tuple, Object>, Tuple> implements
+            Serializable {
+        @Override
+        public Tuple apply(Tuple2<Tuple, Object> input) {
+            return input._1;
+        }
+    }
+}
\ No newline at end of file
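
DistinctConverter above implements DISTINCT as: key each record by the whole tuple, reduceByKey with a no-op merge, then drop the dummy values. A minimal sketch of the same pattern with the Spark Java API follows; the class name and sample data are illustrative only and not part of this commit.

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class DistinctSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("distinct-sketch").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> in = sc.parallelize(Arrays.asList("a", "b", "a", "c", "b"));

            // The whole record becomes the key; the value carries no information.
            JavaPairRDD<String, Object> keyed = in.mapToPair(new PairFunction<String, String, Object>() {
                @Override
                public Tuple2<String, Object> call(String s) {
                    return new Tuple2<String, Object>(s, null);
                }
            });

            // reduceByKey collapses duplicate keys in a single shuffle; the merge
            // function is a no-op, just like MergeValuesFunction above.
            JavaRDD<String> distinct = keyed.reduceByKey(new Function2<Object, Object, Object>() {
                @Override
                public Object call(Object left, Object right) {
                    return null;
                }
            }).keys();

            System.out.println(distinct.collect()); // a, b, c in some order
            sc.stop();
        }
    }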

