Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Mon May 29 15:00:39 2017
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import scala.Option;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import com.google.common.collect.Maps;
+
+public class SparkJobStats extends JobStats {
+
+    private int jobId;
+    private Map<String, Long> stats = Maps.newLinkedHashMap();
+    private boolean disableCounter;
+    private Counters counters = null;
+    public static String FS_COUNTER_GROUP = "FS_GROUP";
+    private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null;
+
+    protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+        this(String.valueOf(jobId), plan, conf);
+        this.jobId = jobId;
+    }
+
+    protected SparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+        super(jobId, plan);
+        setConf(conf);
+    }
+
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+        disableCounter = conf.getBoolean("pig.disable.counter", false);
+        initializeHadoopCounter();
+    }
+
+    public void addOutputInfo(POStore poStore, boolean success,
+                              JobMetricsListener jobMetricsListener) {
+        if (!poStore.isTmpStore()) {
+            long bytes = getOutputSize(poStore, conf);
+            long recordsCount = -1;
+            if (!disableCounter) {
+                recordsCount = SparkStatsUtil.getRecordCount(poStore);
+            }
+            OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
+                    bytes, recordsCount, success);
+            outputStats.setPOStore(poStore);
+            outputStats.setConf(conf);
+
+            outputs.add(outputStats);
+        }
+    }
+
+    public void addInputStats(POLoad po, boolean success,
+                              boolean singleInput) {
+
+        long recordsCount = -1;
+        if (!disableCounter) {
+            recordsCount = SparkStatsUtil.getRecordCount(po);
+        }
+        long bytesRead = -1;
+        if (singleInput && stats.get("BytesRead") != null) {
+            bytesRead = stats.get("BytesRead");
+        }
+        InputStats inputStats = new InputStats(po.getLFile().getFileName(),
+                bytesRead, recordsCount, success);
+        inputStats.setConf(conf);
+
+        inputs.add(inputStats);
+    }
+
+    public void collectStats(JobMetricsListener jobMetricsListener) {
+        if (jobMetricsListener != null) {
+            Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+            if (taskMetrics == null) {
+                throw new RuntimeException("No task metrics available for jobId " + jobId);
+            }
+            stats = combineTaskMetrics(taskMetrics);
+        }
+    }
+
+    public Map<String, Long> getStats() {
+        return stats;
+    }
+
+    private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+        Map<String, Long> results = Maps.newLinkedHashMap();
+
+        long executorDeserializeTime = 0;
+        long executorRunTime = 0;
+        long resultSize = 0;
+        long jvmGCTime = 0;
+        long resultSerializationTime = 0;
+        long memoryBytesSpilled = 0;
+        long diskBytesSpilled = 0;
+        long bytesRead = 0;
+        long bytesWritten = 0;
+        long remoteBlocksFetched = 0;
+        long localBlocksFetched = 0;
+        long fetchWaitTime = 0;
+        long remoteBytesRead = 0;
+        long shuffleBytesWritten = 0;
+        long shuffleWriteTime = 0;
+        boolean inputMetricExist = false;
+        boolean outputMetricExist = false;
+        boolean shuffleReadMetricExist = false;
+        boolean shuffleWriteMetricExist = false;
+
+        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+            if (stageMetric != null) {
+                for (TaskMetrics taskMetrics : stageMetric) {
+                    if (taskMetrics != null) {
+                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+                        executorRunTime += taskMetrics.executorRunTime();
+                        resultSize += taskMetrics.resultSize();
+                        jvmGCTime += taskMetrics.jvmGCTime();
+                        resultSerializationTime += taskMetrics.resultSerializationTime();
+                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+                        if (!taskMetrics.inputMetrics().isEmpty()) {
+                            inputMetricExist = true;
+                            bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+                        }
+
+                        if (!taskMetrics.outputMetrics().isEmpty()) {
+                            outputMetricExist = true;
+                            bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
+                        }
+
+                        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+                        if (!shuffleReadMetricsOption.isEmpty()) {
+                            shuffleReadMetricExist = true;
+                            remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+                            localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+                            fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+                            remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+                        }
+
+                        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+                        if (!shuffleWriteMetricsOption.isEmpty()) {
+                            shuffleWriteMetricExist = true;
+                            shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+                            shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+                        }
+
+                    }
+                }
+            }
+        }
+
+        results.put("EexcutorDeserializeTime", executorDeserializeTime);
+        results.put("ExecutorRunTime", executorRunTime);
+        results.put("ResultSize", resultSize);
+        results.put("JvmGCTime", jvmGCTime);
+        results.put("ResultSerializationTime", resultSerializationTime);
+        results.put("MemoryBytesSpilled", memoryBytesSpilled);
+        results.put("DiskBytesSpilled", diskBytesSpilled);
+        if (inputMetricExist) {
+            results.put("BytesRead", bytesRead);
+            hdfsBytesRead = bytesRead;
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+        }
+
+        if (outputMetricExist) {
+            results.put("BytesWritten", bytesWritten);
+            hdfsBytesWritten = bytesWritten;
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+        }
+
+        if (shuffleReadMetricExist) {
+            results.put("RemoteBlocksFetched", remoteBlocksFetched);
+            results.put("LocalBlocksFetched", localBlocksFetched);
+            results.put("TotalBlocksFetched", localBlocksFetched + 
remoteBlocksFetched);
+            results.put("FetchWaitTime", fetchWaitTime);
+            results.put("RemoteBytesRead", remoteBytesRead);
+        }
+
+        if (shuffleWriteMetricExist) {
+            results.put("ShuffleBytesWritten", shuffleBytesWritten);
+            results.put("ShuffleWriteTime", shuffleWriteTime);
+        }
+
+        return results;
+    }
+
+    @Override
+    public String getJobId() {
+        return String.valueOf(jobId);
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getDisplayString() {
+        return null;
+    }
+
+    @Override
+    public int getNumberMaps() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberReduces() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxMapTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMinMapTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getAvgMapTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxReduceTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMinReduceTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getAvgREduceTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMapInputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMapOutputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getReduceInputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getReduceOutputRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountRecs() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Counters getHadoopCounters() {
+        return counters;
+    }
+
+    @Override
+    public Map<String, Long> getMultiStoreCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getMultiInputCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setAlias(SparkOperator sparkOperator) {
+        SparkScriptState ss = (SparkScriptState) SparkScriptState.get();
+        SparkScriptState.SparkScriptInfo sparkScriptInfo = ss.getScriptInfo();
+        annotate(ALIAS, sparkScriptInfo.getAlias(sparkOperator));
+        annotate(ALIAS_LOCATION, sparkScriptInfo.getAliasLocation(sparkOperator));
+        annotate(FEATURE, sparkScriptInfo.getPigFeatures(sparkOperator));
+    }
+
+    private void initializeHadoopCounter() {
+        counters = new Counters();
+        Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP);
+        fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0);
+        fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0);
+    }
+
+
+    public Map<String, SparkCounter<Map<String, Long>>> getWarningCounters() {
+        return warningCounters;
+    }
+
+    public void initWarningCounters() {
+        SparkCounters counters = SparkPigStatusReporter.getInstance().getCounters();
+        SparkCounterGroup<Map<String, Long>> sparkCounterGroup = counters.getSparkCounterGroups().get(
+                PigWarning.class.getCanonicalName());
+        if (sparkCounterGroup != null) {
+            this.warningCounters = sparkCounterGroup.getSparkCounters();
+        }
+    }
+}
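For reference, a minimal sketch of how the combined metrics might be consumed once collectStats(jobMetricsListener) has run. The helper below is hypothetical and not part of the commit.

    // Hypothetical helper, not in the patch: dumps the metric map that
    // SparkJobStats.collectStats() fills in from the JobMetricsListener.
    // Keys are the names written by combineTaskMetrics(), e.g.
    // "ExecutorRunTime", "BytesRead", "ShuffleBytesWritten".
    public static void dumpJobMetrics(SparkJobStats jobStats) {
        for (java.util.Map.Entry<String, Long> metric : jobStats.getStats().entrySet()) {
            System.out.println(metric.getKey() + " = " + metric.getValue());
        }
    }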

Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Mon May 29 15:00:39 2017
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkPigStats extends PigStats {
+
+    private Map<SparkJobStats, SparkOperator> jobSparkOperatorMap = new HashMap<SparkJobStats, SparkOperator>();
+    private static final Log LOG = LogFactory.getLog(SparkPigStats.class);
+
+    private Set<SparkOperator> sparkOperatorsSet = new HashSet<SparkOperator>();
+
+    private SparkScriptState sparkScriptState;
+
+    private Configuration conf;
+
+    public SparkPigStats() {
+        jobPlan = new JobGraph();
+        this.sparkScriptState = (SparkScriptState) ScriptState.get();
+    }
+
+    public void initialize(PigContext pigContext, SparkOperPlan sparkPlan, Configuration conf) {
+        super.start();
+        this.pigContext = pigContext;
+        this.conf = conf;
+        sparkScriptState.setScriptInfo(sparkPlan);
+    }
+
+    public void addJobStats(POStore poStore, SparkOperator sparkOperator, int jobId,
+                            JobMetricsListener jobMetricsListener,
+                            JavaSparkContext sparkContext) {
+        boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        jobStats.setSuccessful(isSuccess);
+        jobStats.collectStats(jobMetricsListener);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+        jobStats.initWarningCounters();
+        jobSparkOperatorMap.put(jobStats, sparkOperator);
+
+        jobPlan.add(jobStats);
+    }
+
+
+    public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, String jobId,
+                                JobMetricsListener jobMetricsListener,
+                                JavaSparkContext sparkContext,
+                                Exception e) {
+        boolean isSuccess = false;
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        jobStats.setSuccessful(isSuccess);
+        jobStats.collectStats(jobMetricsListener);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+        jobSparkOperatorMap.put(jobStats, sparkOperator);
+        jobPlan.add(jobStats);
+        jobStats.setBackendException(e);
+    }
+
+    public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) {
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        jobStats.setSuccessful(isSuccess);
+        jobSparkOperatorMap.put(jobStats, sparkOperator);
+        jobPlan.add(jobStats);
+        jobStats.setBackendException(e);
+    }
+
+    public void finish() {
+        super.stop();
+        display();
+    }
+
+    private void display() {
+        LOG.info(getDisplayString());
+        handleAggregateWarnings();
+    }
+
+    private void handleAggregateWarnings() {
+        Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
+
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            SparkJobStats js = (SparkJobStats) iter.next();
+            Map<String, SparkCounter<Map<String,Long>>> counterMap = js.getWarningCounters();
+            if (counterMap == null) {
+                continue;
+            }
+            Map<String, Long> warningCounters = counterMap.get(PigWarning.SPARK_WARN.name()).getValue();
+            if (warningCounters == null) {
+                continue;
+            }
+            for (String warnKey : warningCounters.keySet()) {
+                // look up and store under the same enum key so counts aggregate across jobs
+                PigWarning warning = PigWarning.valueOf(warnKey);
+                Long val = warningAggMap.get(warning);
+                if (val != null) {
+                    val += warningCounters.get(warnKey);
+                } else {
+                    val = warningCounters.get(warnKey);
+                }
+                warningAggMap.put(warning, val);
+            }
+        }
+        CompilationMessageCollector.logAggregate(warningAggMap, CompilationMessageCollector.MessageType.Warning, LOG);
+    }
+
+    @Override
+    public String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            SparkJobStats js = (SparkJobStats)iter.next();
+            if (jobSparkOperatorMap.containsKey(js)) {
+                SparkOperator sparkOperator = jobSparkOperatorMap.get(js);
+                js.setAlias(sparkOperator);
+            }
+            sb.append("Spark Job [" + js.getJobId() + "] Metrics");
+            Map<String, Long> stats = js.getStats();
+            if (stats == null) {
+                sb.append("No statistics found for job " + js.getJobId());
+                return sb.toString();
+            }
+
+            Iterator<Map.Entry<String, Long>> statIt = stats.entrySet().iterator();
+            while (statIt.hasNext()) {
+                Map.Entry<String, Long> pairs = statIt.next();
+                sb.append("\t" + pairs.getKey() + " : " + pairs.getValue());
+            }
+            for (InputStats inputStat : js.getInputs()){
+                sb.append("\t"+inputStat.getDisplayString());
+            }
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public JobClient getJobClient() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isEmbedded() {
+        return false;
+    }
+
+    @Override
+    public Map<String, List<PigStats>> getAllStats() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> getAllErrorMessages() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberJobs() {
+        return jobPlan.size();
+    }
+
+    /**
+     * A SparkPlan can have many SparkOperators.
+     * Each SparkOperator can have multiple POStores.
+     * We currently collect stats once for every POStore,
+     * but do not want to collect input stats for every POStore.
+     *
+     * e.g. After multiQuery optimization, the sparkOperator may look like this:
+     * POLoad_1             (PhysicalPlan) ...POStore_A
+     *         \          /
+     *          ...POSplit
+     *         /          \
+     * POLoad_2            (PhysicalPlan) ...POStore_B
+     */
+    private void addInputInfoForSparkOper(SparkOperator sparkOperator,
+                                          SparkJobStats jobStats,
+                                          boolean isSuccess,
+                                          JobMetricsListener jobMetricsListener,
+                                          Configuration conf) {
+        //to avoid repetition
+        if (sparkOperatorsSet.contains(sparkOperator)) {
+            return;
+        }
+
+        try {
+            List<POLoad> poLoads = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLoad.class);
+            for (POLoad load : poLoads) {
+                if (!load.isTmpLoad()) {
+                    jobStats.addInputStats(load, isSuccess, (poLoads.size() == 1));
+                }
+            }
+        } catch (VisitorException ve) {
+            LOG.warn(ve);
+        }
+
+        sparkOperatorsSet.add(sparkOperator);
+    }
+}
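For context, the call order this class implies, as a sketch only; the real wiring lives in the Spark launcher, which is not part of this excerpt, and the method name below is hypothetical:

    // Hypothetical sequence inferred from the methods above.
    static void recordRun(PigContext pigContext, SparkOperPlan sparkPlan, Configuration conf,
                          POStore poStore, SparkOperator sparkOperator, int jobId,
                          JobMetricsListener listener, JavaSparkContext sc) {
        SparkPigStats stats = new SparkPigStats();       // picks up the current SparkScriptState
        stats.initialize(pigContext, sparkPlan, conf);   // marks the start time, sets script info
        stats.addJobStats(poStore, sparkOperator, jobId, listener, sc);  // once per finished job
        stats.finish();                                  // stops the clock and logs getDisplayString()
    }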

Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
+
+/**
+ * Just like PigStatusReporter, which creates/resets Hadoop counters, SparkPigStatusReporter
+ * creates/resets Spark counters.
+ * Note that it is not suitable to make SparkCounters a singleton: it is created/reset for
+ * each Pig script, or for each Dump/Store action in Grunt mode.
+ */
+public class SparkPigStatusReporter {
+    private static SparkPigStatusReporter reporter;
+    private SparkCounters counters;
+
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(SparkPigStatusReporter.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        reporter = null;
+    }
+
+    private SparkPigStatusReporter() {
+    }
+
+    public static SparkPigStatusReporter getInstance() {
+        if (reporter == null) {
+            reporter = new SparkPigStatusReporter();
+        }
+        return reporter;
+    }
+
+    public void createCounter(String groupName, String counterName) {
+        if (counters != null) {
+            counters.createCounter(groupName, counterName);
+        }
+    }
+
+    public SparkCounters getCounters() {
+        return counters;
+    }
+
+    public void setCounters(SparkCounters counters) {
+        this.counters = counters;
+    }
+}
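For context, the setup sequence the class comment implies, sketched under the assumption that SparkCounters takes the JavaSparkContext in its constructor (that class is not shown in this excerpt, and sparkContext is an assumed in-scope variable):

    // Hypothetical: the launcher installs a fresh SparkCounters per script (or per
    // Dump/Store in Grunt mode); until setCounters() runs, createCounter() is a no-op.
    SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
    reporter.setCounters(new SparkCounters(sparkContext));  // constructor args assumed
    reporter.createCounter("MY_GROUP", "MY_COUNTER");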

Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java Mon May 29 15:00:39 2017
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.collect.Maps;
+
+/**
+ * SparkScriptState encapsulates settings for a Pig script that runs on a Hadoop
+ * cluster. These settings are added to all Spark jobs spawned by the script and
+ * in turn are persisted in the Hadoop job xml. With the properties already in
+ * the job xml, users who want to know the relations between the script and Spark
+ * jobs can derive them from the job xmls.
+ */
+public class SparkScriptState extends ScriptState {
+    public SparkScriptState(String id) {
+        super(id);
+    }
+
+    private SparkScriptInfo scriptInfo = null;
+
+    public void setScriptInfo(SparkOperPlan plan) {
+        this.scriptInfo = new SparkScriptInfo(plan);
+    }
+
+    public SparkScriptInfo getScriptInfo() {
+        return scriptInfo;
+    }
+
+    public static class SparkScriptInfo {
+
+        private static final Log LOG = LogFactory.getLog(SparkScriptInfo.class);
+        private SparkOperPlan sparkPlan;
+        private String alias;
+        private String aliasLocation;
+        private String features;
+
+        private Map<OperatorKey, String> featuresMap = Maps.newHashMap();
+        private Map<OperatorKey, String> aliasMap = Maps.newHashMap();
+        private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap();
+
+        public SparkScriptInfo(SparkOperPlan sparkPlan) {
+            this.sparkPlan = sparkPlan;
+            initialize();
+        }
+
+        private void initialize() {
+            try {
+                new DAGAliasVisitor(sparkPlan).visit();
+            } catch (VisitorException e) {
+                LOG.warn("Cannot calculate alias information for DAG", e);
+            }
+        }
+
+        public String getAlias(SparkOperator sparkOp) {
+            return aliasMap.get(sparkOp.getOperatorKey());
+        }
+
+        public String getAliasLocation(SparkOperator sparkOp) {
+            return aliasLocationMap.get(sparkOp.getOperatorKey());
+        }
+
+        public String getPigFeatures(SparkOperator sparkOp) {
+            return featuresMap.get(sparkOp.getOperatorKey());
+        }
+
+        class DAGAliasVisitor extends SparkOpPlanVisitor {
+
+            private Set<String> aliases;
+            private Set<String> aliasLocations;
+            private BitSet featureSet;
+
+            public DAGAliasVisitor(SparkOperPlan plan) {
+                super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+                this.aliases = new HashSet<String>();
+                this.aliasLocations = new HashSet<String>();
+                this.featureSet = new BitSet();
+            }
+
+            @Override
+            public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+
+                ArrayList<String> aliasList = new ArrayList<String>();
+                String aliasLocationStr = "";
+                try {
+                    ArrayList<String> aliasLocationList = new ArrayList<String>();
+                    new AliasVisitor(sparkOp.physicalPlan, aliasList, aliasLocationList).visit();
+                    aliasLocationStr += LoadFunc.join(aliasLocationList, ",");
+                    if (!aliasList.isEmpty()) {
+                        Collections.sort(aliasList);
+                        aliases.addAll(aliasList);
+                        aliasLocations.addAll(aliasLocationList);
+                    }
+                } catch (VisitorException e) {
+                    LOG.warn("unable to get alias", e);
+                }
+                aliasMap.put(sparkOp.getOperatorKey(), LoadFunc.join(aliasList, ","));
+                aliasLocationMap.put(sparkOp.getOperatorKey(), aliasLocationStr);
+
+
+                BitSet feature = new BitSet();
+                feature.clear();
+                if (sparkOp.isSampler()) {
+                    feature.set(PIG_FEATURE.SAMPLER.ordinal());
+                }
+                if (sparkOp.isIndexer()) {
+                    feature.set(PIG_FEATURE.INDEXER.ordinal());
+                }
+                if (sparkOp.isCogroup()) {
+                    feature.set(PIG_FEATURE.COGROUP.ordinal());
+                }
+                if (sparkOp.isGroupBy()) {
+                    feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+                }
+                if (sparkOp.isRegularJoin()) {
+                    feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+                }
+                if (sparkOp.isUnion()) {
+                    feature.set(PIG_FEATURE.UNION.ordinal());
+                }
+                if (sparkOp.isNative()) {
+                    feature.set(PIG_FEATURE.NATIVE.ordinal());
+                }
+                if (sparkOp.isLimit() || sparkOp.isLimitAfterSort()) {
+                    feature.set(PIG_FEATURE.LIMIT.ordinal());
+                }
+                try {
+                    new FeatureVisitor(sparkOp.physicalPlan, feature).visit();
+                } catch (VisitorException e) {
+                    LOG.warn("Feature visitor failed", e);
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i = feature.nextSetBit(0); i >= 0; i = feature.nextSetBit(i + 1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name());
+                }
+                featuresMap.put(sparkOp.getOperatorKey(), sb.toString());
+                for (int i = 0; i < feature.length(); i++) {
+                    if (feature.get(i)) {
+                        featureSet.set(i);
+                    }
+                }
+            }
+
+            @Override
+            public void visit() throws VisitorException {
+                super.visit();
+                if (!aliases.isEmpty()) {
+                    ArrayList<String> aliasList = new ArrayList<String>(aliases);
+                    ArrayList<String> aliasLocationList = new ArrayList<String>(aliasLocations);
+                    Collections.sort(aliasList);
+                    Collections.sort(aliasLocationList);
+                    alias = LoadFunc.join(aliasList, ",");
+                    aliasLocation = LoadFunc.join(aliasLocationList, ",");
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i = featureSet.nextSetBit(0); i >= 0; i = featureSet.nextSetBit(i + 1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name());
+                }
+                features = sb.toString();
+            }
+        }
+    }
+}
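The BitSet-to-CSV conversion above is written out twice (once per operator, once for the whole DAG). An equivalent extraction, as a sketch only (PIG_FEATURE is the enum inherited from ScriptState; the helper itself is not in the patch):

    // Renders the PIG_FEATURE ordinals set in a BitSet as a comma-separated list.
    private static String featureCsv(java.util.BitSet features) {
        StringBuilder sb = new StringBuilder();
        for (int i = features.nextSetBit(0); i >= 0; i = features.nextSetBit(i + 1)) {
            if (sb.length() > 0) sb.append(",");
            sb.append(PIG_FEATURE.values()[i].name());
        }
        return sb.toString();
    }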

Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Mon May 29 15:00:39 2017
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobGraphBuilder;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkStatsUtil {
+
+    public static final String SPARK_STORE_COUNTER_GROUP = PigStatsUtil.MULTI_STORE_COUNTER_GROUP;
+    public static final String SPARK_STORE_RECORD_COUNTER = PigStatsUtil.MULTI_STORE_RECORD_COUNTER;
+    public static final String SPARK_INPUT_COUNTER_GROUP = PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP;
+    public static final String SPARK_INPUT_RECORD_COUNTER = PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER;
+
+    public static void waitForJobAddStats(int jobID,
+                                          POStore poStore, SparkOperator sparkOperator,
+                                          JobMetricsListener jobMetricsListener,
+                                          JavaSparkContext sparkContext,
+                                          SparkPigStats sparkPigStats)
+            throws InterruptedException {
+        // Even though we are not making any async calls to spark,
+        // the SparkStatusTracker can still return RUNNING status
+        // for a finished job.
+        // Looks like there is a race condition between spark
+        // "event bus" thread updating it's internal listener and
+        // this driver thread calling SparkStatusTracker.
+        // To workaround this, we will wait for this job to "finish".
+        jobMetricsListener.waitForJobToEnd(jobID);
+        sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
+                sparkContext);
+        jobMetricsListener.cleanup(jobID);
+    }
+
+    public static void addFailJobStats(String jobID,
+                                       POStore poStore, SparkOperator sparkOperator,
+                                       SparkPigStats sparkPigStats,
+                                       Exception e) {
+        JobMetricsListener jobMetricsListener = null;
+        JavaSparkContext sparkContext = null;
+        sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
+                sparkContext, e);
+    }
+
+    public static String getCounterName(POStore store) {
+        String shortName = PigStatsUtil.getShortName(store.getSFile().getFileName());
+
+        StringBuilder sb = new StringBuilder(SPARK_STORE_RECORD_COUNTER);
+        sb.append("_");
+        sb.append(store.getIndex());
+        sb.append("_");
+        sb.append(store.getOperatorKey());
+        sb.append("_");
+        sb.append(shortName);
+        return sb.toString();
+    }
+
+    public static String getCounterName(POLoad load) {
+        String shortName = PigStatsUtil.getShortName(load.getLFile().getFileName());
+
+        StringBuilder sb = new StringBuilder(SPARK_INPUT_RECORD_COUNTER);
+        sb.append("_");
+        sb.append(load.getOperatorKey());
+        sb.append("_");
+        sb.append(shortName);
+        return sb.toString();
+    }
+
+    public static long getRecordCount(POStore store) {
+        SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
+        Object value = reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store));
+        if (value == null) {
+            return 0L;
+        } else {
+            return (Long)value;
+        }
+    }
+
+    public static long getRecordCount(POLoad load) {
+        SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
+        int loadersCount = countCoLoadsIfInSplit(load, load.getParentPlan());
+        Object value = reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load));
+        if (value == null) {
+            return 0L;
+        } else {
+            return (Long)value/loadersCount;
+        }
+    }
+
+    private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp) {
+        List<PhysicalOperator> successors = pp.getSuccessors(op);
+        if (successors == null || successors.size() == 0) return 1;
+        for (PhysicalOperator successor : successors) {
+            if (successor instanceof POSplit) {
+                return ((POSplit) successor).getPlans().size();
+            } else {
+                return countCoLoadsIfInSplit(successor, pp);
+            }
+        }
+        return 1;
+    }
+
+    public static boolean isJobSuccess(int jobID,
+                                       JavaSparkContext sparkContext) {
+        if (jobID == JobGraphBuilder.NULLPART_JOB_ID) {
+            return true;
+        }
+        JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
+        if (status == JobExecutionStatus.SUCCEEDED) {
+            return true;
+        } else if (status != JobExecutionStatus.FAILED) {
+            throw new RuntimeException("Unexpected job execution status " +
+                    status);
+        }
+
+        return false;
+    }
+
+    private static SparkJobInfo getJobInfo(int jobID,
+                                           JavaSparkContext sparkContext) {
+        SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID);
+        if (jobInfo == null) {
+            throw new RuntimeException("No jobInfo available for jobID "
+                    + jobID);
+        }
+
+        return jobInfo;
+    }
+
+    public static void addNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator) {
+        ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), true, null);
+    }
+
+    public static void addFailedNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator, Exception e) {
+        ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), false, e);
+    }
+}
\ No newline at end of file
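getRecordCount(POLoad) divides by the co-loader count, apparently because a load that feeds a POSplit after multiquery merging is counted once per split sub-plan. A sketch of the increment side, which lives in the Spark converters outside this excerpt; the increment(group, name, delta) signature and the poLoad variable are assumptions:

    // Hypothetical increment site: each record read bumps the counter that
    // getRecordCount(POLoad) later reads and divides by countCoLoadsIfInSplit().
    SparkPigStatusReporter.getInstance().getCounters().increment(
            SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP,
            SparkStatsUtil.getCounterName(poLoad),
            1L);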

Modified: pig/trunk/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/build.xml (original)
+++ pig/trunk/test/e2e/pig/build.xml Mon May 29 15:00:39 2017
@@ -353,6 +353,12 @@
     </antcall>
   </target>
 
+  <target name="test-spark">
+    <antcall target="test-base">
+      <param name="harness.conf.file" value="${basedir}/conf/spark.conf"/>
+    </antcall>
+  </target>
+
   <target name="deploy-base" depends="property-check, tar, init-test">
     <exec executable="perl" dir="${test.location}" failonerror="true">
       <env key="HARNESS_ROOT" value="."/>

Added: pig/trunk/test/e2e/pig/conf/spark.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/conf/spark.conf?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/e2e/pig/conf/spark.conf (added)
+++ pig/trunk/test/e2e/pig/conf/spark.conf Mon May 29 15:00:39 2017
@@ -0,0 +1,75 @@
+############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+my $me = `whoami`;
+chomp $me;
+
+# The contents of this file can be rewritten to fit your installation.
+# Also, you can define the following environment variables and set things up as in the test setup
+# PH_ROOT           Root directory where test harness is installed
+# PH_LOCAL          Root directory for input and output for local mode tests
+# PH_OUT            Root directory where output data will be stored (on local disk, not HDFS)
+# PH_CLUSTER_BIN    Binary executable for cluster being used
+# HADOOP_CONF_DIR   Conf directory for cluster being used
+# PH_PIG            Root directory for Pig version being used
+
+my $hdfsBase = $ENV{PH_HDFS_BASE} || "/user/pig";
+
+$cfg = {
+    #HDFS
+      'inpathbase'     => "$hdfsBase/tests/data"
+    , 'outpathbase'    => "$hdfsBase/out"
+
+   #LOCAL
+    , 'localinpathbase'   => "$ENV{PH_LOCAL}/in"
+    , 'localoutpathbase'  => "$ENV{PH_LOCAL}/out/log"
+    , 'localxmlpathbase'  => "$ENV{PH_LOCAL}/out/xml"
+    , 'localpathbase'     => "$ENV{PH_LOCAL}/out/pigtest/$me"
+    , 'benchmarkcachepath'=> "$ENV{PH_BENCHMARK_CACHE_PATH}"
+
+    #TEST
+    , 'benchmarkPath'    => "$ENV{PH_OUT}/benchmarks"
+    , 'scriptPath'       => "$ENV{PH_ROOT}/libexec"
+    , 'tmpPath'          => '/tmp/pigtest'
+
+    #PIG
+    , 'testconfigpath'   => "$ENV{HADOOP_CONF_DIR}"
+    , 'funcjarPath'      => "$ENV{PH_ROOT}/lib/java"
+    , 'paramPath'        => "$ENV{PH_ROOT}/paramfiles"
+    , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}"
+    , 'pigpath'          => "$ENV{PH_PIG}"
+    , 'oldpigpath'       => "$ENV{PH_OLDPIG}"
+    , 'hcatbin'          => "$ENV{HCAT_BIN}"
+    , 'usePython'        => "$ENV{PIG_USE_PYTHON}"
+    , 'exectype'         => 'spark'
+    , 'benchmark_exectype'         => 'mapred'
+
+    #HADOOP
+    , 'mapredjars'       => "$ENV{PH_ROOT}/lib"
+
+    #HIVE
+    , 'hivelibdir'       => "$ENV{PH_HIVE_LIB_DIR}"
+    , 'hiveversion'      =>  "$ENV{PH_HIVE_VERSION}"
+    , 'hiveshimsversion' => "$ENV{PH_HIVE_SHIMS_VERSION}"
+
+    , 'userhomePath' => "$ENV{HOME}"
+    ,'local.bin'     => '/usr/bin'
+
+    ,'logDir'                => "$ENV{PH_OUT}/log"
+    ,'propertiesFile'     => "./conf/testpropertiesfile.conf"
+    ,'harness.console.level' => 'ERROR'
+
+};

Modified: pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm Mon May 29 15:00:39 2017
@@ -270,7 +270,7 @@ sub runPigCmdLine
     my @cmd = @baseCmd;
 
     # Add option -l giving location for secondary logs
-    ##!!! Should that even be here? 
+    ##!!! Should that even be here?
     my $locallog = $testCmd->{'localpath'} . $testCmd->{'group'} . "_" . $testCmd->{'num'} . ".log";
     push(@cmd, "-logfile");
     push(@cmd, $locallog);
@@ -425,6 +425,11 @@ sub getPigCmd($$$)
         }
        TestDriver::dbg("Additional java parameters: [$additionalJavaParams].\n");
     }
+
+    # Several OutOfMemoryError (PermGen space) failures were seen while running the E2E tests, so the max perm size is raised here
+    if ($testCmd->{'exectype'} eq "spark") {
+        $additionalJavaParams = "-XX:MaxPermSize=512m";
+    }
     
     push(@pigCmd, ("-x", $testCmd->{'exectype'}));
 
@@ -598,7 +603,7 @@ sub postProcessSingleOutputFile
     if (defined $testCmd->{'floatpostprocess'} &&
             defined $testCmd->{'delimiter'}) {
         $fppCmd .= " | perl $toolpath/floatpostprocessor.pl \"" .
-            $testCmd->{'delimiter'} . "\"";
+            $testCmd->{'delimiter'} . "\" " . $testCmd->{'decimals'};
     }
     
     $fppCmd .= " > $localdir/out_original";

Modified: pig/trunk/test/e2e/pig/tests/bigdata.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/bigdata.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/bigdata.conf (original)
+++ pig/trunk/test/e2e/pig/tests/bigdata.conf Mon May 29 15:00:39 2017
@@ -24,7 +24,7 @@
 
 $cfg = {
        'driver' => 'Pig',
-    'execonly' => 'mapred,tez',
+    'execonly' => 'mapred,tez,spark',
 
        'groups' => [
                {

Modified: pig/trunk/test/e2e/pig/tests/cmdline.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/cmdline.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/cmdline.conf (original)
+++ pig/trunk/test/e2e/pig/tests/cmdline.conf Mon May 29 15:00:39 2017
@@ -254,7 +254,7 @@ dump A;\,
                {
                'name' => 'Warning',
                'floatpostprocess' => 0,
-               'execonly' => 'mapred,tez', # Warnings use counters, which don't work in local mode
+               'execonly' => 'mapred,tez,spark', # Warnings use counters, which don't work in local mode
                'delimiter' => '        ',
                'tests' => [
                

Modified: pig/trunk/test/e2e/pig/tests/grunt.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/grunt.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/grunt.conf (original)
+++ pig/trunk/test/e2e/pig/tests/grunt.conf Mon May 29 15:00:39 2017
@@ -43,13 +43,13 @@ $cfg = {
                       },{
                         'num' => 2,
                         'pig' => "pwd",
-                        'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode
+                        'execonly' => 'mapred,tez,spark', # don't have a clue what their cwd will be for local mode
                         'expected_out_regex' => "/user",
                         'rc' => 0
                       },{
                         'num' => 3,
                         'pig' => "ls .",
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'expected_out_regex' => "/user",
                         'rc' => 0
                       },{

Modified: pig/trunk/test/e2e/pig/tests/hcat.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/hcat.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/hcat.conf (original)
+++ pig/trunk/test/e2e/pig/tests/hcat.conf Mon May 29 15:00:39 2017
@@ -24,7 +24,7 @@
 
 $cfg = {
        'driver' => 'Pig',
-    'execonly' => 'mapred,tez',
+    'execonly' => 'mapred,tez,spark',
 
        'groups' => [
                {

Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Mon May 29 15:00:39 2017
@@ -529,7 +529,7 @@ $cfg = {
             # Streaming in demux
             {
             'num' => 2,
-            'execonly' => 'mapred,tez',
+            'execonly' => 'mapred,tez,spark',
             'pig' => q#
                        define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
                         A = load ':INPATH:/singlefile/studenttab10k';
@@ -547,7 +547,7 @@ $cfg = {
             # Streaming in nested demux
             {
             'num' => 3,
-            'execonly' => 'mapred,tez',
+            'execonly' => 'mapred,tez,spark',
             'pig' => q#
                        define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
                         A = load ':INPATH:/singlefile/studenttab10k';

Modified: pig/trunk/test/e2e/pig/tests/negative.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/negative.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/negative.conf (original)
+++ pig/trunk/test/e2e/pig/tests/negative.conf Mon May 29 15:00:39 2017
@@ -312,7 +312,7 @@ dump B;#,
                         {
                        # Define uses using non-existent command (autoship)
                         'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreamingNotThere.pl`;
@@ -411,7 +411,7 @@ dump B;\,
                       # Streaming application fails in the beginning of processing
                        # NEED TO CHECK STDERR MANUALLY FOR NOW
                         'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q\
 define CMD `perl PigStreamingBad.pl start` ship(':SCRIPTHOMEPATH:/PigStreamingBad.pl')  stderr('CMD' limit 1);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -423,7 +423,7 @@ store B into ':OUTPATH:';\,
                       # Streaming application fails in the middle of processing
                        # NEED TO CHECK STDERR MANUALLY FOR NOW
                         'num' => 2,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q\
 define CMD `perl PigStreamingBad.pl middle` ship(':SCRIPTHOMEPATH:/PigStreamingBad.pl')  stderr('CMD' limit 1);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -436,7 +436,7 @@ store B into ':OUTPATH:';\,
                        # bring logs to dfs
                        # NEED TO CHECK STDERR MANUALLY FOR NOW
                         'num' => 3,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q\
 define CMD `perl PigStreamingBad.pl end` ship(':SCRIPTHOMEPATH:/PigStreamingBad.pl') stderr('CMD' limit 1);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -449,7 +449,7 @@ store B into ':OUTPATH:';\,
                        # bring logs to dfs
                        # NEED TO CHECK STDERR MANUALLY FOR NOW
                         'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q\
 define CMD `perl DieRandomly.pl 10000 2` ship(':SCRIPTHOMEPATH:/DieRandomly.pl') stderr('CMD' limit 1);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -480,7 +480,7 @@ store C into ':OUTPATH:.2';\,
                         {
                         # Invalid deserializer - throws exception
                         'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q\
 register :FUNCPATH:/testudf.jar;
 define CMD `perl PigStreaming.pl` input(stdin) output(stdout using 
org.apache.pig.test.udf.streaming.DumpStreamerBad) 
ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
@@ -492,7 +492,7 @@ store B into ':OUTPATH:';\,
                        {
                        # Invalid serializer - throws exception
                         'num' => 2,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q\
 define CMD `perl PigStreamingDepend.pl` input(stdin using StringStoreBad) 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
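
These negative.conf hunks are all the same mechanical edit: 'spark' is appended to each
test's 'execonly' list so the e2e harness also schedules the test under the Spark exec
type. The harness code that consumes the key is not part of this commit; as a minimal
sketch, assuming 'execonly' is a comma-separated whitelist matched against the current
exec type (names below are illustrative):

    #!/usr/bin/perl
    use strict;
    use warnings;

    # Hypothetical test entry, shaped like the hashes in the .conf hunks above.
    my $test = {
        'num'      => 1,
        'execonly' => 'mapred,tez,spark',
    };

    # Sketch of the gate: run everywhere unless 'execonly' names exec types.
    sub should_run {
        my ($t, $exectype) = @_;
        return 1 unless defined $t->{'execonly'};
        my %allowed = map { $_ => 1 } split /,/, $t->{'execonly'};
        return exists $allowed{$exectype};
    }

    print should_run($test, 'spark') ? "run\n" : "skip\n";   # prints "run"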

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon May 29 15:00:39 2017
@@ -573,6 +573,7 @@ c = foreach b generate group, AVG(a.gpa)
 store c into ':OUTPATH:';\,
                         'floatpostprocess' => 1,
                         'delimiter' => '       ',
+                        'decimals' => 6,
                        },
                        {
                        'num' => 10,
@@ -2202,6 +2203,7 @@ store d into ':OUTPATH:';\,
                        },
                        {
                                'num' => 4,
+                               'execonly' => 'mapred,local,tez', # Spark's distinct does no implicit ordering; test 13 below is the Spark variant
                                'pig' =>q\a = load 
':INPATH:/singlefile/studentnulltab10k';
 b = distinct a;
 c = limit b 100;
@@ -2292,7 +2294,7 @@ store b into ':OUTPATH:';\,
                         },
                        {
                                'num' => 12,
-                               'execonly' => 'tez', #Limit_5 was not able to 
test on tez. 
+                               'execonly' => 'tez,spark', # Limit_5 could not be tested on Tez.
                                'pig' =>q\a = load 
':INPATH:/singlefile/studenttab10k';
 b = load ':INPATH:/singlefile/studenttab10k';
 a1 = foreach a generate $0, $1;
@@ -2303,6 +2305,15 @@ store d into ':OUTPATH:';\,
                                'verify_pig_script' => q\a = load 
':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int);
 b = limit a 100;
 store b into ':OUTPATH:';\,
+                       },
+                       {
+                               'num' => 13,
+                               'execonly' => 'spark', # Spark replacement for Limit_4: Spark's distinct does not sort implicitly the way MR's does
+                               'pig' =>q\a = load 
':INPATH:/singlefile/studentnulltab10k';
+b = distinct a;
+c = order b by $0, $1, $2;
+d = limit c 100;
+store d into ':OUTPATH:';\,
                        }
                ]
                },
@@ -2948,7 +2959,7 @@ store d into ':OUTPATH:';\,
                                # Merge-join with one file across multiple 
blocks
                    {
                 'num' => 8,
-                           'execonly' => 'mapred,tez', # since this join will 
run out of memory in local mode
+                           'execonly' => 'mapred,tez,spark', # since this join 
will run out of memory in local mode
                        'floatpostprocess' => 1,
                        'delimiter' => '        ',
                 'pig' => q\a = load ':INPATH:/singlefile/votertab10k';
@@ -3616,7 +3627,7 @@ store b into ':OUTPATH:';\,
             'tests' => [
                     {
                     'num' => 1,
-                    'execonly' => 'mapred,tez', # studenttab20m not available 
in local mode
+                    'execonly' => 'mapred,tez,spark', # studenttab20m not 
available in local mode
                     'pig' => q\
 a = load ':INPATH:/singlefile/studenttab20m' using PigStorage() as (name, age, 
gpa);
 b = foreach a generate age;
@@ -4361,7 +4372,7 @@ store b into ':OUTPATH:';\,
                     {
                     # test group
                     'num' => 1,
-                    'execonly' => 'mapred,tez', # since this join will run out 
of memory in local mode
+                    'execonly' => 'mapred,tez,spark', # since this join will 
run out of memory in local mode
                     'pig' => q\register :FUNCPATH:/testudf.jar;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, 
age:int, gpa);
 b = group a by age PARTITION BY 
org.apache.pig.test.utils.SimpleCustomPartitioner2 parallel 2;
@@ -4952,7 +4963,7 @@ store C into ':OUTPATH:';\,
                 {
                     'num' => 1,
                     'java_params' => ['-Dopt.fetch=false'],
-                    'execonly' => 'mapred,tez', # since distributed cache is 
not supported in local mode
+                    'execonly' => 'mapred,tez,spark', # since distributed 
cache is not supported in local mode
                     'pig' => q?
                         register :FUNCPATH:/testudf.jar;
                         define udfdc 
org.apache.pig.test.udf.evalfunc.Udfcachetest(':INPATH:/singlefile/votertab10k#foodle');
@@ -5194,7 +5205,7 @@ store C into ':OUTPATH:';\,
                     }, {
                         # PIG-2576
                         'num' => 4,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define printconf 
org.apache.pig.test.udf.evalfunc.UdfContextFrontend('dummy');
                                 a = load ':INPATH:/singlefile/studenttab10k' 
as (name, age, gpa);
@@ -5249,7 +5260,7 @@ store C into ':OUTPATH:';\,
                 ],
             },{
                 'name' => 'Bloom',
-                           'execonly' => 'mapred,tez', # distributed cache 
does not work in local mode
+                           'execonly' => 'mapred,tez', # distributed cache does not work in local mode; Bloom is not implemented for Spark (PIG-5117)
                 'tests' => [
                     {
                         'num' => 1,
@@ -5694,7 +5705,7 @@ store a into ':OUTPATH:';\,
                 'tests' => [
                     {
                         'num' => 1,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q\
                                     SET default_parallel 7;
                                     SET 
mapreduce.input.fileinputformat.split.maxsize '300';
@@ -5711,7 +5722,7 @@ store a into ':OUTPATH:';\,
                                 \,
                     }, {
                         'num' =>2,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q\
                                     SET default_parallel 9;
                                     SET 
mapreduce.input.fileinputformat.split.maxsize '300';
@@ -5728,7 +5739,7 @@ store a into ':OUTPATH:';\,
                                 \,
                     }, {
                         'num' =>3,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q\
                                     SET default_parallel 7;
                                     SET 
mapreduce.input.fileinputformat.split.maxsize '300';
@@ -5745,7 +5756,7 @@ store a into ':OUTPATH:';\,
                                 \,
                     }, {
                         'num' => 4,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q\
                                     SET default_parallel 5;
                                     SET 
mapreduce.input.fileinputformat.split.maxsize '300';
@@ -5763,7 +5774,7 @@ store a into ':OUTPATH:';\,
                                 \,
                     }, {
                         'num' => 5,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q\
                                     SET default_parallel 5;
                                     SET 
mapreduce.input.fileinputformat.split.maxsize '300';
@@ -5786,7 +5797,7 @@ store a into ':OUTPATH:';\,
                                 \,
                     }, {
                         'num' => 6,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q\
                                     SET 
mapreduce.input.fileinputformat.split.maxsize '300';
                                     SET pig.splitCombination false;
@@ -5810,7 +5821,7 @@ store a into ':OUTPATH:';\,
                 'tests' => [
                     {
                                                'num' => 1,
-                                               'execonly' => 'mapred,tez',
+                                               'execonly' => 
'mapred,tez,spark',
                                                'pig' => q\
                                                                         SET 
mapreduce.input.fileinputformat.split.maxsize '300';
                                                                         SET 
pig.splitCombination false;
@@ -5827,7 +5838,7 @@ store a into ':OUTPATH:';\,
                                                                \,
                                        }, {
                                                'num' => 2,
-                                               'execonly' => 'mapred,tez',
+                                               'execonly' => 
'mapred,tez,spark',
                                                'pig' => q\
                                                                         SET 
mapreduce.input.fileinputformat.split.maxsize '300';
                                                                         SET 
pig.splitCombination false;
@@ -5844,7 +5855,7 @@ store a into ':OUTPATH:';\,
                                                                \,
                                        }, {
                                                'num' => 3,
-                                               'execonly' => 'mapred,tez',
+                                               'execonly' => 
'mapred,tez,spark',
                                                'pig' => q\
                                                                         SET 
mapreduce.input.fileinputformat.split.maxsize '300';
                                                                         SET 
pig.splitCombination false;
@@ -5861,7 +5872,7 @@ store a into ':OUTPATH:';\,
                                                                \,
                                        }, {
                                                'num' => 4,
-                                               'execonly' => 'mapred,tez',
+                                               'execonly' => 
'mapred,tez,spark',
                                                'pig' => q\
                                                                         SET 
mapreduce.input.fileinputformat.split.maxsize '300';
                                                                         SET 
pig.splitCombination false;
@@ -5881,7 +5892,7 @@ store a into ':OUTPATH:';\,
                                                                \,
                                        }, {
                                                'num' => 5,
-                                               'execonly' => 'mapred,tez',
+                                               'execonly' => 
'mapred,tez,spark',
                                                'pig' => q\
                                                                        SET 
default_parallel 9;
                                                                         SET 
mapreduce.input.fileinputformat.split.maxsize '300';
@@ -6009,8 +6020,9 @@ store a into ':OUTPATH:';\,
                         fs -rm :INPATH:/singlefile/names.txt#
                         },
                         {
-                # Custom Hive UDF and MapredContext
+                # Custom Hive UDF and MapredContext - disabled for Spark: see 
PIG-5234
                 'num' => 7,
+                'execonly' => 'mapred,tez',
                 'pig' => q\set mapred.max.split.size '100000000'
                         register :FUNCPATH:/testudf.jar;
                         define DummyContextUDF 
HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF');

Modified: pig/trunk/test/e2e/pig/tests/orc.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/orc.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/orc.conf (original)
+++ pig/trunk/test/e2e/pig/tests/orc.conf Mon May 29 15:00:39 2017
@@ -46,6 +46,7 @@ store b into ':OUTPATH:';\,
                         {
                         'num' => 2,
                         'notmq' => 1,
+                        'execonly' => 'mapred,tez',
                         'pig' => q\
 a = load ':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], 
nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), 
nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)});
 store a into ':OUTPATH:.intermediate' using OrcStorage();
@@ -113,6 +114,23 @@ store c into ':OUTPATH:';\,
                         'verify_pig_script' => q\a = load 
':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age);
 b = foreach a generate (name is null ? [] : TOMAP(name, age));
 store b into ':OUTPATH:';\,
+                        },
+# Test 6: runs on Spark only, as a replacement for Test 2: Spark and MR may produce the entries of
+# Pig maps in different orders, which is valid but triggers a false failure during byte-wise comparison
+                        {
+                          'num' => 6,
+                          'notmq' => 1,
+                          'execonly' => 'spark',
+                          'pig' => q\
+a = load ':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], 
nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), 
nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)});
+store a into ':OUTPATH:.intermediate' using OrcStorage();
+exec
+b = load ':OUTPATH:.intermediate' using OrcStorage();
+c = foreach b generate nameagegpamap#'name', nameagegpamap#'age', 
nameagegpamap#'gpa', nameagegpatuple, nameagegpabag;
+store c into ':OUTPATH:';\,
+                          'verify_pig_script' => q\a = load 
':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], 
nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), 
nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)});
+b = foreach a generate nameagegpamap#'name', nameagegpamap#'age', 
nameagegpamap#'gpa', nameagegpatuple, nameagegpabag;
+store b into ':OUTPATH:';\,
                         }
                         ]
                 },
@@ -139,7 +157,7 @@ store b into ':OUTPATH:';\,
                         {
                         'num' => 2,
                         'notmq' => 1,
-                        'execonly' => 'mapred,tez', # studenttab20m not 
available in local mode
+                        'execonly' => 'mapred,tez,spark', # studenttab20m not 
available in local mode
                         'pig' => q\
 a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, 
gpa:float);
 b = order a by age desc parallel 4;
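
Test 6 in the ORC section above replaces Test 2 on Spark because Pig maps carry no
guaranteed entry order: MR and Spark can render the same map with entries in a different
order, which is equal data but fails the harness's byte-wise diff, so the Spark variant
projects the map key by key. A small Perl illustration (the rendered map values are made
up for the example):

    use strict;
    use warnings;

    # Two renderings of one hypothetical Pig map, differing only in entry order.
    my $from_mr    = '[name#alice,age#20,gpa#3.5]';
    my $from_spark = '[age#20,gpa#3.5,name#alice]';

    # Parse '[k#v,...]' into a hash so entry order stops mattering.
    sub entries {
        my ($rendered) = @_;
        $rendered =~ s/^\[|\]$//g;
        return { map { split(/#/, $_, 2) } split /,/, $rendered };
    }

    my ($left, $right) = (entries($from_mr), entries($from_spark));
    my $same = keys(%$left) == keys(%$right)
            && !grep { !defined($right->{$_}) || $left->{$_} ne $right->{$_} } keys %$left;
    print "lines equal:   ", ($from_mr eq $from_spark ? "yes" : "no"), "\n";  # no
    print "entries equal: ", ($same ? "yes" : "no"), "\n";                    # yes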

Modified: pig/trunk/test/e2e/pig/tests/streaming.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/streaming.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/streaming.conf (original)
+++ pig/trunk/test/e2e/pig/tests/streaming.conf Mon May 29 15:00:39 2017
@@ -79,7 +79,7 @@ store C into ':OUTPATH:';#,
                        {
                        #Section 1.1: perl script, no parameters, 
autoship(Section 2.1)
                         'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = foreach A generate $0, $1, $2;
@@ -90,7 +90,7 @@ store C into ':OUTPATH:';#,
                        {
                        # Section 1.2: perl script that takes parameters; 
explicit ship of script (Section 2.1)
                         'num' => 5,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl - -` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') 
stderr('CMD' limit 1);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -102,7 +102,7 @@ store C into ':OUTPATH:';#,
                        {
                        # Section 1.3: define clause; explicit ship of script 
(Section 2.1)
                         'num' => 6,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') 
stderr('CMD');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -115,7 +115,7 @@ store D into ':OUTPATH:';#,
                        {
                        # Section 1.4: grouped data
                         'num' => 7,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -128,7 +128,7 @@ store D into ':OUTPATH:';#,
                        {
                        # Section 1.4: grouped and ordered data
                         'num' => 8,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -144,7 +144,7 @@ store E into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - adjacent 
- map side
                         'num' => 9,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl` input(stdin) 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -157,7 +157,7 @@ store D into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - not 
adjacent - map side
                         'num' => 10,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 define CMD `perl PigStreamingDepend.pl` input(stdin) 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
@@ -172,7 +172,7 @@ store E into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - adjacent 
- reduce side
                         'num' => 11,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl') 
stderr('CMD1');
 define CMD2 `perl PigStreamingDepend.pl` input(stdin) 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm') stderr('CMD2');
@@ -191,7 +191,7 @@ store F into ':OUTPATH:';#,
                        # Section 1.5: multiple streaming operators - one on 
map and one on reduce side
                        # same alias name
                         'num' => 12,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
 define CMD2 `perl PigStreamingDepend.pl` input(stdin) 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
@@ -206,7 +206,7 @@ store B into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - adjacent 
- map side
                         'num' => 13,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl` input(stdin) 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -232,7 +232,7 @@ store D into ':OUTPATH:';#,
                         # Section 2.1: perl script and its dependency shipped
                        # Also covers part of section 3.1: custom serializer
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl` input(stdin) 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -243,7 +243,7 @@ store B into ':OUTPATH:';#,
                        {
                        # Section 2.1: perl script and supported data file is 
shipped
                        'num' => 2,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl - - nameMap` 
ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -257,7 +257,7 @@ store E into ':OUTPATH:';#,
                        {
                        # Section 2.2: script is shipped while the supporting 
file is cached
                        'num' => 3,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q@
 define CMD `perl PigStreaming.pl - - nameMap` 
ship(':SCRIPTHOMEPATH:/PigStreaming.pl') 
cache(':INPATH:/nameMap/part-00000#nameMap');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -283,7 +283,7 @@ store E into ':OUTPATH:';@,
                        {
                        # Section 3.1: use of custom deserializer
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl` output(stdout) 
ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -294,7 +294,7 @@ store B into ':OUTPATH:';#,
                        {
                        # Section 3.1: use of custom serializer and deserializer
                        'num' => 2,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 register :FUNCPATH:/testudf.jar;
 define CMD `perl PigStreaming.pl` input(stdin using 
org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using 
org.apache.pig.test.udf.streaming.DumpStreamer) 
ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
@@ -307,7 +307,7 @@ store C into ':OUTPATH:';#,
                        {
                        # Section 3.3: streaming application reads from file 
rather than stdin
                        'num' => 3,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl foo -` input('foo') 
ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -318,7 +318,7 @@ store B into ':OUTPATH:';#,
                        {
                        # Section 3.4: streaming application writes single 
output to a file
                        'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl - foo nameMap` output('foo') 
ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -330,7 +330,7 @@ store C into ':OUTPATH:';#,
                        {
                        # Section 3.4: streaming application writes multiple 
outputs to file
                        'num' => 5,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin) 
output('sio_5_1', 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -341,7 +341,7 @@ store B into ':OUTPATH:';#,
                        {
                        # Section 3.4: streaming application writes multiple 
outputs: 1 to file and 1 to stdout
                        'num' => 6,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl - - sio_5_2` input(stdin) 
output(stdout, 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', 
':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -362,7 +362,7 @@ store B into ':OUTPATH:';#,
                        {
                       # Section 4.3: integration with parameter substitution
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig_params' => ['-p', 
qq(script_name='PigStreaming.pl')],
                        'pig' => q#
 define CMD `perl $script_name - - nameMap` 
ship(':SCRIPTHOMEPATH:/$script_name', ':SCRIPTHOMEPATH:/nameMap');
@@ -387,7 +387,7 @@ store E into ':OUTPATH:';#,
                        {
                        # Section 5.1: load/store optimization
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') 
stderr('CMD');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -422,7 +422,7 @@ store D into ':OUTPATH:';#,
                        {
                        # PIG-272: problem with optimization and intermediate 
store
                        'num' => 3,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`; 
 define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) 
ship(':SCRIPTHOMEPATH:/Split.pl'); 
@@ -444,7 +444,7 @@ store D into ':OUTPATH:';#,
                        {
                        # PIG-272: problem with optimization and intermediate 
store
                        'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl -ne 'print $_;'`; 
 define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) 
ship(':SCRIPTHOMEPATH:/Split.pl'); 
@@ -472,7 +472,7 @@ store E into ':OUTPATH:';#,
                        # Make sure join with stream optimization works
             # optimization only on load side
                        'num' => 5,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = stream A through `cat` as (name:chararray, age:int, gpa:double);
@@ -485,7 +485,7 @@ store D into ':OUTPATH:';#,
                        # Make sure join with stream optimization works
             # optimization only on store side 
                        'num' => 6,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = filter A by $1 > 25;
@@ -500,7 +500,7 @@ store D into ':OUTPATH:';#,
                        # Make sure join with stream optimization works
             # optimization on load and store
                        'num' => 7,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = stream A through `cat` as (name:chararray, age:int, gpa:double);
@@ -536,7 +536,7 @@ store B into ':OUTPATH:';#,
                        # case where binary finishes normally
                 # BEFORE all input has been passed to it
                         'num' => 2,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` 
ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -565,7 +565,7 @@ store D into ':OUTPATH:';#,
                 # BEFORE all input has been passed to it
                 # FIXME: in local mode
                         'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 B = stream A through `head -1` as (name, age, gpa);
@@ -581,7 +581,7 @@ store E into ':OUTPATH:';#,
                 # BEFORE all input has been passed to it
                 # and emits no output
                         'num' => 5,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` 
ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -598,7 +598,7 @@ store D into ':OUTPATH:';#,
                 # BEFORE all input has been passed to it
                 # and emits no output
                         'num' => 6,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` 
ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -613,7 +613,7 @@ store E into ':OUTPATH:';#,
                 # two stream operators one after another where first
                 # one emits no output
                         'num' => 7,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` 
ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
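
The streaming tests above all follow the same contract: the wrapped Perl command reads
records on stdin, writes results to stdout, and anything on stderr is captured into the
task logs that the stderr('CMD' limit N) clauses refer to. A minimal stand-in for such a
filter (illustrative only; the real PigStreaming.pl ships with the harness and also
supports name-map files and alternate input/output targets):

    #!/usr/bin/perl
    use strict;
    use warnings;

    while (my $line = <STDIN>) {
        chomp $line;
        my @fields = split /\t/, $line;
        print join("\t", @fields), "\n";    # identity pass-through on stdout
        print STDERR "processed: $line\n";  # lands in the captured stderr log
    }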

Modified: pig/trunk/test/e2e/pig/tests/turing_jython.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/turing_jython.conf?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/turing_jython.conf (original)
+++ pig/trunk/test/e2e/pig/tests/turing_jython.conf Mon May 29 15:00:39 2017
@@ -452,9 +452,9 @@ result = P.bind({'in1':input1, 'in2':inp
                        ,'rc'=> 0
 
        }, {
-                # illustrate() on a complex query       
+                # illustrate() on a complex query
                'num' => 2
-               ,'execonly' => 'mapred,local' #TODO: PIG-3993: Illustrate is 
yet to be implemented in Tez
+               ,'execonly' => 'mapred,local' #TODO: PIG-3993, PIG-5204: Illustrate is not yet implemented in Tez or Spark
                ,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
 

Modified: pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl (original)
+++ pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl Mon May 29 15:00:39 
2017
@@ -26,6 +26,7 @@ use strict;
 
 our @floats;
 our $delim;
+our $decimals;
 
 sub parseLine($)
 {
@@ -41,7 +42,7 @@ sub postprocess($)
        for (my $i = 0; $i < @fields; $i++) {
                if ($i != 0) { print($delim); }
                if ($floats[$i]) {
-                       printf("%.3f", $fields[$i]);
+                       printf("%." . $decimals . "f", $fields[$i]);
                } else {
                        print($fields[$i]);
                }
@@ -72,6 +73,10 @@ sub is_float {
        if (!defined($delim)) {
                die "Usage: $0 delimiter\n";
        }
+       $decimals = shift;
+       if (!defined($decimals)) {
+               $decimals = 3;
+       }
 
        my @sampled;
     my $line;
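
The floatpostprocessor.pl change makes the rounding width configurable: the delimiter
stays the first argument, and an optional second argument now sets the printf precision,
defaulting to the previous behavior of 3 digits. This is presumably where the new
'decimals' => 6 key in nightly.conf ends up. A minimal sketch of the effect, assuming
the value reaches printf unchanged:

    use strict;
    use warnings;

    # Mirrors the patched default: 3 decimals unless a second argument is given.
    my $decimals = defined $ARGV[1] ? $ARGV[1] : 3;
    printf("%.${decimals}f\n", 2.7182818);   # 2.718 by default, 2.718282 with 6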

Modified: pig/trunk/test/excluded-tests-mr
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-mr?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/excluded-tests-mr (original)
+++ pig/trunk/test/excluded-tests-mr Mon May 29 15:00:39 2017
@@ -1 +1,2 @@
-**/tez/*.java
\ No newline at end of file
+**/tez/*.java
+**/spark/*.java
\ No newline at end of file

Added: pig/trunk/test/excluded-tests-spark
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-spark?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/excluded-tests-spark (added)
+++ pig/trunk/test/excluded-tests-spark Mon May 29 15:00:39 2017
@@ -0,0 +1,4 @@
+**/Test*MR.java
+**/tez/*.java
+**/TestNativeMapReduce.java
+**/TestCounters.java
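
The new exclude list keeps Spark runs away from the MR-specific unit tests, the Tez test
package, and two tests tied to MR-only machinery (native MapReduce and MR counters).
The entries look like Ant fileset globs, where '**' crosses directory boundaries and '*'
stays within one path segment; a rough Perl rendering of that matching rule (the sample
path is illustrative):

    use strict;
    use warnings;

    # Translate an Ant-style glob into a regex: '**/' may match any
    # directory prefix, while '*' must not cross a '/'.
    sub glob_to_re {
        my ($g) = @_;
        $g =~ s/\./\\./g;            # escape literal dots
        $g =~ s{\*\*/}{\0}g;         # placeholder so the next rule skips '**/'
        $g =~ s{\*}{[^/]*}g;
        $g =~ s{\0}{(?:.*/)?}g;
        return qr{^$g$};
    }

    my $re = glob_to_re('**/spark/*.java');
    print "excluded\n" if 'org/apache/pig/spark/TestSomething.java' =~ $re;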

Modified: pig/trunk/test/excluded-tests-tez
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-tez?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/excluded-tests-tez (original)
+++ pig/trunk/test/excluded-tests-tez Mon May 29 15:00:39 2017
@@ -1 +1,2 @@
-**/Test*MR.java
\ No newline at end of file
+**/Test*MR.java
+**/spark/*.java
\ No newline at end of file

Modified: 
pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
 (original)
+++ 
pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
 Mon May 29 15:00:39 2017
@@ -62,6 +62,9 @@ public class TestLocationInPhysicalPlan
         JobStats jStats = 
(JobStats)job.getStatistics().getJobGraph().getSinks().get(0);
         if (Util.getLocalTestMode().toString().equals("TEZ_LOCAL")) {
             Assert.assertEquals("A[1,4],A[3,4],B[2,4]", 
jStats.getAliasLocation());
+        } else if (Util.getLocalTestMode().toString().equals("SPARK_LOCAL")) {
+            // TODO PIG-5239: investigate why A[3,4] is duplicated
+            Assert.assertEquals("A[1,4],A[3,4],B[2,4],A[3,4]", 
jStats.getAliasLocation());
         } else {
             Assert.assertEquals("M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: 
A[3,4]", jStats.getAliasLocation());
         }

Modified: pig/trunk/test/org/apache/pig/pigunit/PigTest.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/pigunit/PigTest.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/pigunit/PigTest.java (original)
+++ pig/trunk/test/org/apache/pig/pigunit/PigTest.java Mon May 29 15:00:39 2017
@@ -142,6 +142,12 @@ public class PigTest {
           } else if 
(System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("tez_local"))
 {
             LOG.info("Using tez local mode");
             execType = ExecTypeProvider.fromString("tez_local");
+          } else if 
(System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("spark")) {
+              LOG.info("Using spark cluster mode");
+              execType = ExecTypeProvider.fromString("spark");
+          } else if 
(System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("spark_local"))
 {
+              LOG.info("Using spark local mode");
+              execType = ExecTypeProvider.fromString("spark_local");
           } else {
             LOG.info("Using default local mode");
           }

