Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Mon May 29 15:00:39 2017 @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.tools.pigstats.spark; + +import java.util.List; +import java.util.Map; + +import org.apache.pig.tools.pigstats.*; +import scala.Option; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.Counters; +import org.apache.pig.PigWarning; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.PlanVisitor; +import org.apache.spark.executor.ShuffleReadMetrics; +import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.executor.TaskMetrics; + +import com.google.common.collect.Maps; + +public class SparkJobStats extends JobStats { + + private int jobId; + private Map<String, Long> stats = Maps.newLinkedHashMap(); + private boolean disableCounter; + private Counters counters = null; + public static String FS_COUNTER_GROUP = "FS_GROUP"; + private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null; + + protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { + this(String.valueOf(jobId), plan, conf); + this.jobId = jobId; + } + + protected SparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { + super(jobId, plan); + setConf(conf); + } + + public void setConf(Configuration conf) { + super.setConf(conf); + disableCounter = conf.getBoolean("pig.disable.counter", false); + initializeHadoopCounter(); + } + + public void addOutputInfo(POStore poStore, boolean success, + JobMetricsListener jobMetricsListener) { + if (!poStore.isTmpStore()) { + long bytes = getOutputSize(poStore, conf); + long recordsCount = -1; + if (disableCounter == false) { + recordsCount = 
SparkStatsUtil.getRecordCount(poStore); + } + OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(), + bytes, recordsCount, success); + outputStats.setPOStore(poStore); + outputStats.setConf(conf); + + outputs.add(outputStats); + } + } + + public void addInputStats(POLoad po, boolean success, + boolean singleInput) { + + long recordsCount = -1; + if (disableCounter == false) { + recordsCount = SparkStatsUtil.getRecordCount(po); + } + long bytesRead = -1; + if (singleInput && stats.get("BytesRead") != null) { + bytesRead = stats.get("BytesRead"); + } + InputStats inputStats = new InputStats(po.getLFile().getFileName(), + bytesRead, recordsCount, success); + inputStats.setConf(conf); + + inputs.add(inputStats); + } + + public void collectStats(JobMetricsListener jobMetricsListener) { + if (jobMetricsListener != null) { + Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId); + if (taskMetrics == null) { + throw new RuntimeException("No task metrics available for jobId " + jobId); + } + stats = combineTaskMetrics(taskMetrics); + } + } + + public Map<String, Long> getStats() { + return stats; + } + + private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) { + Map<String, Long> results = Maps.newLinkedHashMap(); + + long executorDeserializeTime = 0; + long executorRunTime = 0; + long resultSize = 0; + long jvmGCTime = 0; + long resultSerializationTime = 0; + long memoryBytesSpilled = 0; + long diskBytesSpilled = 0; + long bytesRead = 0; + long bytesWritten = 0; + long remoteBlocksFetched = 0; + long localBlocksFetched = 0; + long fetchWaitTime = 0; + long remoteBytesRead = 0; + long shuffleBytesWritten = 0; + long shuffleWriteTime = 0; + boolean inputMetricExist = false; + boolean outputMetricExist = false; + boolean shuffleReadMetricExist = false; + boolean shuffleWriteMetricExist = false; + + for (List<TaskMetrics> stageMetric : jobMetric.values()) { + if (stageMetric != null) { + for 
(TaskMetrics taskMetrics : stageMetric) { + if (taskMetrics != null) { + executorDeserializeTime += taskMetrics.executorDeserializeTime(); + executorRunTime += taskMetrics.executorRunTime(); + resultSize += taskMetrics.resultSize(); + jvmGCTime += taskMetrics.jvmGCTime(); + resultSerializationTime += taskMetrics.resultSerializationTime(); + memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); + diskBytesSpilled += taskMetrics.diskBytesSpilled(); + if (!taskMetrics.inputMetrics().isEmpty()) { + inputMetricExist = true; + bytesRead += taskMetrics.inputMetrics().get().bytesRead(); + } + + if (!taskMetrics.outputMetrics().isEmpty()) { + outputMetricExist = true; + bytesWritten += taskMetrics.outputMetrics().get().bytesWritten(); + } + + Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); + if (!shuffleReadMetricsOption.isEmpty()) { + shuffleReadMetricExist = true; + remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); + localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); + fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); + remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); + } + + Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); + if (!shuffleWriteMetricsOption.isEmpty()) { + shuffleWriteMetricExist = true; + shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); + shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); + } + + } + } + } + } + + results.put("EexcutorDeserializeTime", executorDeserializeTime); + results.put("ExecutorRunTime", executorRunTime); + results.put("ResultSize", resultSize); + results.put("JvmGCTime", jvmGCTime); + results.put("ResultSerializationTime", resultSerializationTime); + results.put("MemoryBytesSpilled", memoryBytesSpilled); + results.put("DiskBytesSpilled", diskBytesSpilled); + if (inputMetricExist) { + results.put("BytesRead", 
bytesRead); + hdfsBytesRead = bytesRead; + counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead); + } + + if (outputMetricExist) { + results.put("BytesWritten", bytesWritten); + hdfsBytesWritten = bytesWritten; + counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten); + } + + if (shuffleReadMetricExist) { + results.put("RemoteBlocksFetched", remoteBlocksFetched); + results.put("LocalBlocksFetched", localBlocksFetched); + results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); + results.put("FetchWaitTime", fetchWaitTime); + results.put("RemoteBytesRead", remoteBytesRead); + } + + if (shuffleWriteMetricExist) { + results.put("ShuffleBytesWritten", shuffleBytesWritten); + results.put("ShuffleWriteTime", shuffleWriteTime); + } + + return results; + } + + @Override + public String getJobId() { + return String.valueOf(jobId); + } + + @Override + public void accept(PlanVisitor v) throws FrontendException { + throw new UnsupportedOperationException(); + } + + @Override + public String getDisplayString() { + return null; + } + + @Override + public int getNumberMaps() { + throw new UnsupportedOperationException(); + } + + @Override + public int getNumberReduces() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMaxMapTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMinMapTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getAvgMapTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMaxReduceTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMinReduceTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getAvgREduceTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getMapInputRecords() { + throw new UnsupportedOperationException(); + } + + 
@Override + public long getMapOutputRecords() { + throw new UnsupportedOperationException(); + } + + @Override + public long getReduceInputRecords() { + throw new UnsupportedOperationException(); + } + + @Override + public long getReduceOutputRecords() { + throw new UnsupportedOperationException(); + } + + @Override + public long getSMMSpillCount() { + throw new UnsupportedOperationException(); + } + + @Override + public long getProactiveSpillCountObjects() { + throw new UnsupportedOperationException(); + } + + @Override + public long getProactiveSpillCountRecs() { + throw new UnsupportedOperationException(); + } + + @Override + public Counters getHadoopCounters() { + return counters; + } + + @Override + public Map<String, Long> getMultiStoreCounters() { + throw new UnsupportedOperationException(); + } + + @Override + public Map<String, Long> getMultiInputCounters() { + throw new UnsupportedOperationException(); + } + + public void setAlias(SparkOperator sparkOperator) { + SparkScriptState ss = (SparkScriptState) SparkScriptState.get(); + SparkScriptState.SparkScriptInfo sparkScriptInfo = ss.getScriptInfo(); + annotate(ALIAS, sparkScriptInfo.getAlias(sparkOperator)); + annotate(ALIAS_LOCATION, sparkScriptInfo.getAliasLocation(sparkOperator)); + annotate(FEATURE, sparkScriptInfo.getPigFeatures(sparkOperator)); + } + + private void initializeHadoopCounter() { + counters = new Counters(); + Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP); + fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_READ, PigStatsUtil.HDFS_BYTES_READ, 0); + fsGrp.addCounter(PigStatsUtil.HDFS_BYTES_WRITTEN, PigStatsUtil.HDFS_BYTES_WRITTEN, 0); + } + + + public Map<String, SparkCounter<Map<String, Long>>> getWarningCounters() { + return warningCounters; + } + + public void initWarningCounters() { + SparkCounters counters = SparkPigStatusReporter.getInstance().getCounters(); + SparkCounterGroup<Map<String, Long>> sparkCounterGroup = counters.getSparkCounterGroups().get( + 
PigWarning.class.getCanonicalName()); + if (sparkCounterGroup != null) { + this.warningCounters = sparkCounterGroup.getSparkCounters(); + } + } +}
Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Mon May 29 15:00:39 2017 @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.tools.pigstats.spark; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobClient; +import org.apache.pig.PigWarning; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.CompilationMessageCollector; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.tools.pigstats.InputStats; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.spark.api.java.JavaSparkContext; + +public class SparkPigStats extends PigStats { + + private Map<SparkJobStats,SparkOperator> jobSparkOperatorMap = new HashMap<SparkJobStats, SparkOperator>(); + private static final Log LOG = LogFactory.getLog(SparkPigStats.class); + + private Set<SparkOperator> sparkOperatorsSet = new HashSet<SparkOperator>(); + + private SparkScriptState sparkScriptState; + + private Configuration conf; + + public SparkPigStats() { + jobPlan = new JobGraph(); + this.sparkScriptState = (SparkScriptState) ScriptState.get(); + } + + public void initialize(PigContext pigContext, 
SparkOperPlan sparkPlan, Configuration conf) { + super.start(); + this.pigContext = pigContext; + this.conf = conf; + sparkScriptState.setScriptInfo(sparkPlan); + } + + public void addJobStats(POStore poStore, SparkOperator sparkOperator, int jobId, + JobMetricsListener jobMetricsListener, + JavaSparkContext sparkContext) { + boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext); + SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf); + jobStats.setSuccessful(isSuccess); + jobStats.collectStats(jobMetricsListener); + jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener); + addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf); + jobStats.initWarningCounters(); + jobSparkOperatorMap.put(jobStats, sparkOperator); + + jobPlan.add(jobStats); + } + + + public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, String jobId, + JobMetricsListener jobMetricsListener, + JavaSparkContext sparkContext, + Exception e) { + boolean isSuccess = false; + SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf); + jobStats.setSuccessful(isSuccess); + jobStats.collectStats(jobMetricsListener); + jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener); + addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf); + jobSparkOperatorMap.put(jobStats, sparkOperator); + jobPlan.add(jobStats); + jobStats.setBackendException(e); + } + + public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) { + SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf); + jobStats.setSuccessful(isSuccess); + jobSparkOperatorMap.put(jobStats, sparkOperator); + jobPlan.add(jobStats); + jobStats.setBackendException(e); + } + + public void finish() { + super.stop(); + display(); + } + + private void display() { + LOG.info(getDisplayString()); + handleAggregateWarnings(); + } + + private void handleAggregateWarnings() { + 
Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>(); + + Iterator<JobStats> iter = jobPlan.iterator(); + while (iter.hasNext()) { + SparkJobStats js = (SparkJobStats) iter.next(); + Map<String, SparkCounter<Map<String,Long>>> counterMap = js.getWarningCounters(); + if (counterMap == null) { + continue; + } + Map<String, Long> warningCounters = counterMap.get(PigWarning.SPARK_WARN.name()).getValue(); + if (warningCounters == null) { + continue; + } + for (String warnKey : warningCounters.keySet()) { + Long val = warningAggMap.get(warnKey); + if (val != null) { + val += (Long)warningCounters.get(warnKey); + } else { + val = (Long)warningCounters.get(warnKey); + } + warningAggMap.put(PigWarning.valueOf(warnKey), val); + } + } + CompilationMessageCollector.logAggregate(warningAggMap, CompilationMessageCollector.MessageType.Warning, LOG); + } + + @Override + public String getDisplayString() { + StringBuilder sb = new StringBuilder(); + Iterator<JobStats> iter = jobPlan.iterator(); + while (iter.hasNext()) { + SparkJobStats js = (SparkJobStats)iter.next(); + if (jobSparkOperatorMap.containsKey(js)) { + SparkOperator sparkOperator = jobSparkOperatorMap.get(js); + js.setAlias(sparkOperator); + } + sb.append("Spark Job [" + js.getJobId() + "] Metrics"); + Map<String, Long> stats = js.getStats(); + if (stats == null) { + sb.append("No statistics found for job " + js.getJobId()); + return sb.toString(); + } + + Iterator statIt = stats.entrySet().iterator(); + while (statIt.hasNext()) { + Map.Entry pairs = (Map.Entry)statIt.next(); + sb.append("\t" + pairs.getKey() + " : " + pairs.getValue()); + } + for (InputStats inputStat : js.getInputs()){ + sb.append("\t"+inputStat.getDisplayString()); + } + } + return sb.toString(); + } + + @Override + public JobClient getJobClient() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmbedded() { + return false; + } + + @Override + public Map<String, List<PigStats>> getAllStats() { + throw new 
UnsupportedOperationException(); + } + + @Override + public List<String> getAllErrorMessages() { + throw new UnsupportedOperationException(); + } + + @Override + public long getSMMSpillCount() { + throw new UnsupportedOperationException(); + } + + @Override + public long getProactiveSpillCountObjects() { + throw new UnsupportedOperationException(); + } + + @Override + public long getProactiveSpillCountRecords() { + throw new UnsupportedOperationException(); + } + + @Override + public int getNumberJobs() { + return jobPlan.size(); + } + + /** + * SparkPlan can have many SparkOperators. + * Each SparkOperator can have multiple POStores + * We currently collect stats once for every POStore, + * But do not want to collect input stats for every POStore + * + * e.g. After multiQuery optimization, the sparkOperator may look like this: + * POLoad_1 (PhysicalPlan) ...POStore_A + * \ / + * ...POSplit + * / \ + * POLoad_2 (PhysicalPlan) ...POStore_B + */ + private void addInputInfoForSparkOper(SparkOperator sparkOperator, + SparkJobStats jobStats, + boolean isSuccess, + JobMetricsListener jobMetricsListener, + Configuration conf) { + //to avoid repetition + if (sparkOperatorsSet.contains(sparkOperator)) { + return; + } + + try { + List<POLoad> poLoads = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLoad.class); + for (POLoad load : poLoads) { + if (!load.isTmpLoad()) { + jobStats.addInputStats(load, isSuccess, (poLoads.size() == 1)); + } + } + } catch (VisitorException ve) { + LOG.warn(ve); + } + + sparkOperatorsSet.add(sparkOperator); + } +} Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java (added) +++ 
pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats.spark; + +import org.apache.pig.JVMReuseManager; +import org.apache.pig.StaticDataCleanup; + +/** + * Just like PigStatusReporter which will create/reset Hadoop counters, SparkPigStatusReporter will + * create/reset Spark counters. + * Note that, it is not suitable to make SparkCounters as a Singleton, it will be created/reset for + * a given pig script or a Dump/Store action in Grunt mode. 
+ */ +public class SparkPigStatusReporter { + private static SparkPigStatusReporter reporter; + private SparkCounters counters; + + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(SparkPigStatusReporter.class); + } + + @StaticDataCleanup + public static void staticDataCleanup() { + reporter = null; + } + + private SparkPigStatusReporter() { + } + + public static SparkPigStatusReporter getInstance() { + if (reporter == null) { + reporter = new SparkPigStatusReporter(); + } + return reporter; + } + + public void createCounter(String groupName, String counterName) { + if (counters != null) { + counters.createCounter(groupName, counterName); + } + } + + public SparkCounters getCounters() { + return counters; + } + + public void setCounters(SparkCounters counters) { + this.counters = counters; + } +} Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java Mon May 29 15:00:39 2017 @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.tools.pigstats.spark; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.tools.pigstats.ScriptState; + +import com.google.common.collect.Maps; + +/** + * ScriptStates encapsulates settings for a Pig script that runs on a hadoop + * cluster. These settings are added to all Spark jobs spawned by the script and + * in turn are persisted in the hadoop job xml. With the properties already in + * the job xml, users who want to know the relations between the script and Spark + * jobs can derive them from the job xmls. 
+ */ +public class SparkScriptState extends ScriptState { + public SparkScriptState(String id) { + super(id); + } + + private SparkScriptInfo scriptInfo = null; + + public void setScriptInfo(SparkOperPlan plan) { + this.scriptInfo = new SparkScriptInfo(plan); + } + + public SparkScriptInfo getScriptInfo() { + return scriptInfo; + } + + public static class SparkScriptInfo { + + private static final Log LOG = LogFactory.getLog(SparkScriptInfo.class); + private SparkOperPlan sparkPlan; + private String alias; + private String aliasLocation; + private String features; + + private Map<OperatorKey, String> featuresMap = Maps.newHashMap(); + private Map<OperatorKey, String> aliasMap = Maps.newHashMap(); + private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap(); + + public SparkScriptInfo(SparkOperPlan sparkPlan) { + this.sparkPlan = sparkPlan; + initialize(); + } + + private void initialize() { + try { + new DAGAliasVisitor(sparkPlan).visit(); + } catch (VisitorException e) { + LOG.warn("Cannot calculate alias information for DAG", e); + } + } + + public String getAlias(SparkOperator sparkOp) { + return aliasMap.get(sparkOp.getOperatorKey()); + } + + public String getAliasLocation(SparkOperator sparkOp) { + return aliasLocationMap.get(sparkOp.getOperatorKey()); + } + + public String getPigFeatures(SparkOperator sparkOp) { + return featuresMap.get(sparkOp.getOperatorKey()); + } + + class DAGAliasVisitor extends SparkOpPlanVisitor { + + private Set<String> aliases; + private Set<String> aliasLocations; + private BitSet featureSet; + + public DAGAliasVisitor(SparkOperPlan plan) { + super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan)); + this.aliases = new HashSet<String>(); + this.aliasLocations = new HashSet<String>(); + this.featureSet = new BitSet(); + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + + ArrayList<String> aliasList = new ArrayList<String>(); + String aliasLocationStr = ""; + 
try { + ArrayList<String> aliasLocationList = new ArrayList<String>(); + new AliasVisitor(sparkOp.physicalPlan, aliasList, aliasLocationList).visit(); + aliasLocationStr += LoadFunc.join(aliasLocationList, ","); + if (!aliasList.isEmpty()) { + Collections.sort(aliasList); + aliases.addAll(aliasList); + aliasLocations.addAll(aliasLocationList); + } + } catch (VisitorException e) { + LOG.warn("unable to get alias", e); + } + aliasMap.put(sparkOp.getOperatorKey(), LoadFunc.join(aliasList, ",")); + aliasLocationMap.put(sparkOp.getOperatorKey(), aliasLocationStr); + + + BitSet feature = new BitSet(); + feature.clear(); + if (sparkOp.isSampler()) { + feature.set(PIG_FEATURE.SAMPLER.ordinal()); + } + if (sparkOp.isIndexer()) { + feature.set(PIG_FEATURE.INDEXER.ordinal()); + } + if (sparkOp.isCogroup()) { + feature.set(PIG_FEATURE.COGROUP.ordinal()); + } + if (sparkOp.isGroupBy()) { + feature.set(PIG_FEATURE.GROUP_BY.ordinal()); + } + if (sparkOp.isRegularJoin()) { + feature.set(PIG_FEATURE.HASH_JOIN.ordinal()); + } + if (sparkOp.isUnion()) { + feature.set(PIG_FEATURE.UNION.ordinal()); + } + if (sparkOp.isNative()) { + feature.set(PIG_FEATURE.NATIVE.ordinal()); + } + if (sparkOp.isLimit() || sparkOp.isLimitAfterSort()) { + feature.set(PIG_FEATURE.LIMIT.ordinal()); + } + try { + new FeatureVisitor(sparkOp.physicalPlan, feature).visit(); + } catch (VisitorException e) { + LOG.warn("Feature visitor failed", e); + } + StringBuilder sb = new StringBuilder(); + for (int i = feature.nextSetBit(0); i >= 0; i = feature.nextSetBit(i + 1)) { + if (sb.length() > 0) sb.append(","); + sb.append(PIG_FEATURE.values()[i].name()); + } + featuresMap.put(sparkOp.getOperatorKey(), sb.toString()); + for (int i = 0; i < feature.length(); i++) { + if (feature.get(i)) { + featureSet.set(i); + } + } + } + + @Override + public void visit() throws VisitorException { + super.visit(); + if (!aliases.isEmpty()) { + ArrayList<String> aliasList = new ArrayList<String>(aliases); + ArrayList<String> 
aliasLocationList = new ArrayList<String>(aliasLocations); + Collections.sort(aliasList); + Collections.sort(aliasLocationList); + alias = LoadFunc.join(aliasList, ","); + aliasLocation = LoadFunc.join(aliasLocationList, ","); + } + StringBuilder sb = new StringBuilder(); + for (int i = featureSet.nextSetBit(0); i >= 0; i = featureSet.nextSetBit(i + 1)) { + if (sb.length() > 0) sb.append(","); + sb.append(PIG_FEATURE.values()[i].name()); + } + features = sb.toString(); + } + } + } +} Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Mon May 29 15:00:39 2017 @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.tools.pigstats.spark; + +import java.util.List; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.spark.JobGraphBuilder; +import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.tools.pigstats.PigStatsUtil; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.spark.JobExecutionStatus; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.api.java.JavaSparkContext; + +public class SparkStatsUtil { + + public static final String SPARK_STORE_COUNTER_GROUP = PigStatsUtil.MULTI_STORE_COUNTER_GROUP; + public static final String SPARK_STORE_RECORD_COUNTER = PigStatsUtil.MULTI_STORE_RECORD_COUNTER; + public static final String SPARK_INPUT_COUNTER_GROUP = PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP; + public static final String SPARK_INPUT_RECORD_COUNTER = PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER; + + public static void waitForJobAddStats(int jobID, + POStore poStore, SparkOperator sparkOperator, + JobMetricsListener jobMetricsListener, + JavaSparkContext sparkContext, + SparkPigStats sparkPigStats) + throws InterruptedException { + // Even though we are not making any async calls to spark, + // the SparkStatusTracker can still return RUNNING status + // for a finished job. 
+ // Looks like there is a race condition between spark + // "event bus" thread updating its internal listener and + // this driver thread calling SparkStatusTracker. + // To work around this, we will wait for this job to "finish". + jobMetricsListener.waitForJobToEnd(jobID); + sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener, + sparkContext); + jobMetricsListener.cleanup(jobID); + } + + public static void addFailJobStats(String jobID, + POStore poStore, SparkOperator sparkOperator, + SparkPigStats sparkPigStats, + Exception e) { + JobMetricsListener jobMetricsListener = null; + JavaSparkContext sparkContext = null; + sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, jobMetricsListener, + sparkContext, e); + } + + public static String getCounterName(POStore store) { + String shortName = PigStatsUtil.getShortName(store.getSFile().getFileName()); + + StringBuffer sb = new StringBuffer(SPARK_STORE_RECORD_COUNTER); + sb.append("_"); + sb.append(store.getIndex()); + sb.append("_"); + sb.append(store.getOperatorKey()); + sb.append("_"); + sb.append(shortName); + return sb.toString(); + } + + public static String getCounterName(POLoad load) { + String shortName = PigStatsUtil.getShortName(load.getLFile().getFileName()); + + StringBuffer sb = new StringBuffer(SPARK_INPUT_RECORD_COUNTER); + sb.append("_"); + sb.append(load.getOperatorKey()); + sb.append("_"); + sb.append(shortName); + return sb.toString(); + } + + public static long getRecordCount(POStore store) { + SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); + Object value = reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getCounterName(store)); + if (value == null) { + return 0L; + } else { + return (Long)value; + } + } + + public static long getRecordCount(POLoad load) { + SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); + int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan()); + Object value = 
reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getCounterName(load)); + if (value == null) { + return 0L; + } else { + return (Long)value/loadersCount; + } + } + + private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){ + List<PhysicalOperator> successors = pp.getSuccessors(op); + if (successors == null || successors.size()==0) return 1; + for (PhysicalOperator successor : successors){ + if (successor instanceof POSplit){ + return ((POSplit)successor).getPlans().size(); + }else{ + return countCoLoadsIfInSplit(successor,pp); + } + } + return 1; + } + + public static boolean isJobSuccess(int jobID, + JavaSparkContext sparkContext) { + if (jobID == JobGraphBuilder.NULLPART_JOB_ID) { + return true; + } + JobExecutionStatus status = getJobInfo(jobID, sparkContext).status(); + if (status == JobExecutionStatus.SUCCEEDED) { + return true; + } else if (status != JobExecutionStatus.FAILED) { + throw new RuntimeException("Unexpected job execution status " + + status); + } + + return false; + } + + private static SparkJobInfo getJobInfo(int jobID, + JavaSparkContext sparkContext) { + SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID); + if (jobInfo == null) { + throw new RuntimeException("No jobInfo available for jobID " + + jobID); + } + + return jobInfo; + } + + public static void addNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator) { + ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), true, null); + } + + public static void addFailedNativeJobStats(PigStats ps, NativeSparkOperator nativeSparkOperator, Exception e) { + ((SparkPigStats) ps).addNativeJobStats(nativeSparkOperator, nativeSparkOperator.getJobId(), false, e); + } +} \ No newline at end of file Modified: pig/trunk/test/e2e/pig/build.xml URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1796639&r1=1796638&r2=1796639&view=diff 
============================================================================== --- pig/trunk/test/e2e/pig/build.xml (original) +++ pig/trunk/test/e2e/pig/build.xml Mon May 29 15:00:39 2017 @@ -353,6 +353,12 @@ </antcall> </target> + <target name="test-spark"> + <antcall target="test-base"> + <param name="harness.conf.file" value="${basedir}/conf/spark.conf"/> + </antcall> + </target> + <target name="deploy-base" depends="property-check, tar, init-test"> <exec executable="perl" dir="${test.location}" failonerror="true"> <env key="HARNESS_ROOT" value="."/> Added: pig/trunk/test/e2e/pig/conf/spark.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/conf/spark.conf?rev=1796639&view=auto ============================================================================== --- pig/trunk/test/e2e/pig/conf/spark.conf (added) +++ pig/trunk/test/e2e/pig/conf/spark.conf Mon May 29 15:00:39 2017 @@ -0,0 +1,75 @@ +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +my $me = `whoami`; +chomp $me; + +# The contents of this file can be rewritten to fit your installation. 
+# Also, you can define the following environment variables and set things up as in the test setup +# PH_ROOT Root directory where test harness is installed +# PH_LOCAL Root directory for input and output for local mode tests +# PH_OUT Root directory where output data will be stored (on local disk, not HDFS) +# PH_CLUSTER_BIN Binary executable for cluster being used +# HADOOP_CONF_DIR Conf directory for cluster being used +# PH_PIG Root directory for Pig version being used + +my $hdfsBase = $ENV{PH_HDFS_BASE} || "/user/pig"; + +$cfg = { + #HDFS + 'inpathbase' => "$hdfsBase/tests/data" + , 'outpathbase' => "$hdfsBase/out" + + #LOCAL + , 'localinpathbase' => "$ENV{PH_LOCAL}/in" + , 'localoutpathbase' => "$ENV{PH_LOCAL}/out/log" + , 'localxmlpathbase' => "$ENV{PH_LOCAL}/out/xml" + , 'localpathbase' => "$ENV{PH_LOCAL}/out/pigtest/$me" + , 'benchmarkcachepath'=> "$ENV{PH_BENCHMARK_CACHE_PATH}" + + #TEST + , 'benchmarkPath' => "$ENV{PH_OUT}/benchmarks" + , 'scriptPath' => "$ENV{PH_ROOT}/libexec" + , 'tmpPath' => '/tmp/pigtest' + + #PIG + , 'testconfigpath' => "$ENV{HADOOP_CONF_DIR}" + , 'funcjarPath' => "$ENV{PH_ROOT}/lib/java" + , 'paramPath' => "$ENV{PH_ROOT}/paramfiles" + , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}" + , 'pigpath' => "$ENV{PH_PIG}" + , 'oldpigpath' => "$ENV{PH_OLDPIG}" + , 'hcatbin' => "$ENV{HCAT_BIN}" + , 'usePython' => "$ENV{PIG_USE_PYTHON}" + , 'exectype' => 'spark' + , 'benchmark_exectype' => 'mapred' + + #HADOOP + , 'mapredjars' => "$ENV{PH_ROOT}/lib" + + #HIVE + , 'hivelibdir' => "$ENV{PH_HIVE_LIB_DIR}" + , 'hiveversion' => "$ENV{PH_HIVE_VERSION}" + , 'hiveshimsversion' => "$ENV{PH_HIVE_SHIMS_VERSION}" + + , 'userhomePath' => "$ENV{HOME}" + ,'local.bin' => '/usr/bin' + + ,'logDir' => "$ENV{PH_OUT}/log" + ,'propertiesFile' => "./conf/testpropertiesfile.conf" + ,'harness.console.level' => 'ERROR' + +}; Modified: pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm (original) +++ pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm Mon May 29 15:00:39 2017 @@ -270,7 +270,7 @@ sub runPigCmdLine my @cmd = @baseCmd; # Add option -l giving location for secondary logs - ##!!! Should that even be here? + ##!!! Should that even be here? my $locallog = $testCmd->{'localpath'} . $testCmd->{'group'} . "_" . $testCmd->{'num'} . ".log"; push(@cmd, "-logfile"); push(@cmd, $locallog); @@ -425,6 +425,11 @@ sub getPigCmd($$$) } TestDriver::dbg("Additional java parameters: [$additionalJavaParams].\n"); } + + # Several OutOfMemoryErrors - Perm space issues were seen during running E2E tests, here max Perm size is adjusted + if ($testCmd->{'exectype'} eq "spark") { + $additionalJavaParams = "-XX:MaxPermSize=512m"; + } push(@pigCmd, ("-x", $testCmd->{'exectype'})); @@ -598,7 +603,7 @@ sub postProcessSingleOutputFile if (defined $testCmd->{'floatpostprocess'} && defined $testCmd->{'delimiter'}) { $fppCmd .= " | perl $toolpath/floatpostprocessor.pl \"" . - $testCmd->{'delimiter'} . "\""; + $testCmd->{'delimiter'} . "\" " . 
$testCmd->{'decimals'}; } $fppCmd .= " > $localdir/out_original"; Modified: pig/trunk/test/e2e/pig/tests/bigdata.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/bigdata.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/bigdata.conf (original) +++ pig/trunk/test/e2e/pig/tests/bigdata.conf Mon May 29 15:00:39 2017 @@ -24,7 +24,7 @@ $cfg = { 'driver' => 'Pig', - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'groups' => [ { Modified: pig/trunk/test/e2e/pig/tests/cmdline.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/cmdline.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/cmdline.conf (original) +++ pig/trunk/test/e2e/pig/tests/cmdline.conf Mon May 29 15:00:39 2017 @@ -254,7 +254,7 @@ dump A;\, { 'name' => 'Warning', 'floatpostprocess' => 0, - 'execonly' => 'mapred,tez', # Warnings use counters, which don't work in local mode + 'execonly' => 'mapred,tez,spark', # Warnings use counters, which don't work in local mode 'delimiter' => ' ', 'tests' => [ Modified: pig/trunk/test/e2e/pig/tests/grunt.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/grunt.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/grunt.conf (original) +++ pig/trunk/test/e2e/pig/tests/grunt.conf Mon May 29 15:00:39 2017 @@ -43,13 +43,13 @@ $cfg = { },{ 'num' => 2, 'pig' => "pwd", - 'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode + 'execonly' => 'mapred,tez,spark', # don't have a clue what their cwd will be for local mode 'expected_out_regex' => "/user", 'rc' => 0 },{ 'num' => 3, 'pig' => "ls .", - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 
'expected_out_regex' => "/user", 'rc' => 0 },{ Modified: pig/trunk/test/e2e/pig/tests/hcat.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/hcat.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/hcat.conf (original) +++ pig/trunk/test/e2e/pig/tests/hcat.conf Mon May 29 15:00:39 2017 @@ -24,7 +24,7 @@ $cfg = { 'driver' => 'Pig', - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'groups' => [ { Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/multiquery.conf (original) +++ pig/trunk/test/e2e/pig/tests/multiquery.conf Mon May 29 15:00:39 2017 @@ -529,7 +529,7 @@ $cfg = { # Streaming in demux { 'num' => 2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -547,7 +547,7 @@ $cfg = { # Streaming in nested demux { 'num' => 3, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); A = load ':INPATH:/singlefile/studenttab10k'; Modified: pig/trunk/test/e2e/pig/tests/negative.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/negative.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/negative.conf (original) +++ pig/trunk/test/e2e/pig/tests/negative.conf Mon May 29 15:00:39 2017 @@ -312,7 +312,7 @@ dump B;#, { # Define uses using non-existent command (autoship) 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 
'mapred,tez,spark', 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreamingNotThere.pl`; @@ -411,7 +411,7 @@ dump B;\, # Streaming application fails in the beginning of processing # NEED TO CHECK STDERR MANUALLY FOR NOW 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ define CMD `perl PigStreamingBad.pl start` ship(':SCRIPTHOMEPATH:/PigStreamingBad.pl') stderr('CMD' limit 1); A = load ':INPATH:/singlefile/studenttab10k'; @@ -423,7 +423,7 @@ store B into ':OUTPATH:';\, # Streaming application fails in the middle of processing # NEED TO CHECK STDERR MANUALLY FOR NOW 'num' => 2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ define CMD `perl PigStreamingBad.pl middle` ship(':SCRIPTHOMEPATH:/PigStreamingBad.pl') stderr('CMD' limit 1); A = load ':INPATH:/singlefile/studenttab10k'; @@ -436,7 +436,7 @@ store B into ':OUTPATH:';\, # bring logs to dfs # NEED TO CHECK STDERR MANUALLY FOR NOW 'num' => 3, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ define CMD `perl PigStreamingBad.pl end` ship(':SCRIPTHOMEPATH:/PigStreamingBad.pl') stderr('CMD' limit 1); A = load ':INPATH:/singlefile/studenttab10k'; @@ -449,7 +449,7 @@ store B into ':OUTPATH:';\, # bring logs to dfs # NEED TO CHECK STDERR MANUALLY FOR NOW 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ define CMD `perl DieRandomly.pl 10000 2` ship(':SCRIPTHOMEPATH:/DieRandomly.pl') stderr('CMD' limit 1); A = load ':INPATH:/singlefile/studenttab10k'; @@ -480,7 +480,7 @@ store C into ':OUTPATH:.2';\, { # Invalid deserializer - throws exception 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ register :FUNCPATH:/testudf.jar; define CMD `perl PigStreaming.pl` input(stdin) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamerBad) ship(':SCRIPTHOMEPATH:/PigStreaming.pl'); @@ -492,7 +492,7 @@ store B into 
':OUTPATH:';\, { # Invalid serializer - throws exception 'num' => 2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ define CMD `perl PigStreamingDepend.pl` input(stdin using StringStoreBad) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); A = load ':INPATH:/singlefile/studenttab10k'; Modified: pig/trunk/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/nightly.conf (original) +++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon May 29 15:00:39 2017 @@ -573,6 +573,7 @@ c = foreach b generate group, AVG(a.gpa) store c into ':OUTPATH:';\, 'floatpostprocess' => 1, 'delimiter' => ' ', + 'decimals' => 6, }, { 'num' => 10, @@ -2202,6 +2203,7 @@ store d into ':OUTPATH:';\, }, { 'num' => 4, + 'execonly' => 'mapred,local,tez', # Spark doesn't do implicit ordering in distinct 'pig' =>q\a = load ':INPATH:/singlefile/studentnulltab10k'; b = distinct a; c = limit b 100; @@ -2292,7 +2294,7 @@ store b into ':OUTPATH:';\, }, { 'num' => 12, - 'execonly' => 'tez', #Limit_5 was not able to test on tez. + 'execonly' => 'tez,spark', #Limit_5 was not able to test on tez. 
'pig' =>q\a = load ':INPATH:/singlefile/studenttab10k'; b = load ':INPATH:/singlefile/studenttab10k'; a1 = foreach a generate $0, $1; @@ -2303,6 +2305,15 @@ store d into ':OUTPATH:';\, 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int); b = limit a 100; store b into ':OUTPATH:';\, + }, + { + 'num' => 13, + 'execonly' => 'spark', # Limit_4 failed on Spark: distinct doesn't do implicit sort like it does in MR + 'pig' =>q\a = load ':INPATH:/singlefile/studentnulltab10k'; +b = distinct a; +c = order b by $0, $1, $2; +d = limit c 100; +store d into ':OUTPATH:';\, } ] }, @@ -2948,7 +2959,7 @@ store d into ':OUTPATH:';\, # Merge-join with one file across multiple blocks { 'num' => 8, - 'execonly' => 'mapred,tez', # since this join will run out of memory in local mode + 'execonly' => 'mapred,tez,spark', # since this join will run out of memory in local mode 'floatpostprocess' => 1, 'delimiter' => ' ', 'pig' => q\a = load ':INPATH:/singlefile/votertab10k'; @@ -3616,7 +3627,7 @@ store b into ':OUTPATH:';\, 'tests' => [ { 'num' => 1, - 'execonly' => 'mapred,tez', # studenttab20m not available in local mode + 'execonly' => 'mapred,tez,spark', # studenttab20m not available in local mode 'pig' => q\ a = load ':INPATH:/singlefile/studenttab20m' using PigStorage() as (name, age, gpa); b = foreach a generate age; @@ -4361,7 +4372,7 @@ store b into ':OUTPATH:';\, { # test group 'num' => 1, - 'execonly' => 'mapred,tez', # since this join will run out of memory in local mode + 'execonly' => 'mapred,tez,spark', # since this join will run out of memory in local mode 'pig' => q\register :FUNCPATH:/testudf.jar; a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa); b = group a by age PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner2 parallel 2; @@ -4952,7 +4963,7 @@ store C into ':OUTPATH:';\, { 'num' => 1, 'java_params' => ['-Dopt.fetch=false'], - 'execonly' => 'mapred,tez', # since 
distributed cache is not supported in local mode + 'execonly' => 'mapred,tez,spark', # since distributed cache is not supported in local mode 'pig' => q? register :FUNCPATH:/testudf.jar; define udfdc org.apache.pig.test.udf.evalfunc.Udfcachetest(':INPATH:/singlefile/votertab10k#foodle'); @@ -5194,7 +5205,7 @@ store C into ':OUTPATH:';\, }, { # PIG-2576 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q?register :FUNCPATH:/testudf.jar; define printconf org.apache.pig.test.udf.evalfunc.UdfContextFrontend('dummy'); a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); @@ -5249,7 +5260,7 @@ store C into ':OUTPATH:';\, ], },{ 'name' => 'Bloom', - 'execonly' => 'mapred,tez', # distributed cache does not work in local mode + 'execonly' => 'mapred,tez', # distributed cache does not work in local mode, bloom is not implemented for Spark(PIG-5117) 'tests' => [ { 'num' => 1, @@ -5694,7 +5705,7 @@ store a into ':OUTPATH:';\, 'tests' => [ { 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET default_parallel 7; SET mapreduce.input.fileinputformat.split.maxsize '300'; @@ -5711,7 +5722,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' =>2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET default_parallel 9; SET mapreduce.input.fileinputformat.split.maxsize '300'; @@ -5728,7 +5739,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' =>3, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET default_parallel 7; SET mapreduce.input.fileinputformat.split.maxsize '300'; @@ -5745,7 +5756,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET default_parallel 5; SET mapreduce.input.fileinputformat.split.maxsize '300'; @@ -5763,7 +5774,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' => 5, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET 
default_parallel 5; SET mapreduce.input.fileinputformat.split.maxsize '300'; @@ -5786,7 +5797,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' => 6, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET mapreduce.input.fileinputformat.split.maxsize '300'; SET pig.splitCombination false; @@ -5810,7 +5821,7 @@ store a into ':OUTPATH:';\, 'tests' => [ { 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET mapreduce.input.fileinputformat.split.maxsize '300'; SET pig.splitCombination false; @@ -5827,7 +5838,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' => 2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET mapreduce.input.fileinputformat.split.maxsize '300'; SET pig.splitCombination false; @@ -5844,7 +5855,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' => 3, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET mapreduce.input.fileinputformat.split.maxsize '300'; SET pig.splitCombination false; @@ -5861,7 +5872,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET mapreduce.input.fileinputformat.split.maxsize '300'; SET pig.splitCombination false; @@ -5881,7 +5892,7 @@ store a into ':OUTPATH:';\, \, }, { 'num' => 5, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q\ SET default_parallel 9; SET mapreduce.input.fileinputformat.split.maxsize '300'; @@ -6009,8 +6020,9 @@ store a into ':OUTPATH:';\, fs -rm :INPATH:/singlefile/names.txt# }, { - # Custom Hive UDF and MapredContext + # Custom Hive UDF and MapredContext - disabled for Spark: see PIG-5234 'num' => 7, + 'execonly' => 'mapred,tez', 'pig' => q\set mapred.max.split.size '100000000' register :FUNCPATH:/testudf.jar; define DummyContextUDF HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF'); Modified: pig/trunk/test/e2e/pig/tests/orc.conf URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/orc.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/orc.conf (original) +++ pig/trunk/test/e2e/pig/tests/orc.conf Mon May 29 15:00:39 2017 @@ -46,6 +46,7 @@ store b into ':OUTPATH:';\, { 'num' => 2, 'notmq' => 1, + 'execonly' => 'mapred,tez', 'pig' => q\ a = load ':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)}); store a into ':OUTPATH:.intermediate' using OrcStorage(); @@ -113,6 +114,23 @@ store c into ':OUTPATH:';\, 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age); b = foreach a generate (name is null ? [] : TOMAP(name, age)); store b into ':OUTPATH:';\, + }, +# Test 6 : Running for Spark only as a replacement of Test 2: Spark and MR may produce different order of entries in +# Pig maps, which although is fine, triggers a false failure during comparison + { + 'num' => 6, + 'notmq' => 1, + 'execonly' => 'spark', + 'pig' => q\ +a = load ':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)}); +store a into ':OUTPATH:.intermediate' using OrcStorage(); +exec +b = load ':OUTPATH:.intermediate' using OrcStorage(); +c = foreach b generate nameagegpamap#'name', nameagegpamap#'age', nameagegpamap#'gpa', nameagegpatuple, nameagegpabag; +store c into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)}); +b = foreach a generate nameagegpamap#'name', nameagegpamap#'age', 
nameagegpamap#'gpa', nameagegpatuple, nameagegpabag; +store b into ':OUTPATH:';\, } ] }, @@ -139,7 +157,7 @@ store b into ':OUTPATH:';\, { 'num' => 2, 'notmq' => 1, - 'execonly' => 'mapred,tez', # studenttab20m not available in local mode + 'execonly' => 'mapred,tez,spark', # studenttab20m not available in local mode 'pig' => q\ a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:float); b = order a by age desc parallel 4; Modified: pig/trunk/test/e2e/pig/tests/streaming.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/streaming.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/streaming.conf (original) +++ pig/trunk/test/e2e/pig/tests/streaming.conf Mon May 29 15:00:39 2017 @@ -79,7 +79,7 @@ store C into ':OUTPATH:';#, { #Section 1.1: perl script, no parameters, autoship(Section 2.1) 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; B = foreach A generate $0, $1, $2; @@ -90,7 +90,7 @@ store C into ':OUTPATH:';#, { # Section 1.2: perl script that takes parameters; explicit ship of script (Section 2.1) 'num' => 5, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreaming.pl - -` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD' limit 1); A = load ':INPATH:/singlefile/studenttab10k'; @@ -102,7 +102,7 @@ store C into ':OUTPATH:';#, { # Section 1.3: define clause; explicit ship of script (Section 2.1) 'num' => 6, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -115,7 +115,7 @@ store D into ':OUTPATH:';#, { # Section 1.4: grouped data 'num' => 7, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 
'pig' => q# define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -128,7 +128,7 @@ store D into ':OUTPATH:';#, { # Section 1.4: grouped and ordered data 'num' => 8, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -144,7 +144,7 @@ store E into ':OUTPATH:';#, { # Section 1.5: multiple streaming operators - adjacent - map side 'num' => 9, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -157,7 +157,7 @@ store D into ':OUTPATH:';#, { # Section 1.5: multiple streaming operators - not adjacent - map side 'num' => 10, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); @@ -172,7 +172,7 @@ store E into ':OUTPATH:';#, { # Section 1.5: multiple streaming operators - adjacent - reduce side 'num' => 11, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD1 `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl') stderr('CMD1'); define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm') stderr('CMD2'); @@ -191,7 +191,7 @@ store F into ':OUTPATH:';#, # Section 1.5: multiple streaming operators - one on map and one on reduce side # same alias name 'num' => 12, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD1 `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); 
define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); @@ -206,7 +206,7 @@ store B into ':OUTPATH:';#, { # Section 1.5: multiple streaming operators - adjacent - map side 'num' => 13, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -232,7 +232,7 @@ store D into ':OUTPATH:';#, # Section 2.1: perl script and its dependency shipped # Also covers part of section 3.1: custom serializer 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -243,7 +243,7 @@ store B into ':OUTPATH:';#, { # Section 2.1: perl script and supported data file is shipped 'num' => 2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -257,7 +257,7 @@ store E into ':OUTPATH:';#, { # Section 2.2: script is shipped while the supporting file is cached 'num' => 3, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q@ define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') cache(':INPATH:/nameMap/part-00000#nameMap'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -283,7 +283,7 @@ store E into ':OUTPATH:';@, { # Section 3.1: use of custom deserializer 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreaming.pl` output(stdout) ship(':SCRIPTHOMEPATH:/PigStreaming.pl'); 
A = load ':INPATH:/singlefile/studenttab10k'; @@ -294,7 +294,7 @@ store B into ':OUTPATH:';#, { # Section 3.1: use of custom serializer and deserializer 'num' => 2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# register :FUNCPATH:/testudf.jar; define CMD `perl PigStreaming.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer) ship(':SCRIPTHOMEPATH:/PigStreaming.pl'); @@ -307,7 +307,7 @@ store C into ':OUTPATH:';#, { # Section 3.3: streaming application reads from file rather than stdin 'num' => 3, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreaming.pl foo -` input('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -318,7 +318,7 @@ store B into ':OUTPATH:';#, { # Section 3.4: streaming application writes single output to a file 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreaming.pl - foo nameMap` output('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -330,7 +330,7 @@ store C into ':OUTPATH:';#, { # Section 3.4: streaming application writes multiple outputs to file 'num' => 5, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin) output('sio_5_1', 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -341,7 +341,7 @@ store B into ':OUTPATH:';#, { # Section 3.4: streaming application writes multiple outputs: 1 to file and 1 to stdout 'num' => 6, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreamingDepend.pl - - sio_5_2` input(stdin) output(stdout, 'sio_5_2') 
ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -362,7 +362,7 @@ store B into ':OUTPATH:';#, { # Section 4.3: integration with parameter substitition 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig_params' => ['-p', qq(script_name='PigStreaming.pl')], 'pig' => q# define CMD `perl $script_name - - nameMap` ship(':SCRIPTHOMEPATH:/$script_name', ':SCRIPTHOMEPATH:/nameMap'); @@ -387,7 +387,7 @@ store E into ':OUTPATH:';#, { # Section 5.1: load/store optimization 'num' => 1, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -422,7 +422,7 @@ store D into ':OUTPATH:';#, { # PIG-272: problem with optimization and intermediate store 'num' => 3, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`; define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl'); @@ -444,7 +444,7 @@ store D into ':OUTPATH:';#, { # PIG-272: problem with optimization and intermediate store 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD1 `perl -ne 'print $_;'`; define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl'); @@ -472,7 +472,7 @@ store E into ':OUTPATH:';#, # Make sure join with stream optimization works # optimization only on load side 'num' => 5, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; B = stream A through `cat` as (name:chararray, age:int, gpa:double); @@ -485,7 +485,7 @@ store D into ':OUTPATH:';#, # Make sure join with stream optimization works # optimization only on store side 'num' => 6, - 'execonly' => 
'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; B = filter A by $1 > 25; @@ -500,7 +500,7 @@ store D into ':OUTPATH:';#, # Make sure join with stream optimization works # optimization on load and store 'num' => 7, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; B = stream A through `cat` as (name:chararray, age:int, gpa:double); @@ -536,7 +536,7 @@ store B into ':OUTPATH:';#, # case where binary finishes normally # BEFORE all input has been passed to it 'num' => 2, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -565,7 +565,7 @@ store D into ':OUTPATH:';#, # BEFORE all input has been passed to it # FIXME: in local mode 'num' => 4, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); B = stream A through `head -1` as (name, age, gpa); @@ -581,7 +581,7 @@ store E into ':OUTPATH:';#, # BEFORE all input has been passed to it # and emits no output 'num' => 5, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl'); A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); @@ -598,7 +598,7 @@ store D into ':OUTPATH:';#, # BEFORE all input has been passed to it # and emits no output 'num' => 6, - 'execonly' => 'mapred,tez', + 'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl'); A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); @@ -613,7 +613,7 @@ store E into ':OUTPATH:';#, # two stream operators one after another where first # one emits no output 'num' => 7, - 'execonly' => 'mapred,tez', + 
'execonly' => 'mapred,tez,spark', 'pig' => q# define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl'); A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); Modified: pig/trunk/test/e2e/pig/tests/turing_jython.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/turing_jython.conf?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/turing_jython.conf (original) +++ pig/trunk/test/e2e/pig/tests/turing_jython.conf Mon May 29 15:00:39 2017 @@ -452,9 +452,9 @@ result = P.bind({'in1':input1, 'in2':inp ,'rc'=> 0 }, { - # illustrate() on a complex query + # illustrate() on a complex query 'num' => 2 - ,'execonly' => 'mapred,local' #TODO: PIG-3993: Illustrate is yet to be implemented in Tez + ,'execonly' => 'mapred,local' #TODO: PIG-3993,PIG-5204: Illustrate is yet to be implemented in Tez and in Spark ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig Modified: pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl (original) +++ pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl Mon May 29 15:00:39 2017 @@ -26,6 +26,7 @@ use strict; our @floats; our $delim; +our $decimals; sub parseLine($) { @@ -41,7 +42,7 @@ sub postprocess($) for (my $i = 0; $i < @fields; $i++) { if ($i != 0) { print($delim); } if ($floats[$i]) { - printf("%.3f", $fields[$i]); + printf("%." . $decimals . 
"f", $fields[$i]); } else { print($fields[$i]); } @@ -72,6 +73,10 @@ sub is_float { if (!defined($delim)) { die "Usage: $0 delimiter\n"; } + $decimals = shift; + if (!defined($decimals)) { + $decimals = 3; + } my @sampled; my $line; Modified: pig/trunk/test/excluded-tests-mr URL: http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-mr?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/excluded-tests-mr (original) +++ pig/trunk/test/excluded-tests-mr Mon May 29 15:00:39 2017 @@ -1 +1,2 @@ -**/tez/*.java \ No newline at end of file +**/tez/*.java ++**/spark/*.java \ No newline at end of file Added: pig/trunk/test/excluded-tests-spark URL: http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-spark?rev=1796639&view=auto ============================================================================== --- pig/trunk/test/excluded-tests-spark (added) +++ pig/trunk/test/excluded-tests-spark Mon May 29 15:00:39 2017 @@ -0,0 +1,4 @@ +**/Test*MR.java +**/tez/*.java +**/TestNativeMapReduce.java +**/TestCounters.java Modified: pig/trunk/test/excluded-tests-tez URL: http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-tez?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/excluded-tests-tez (original) +++ pig/trunk/test/excluded-tests-tez Mon May 29 15:00:39 2017 @@ -1 +1,2 @@ -**/Test*MR.java \ No newline at end of file +**/Test*MR.java ++**/spark/*.java \ No newline at end of file Modified: pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- 
pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java (original) +++ pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java Mon May 29 15:00:39 2017 @@ -62,6 +62,9 @@ public class TestLocationInPhysicalPlan JobStats jStats = (JobStats)job.getStatistics().getJobGraph().getSinks().get(0); if (Util.getLocalTestMode().toString().equals("TEZ_LOCAL")) { Assert.assertEquals("A[1,4],A[3,4],B[2,4]", jStats.getAliasLocation()); + } else if (Util.getLocalTestMode().toString().equals("SPARK_LOCAL")) { + //TODO PIG-5239:Investigate why there are duplicated A[3,4] + Assert.assertEquals("A[1,4],A[3,4],B[2,4],A[3,4]", jStats.getAliasLocation()); } else { Assert.assertEquals("M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: A[3,4]", jStats.getAliasLocation()); } Modified: pig/trunk/test/org/apache/pig/pigunit/PigTest.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/pigunit/PigTest.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/pigunit/PigTest.java (original) +++ pig/trunk/test/org/apache/pig/pigunit/PigTest.java Mon May 29 15:00:39 2017 @@ -142,6 +142,12 @@ public class PigTest { } else if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("tez_local")) { LOG.info("Using tez local mode"); execType = ExecTypeProvider.fromString("tez_local"); + } else if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("spark")) { + LOG.info("Using spark cluster mode"); + execType = ExecTypeProvider.fromString("spark"); + } else if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("spark_local")) { + LOG.info("Using spark local cluster mode"); + execType = ExecTypeProvider.fromString("spark_local"); } else { LOG.info("Using default local mode"); }