Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Converter that takes a POStore and stores its content.
+ */ +@SuppressWarnings({ "serial" }) +public class StoreConverter implements + RDDConverter<Tuple, Tuple2<Text, Tuple>, POStore> { + + private static final Log LOG = LogFactory.getLog(StoreConverter.class); + + private JobConf jobConf = null; + public StoreConverter(JobConf jobConf) { + this.jobConf = jobConf; + } + + @Override + public RDD<Tuple2<Text, Tuple>> convert(List<RDD<Tuple>> predecessors, + POStore op) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, op, 1); + RDD<Tuple> rdd = predecessors.get(0); + + SparkPigStatusReporter.getInstance().createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP, + SparkStatsUtil.getCounterName(op)); + + // convert back to KV pairs + JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map( + buildFromTupleFunction(op)); + + PairRDDFunctions<Text, Tuple> pairRDDFunctions = new PairRDDFunctions<Text, Tuple>( + rddPairs.rdd(), SparkUtil.getManifest(Text.class), + SparkUtil.getManifest(Tuple.class), null); + + POStore poStore = configureStorer(jobConf, op); + + if ("true".equalsIgnoreCase(jobConf + .get(PigConfiguration.PIG_OUTPUT_LAZY))) { + Job storeJob = new Job(jobConf); + LazyOutputFormat.setOutputFormatClass(storeJob, + PigOutputFormat.class); + jobConf = (JobConf) storeJob.getConfiguration(); + jobConf.setOutputKeyClass(Text.class); + jobConf.setOutputValueClass(Tuple.class); + String fileName = poStore.getSFile().getFileName(); + Path filePath = new Path(fileName); + FileOutputFormat.setOutputPath(jobConf,filePath); + pairRDDFunctions.saveAsNewAPIHadoopDataset(jobConf); + } else { + pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile() + .getFileName(), Text.class, Tuple.class, + PigOutputFormat.class, jobConf); + } + + RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd(); + if (LOG.isDebugEnabled()) + LOG.debug("RDD lineage: " + retRdd.toDebugString()); + return retRdd; + } + + + private static POStore configureStorer(JobConf jobConf, + PhysicalOperator op) throws IOException { + ArrayList<POStore> storeLocations = Lists.newArrayList(); + POStore poStore = (POStore) op; + storeLocations.add(poStore); + StoreFuncInterface sFunc = poStore.getStoreFunc(); + sFunc.setStoreLocation(poStore.getSFile().getFileName(), + new org.apache.hadoop.mapreduce.Job(jobConf)); + poStore.setInputs(null); + poStore.setParentPlan(null); + + jobConf.set(JobControlCompiler.PIG_MAP_STORES, + ObjectSerializer.serialize(Lists.newArrayList())); + jobConf.set(JobControlCompiler.PIG_REDUCE_STORES, + ObjectSerializer.serialize(storeLocations)); + return poStore; + } + + private static class FromTupleFunction implements + Function<Tuple, Tuple2<Text, Tuple>> { + + private static Text EMPTY_TEXT = new Text(); + private String counterGroupName; + private String counterName; + private SparkCounters sparkCounters; + private boolean disableCounter; + + + public Tuple2<Text, Tuple> call(Tuple v1) { + if (sparkCounters != null && disableCounter == false) { + sparkCounters.increment(counterGroupName, counterName, 1L); + } + return new Tuple2<Text, Tuple>(EMPTY_TEXT, v1); + } + + public void setCounterGroupName(String counterGroupName) { + this.counterGroupName = counterGroupName; + } + + public void setCounterName(String counterName) { + this.counterName = counterName; + } + + public void setSparkCounters(SparkCounters sparkCounter) { + this.sparkCounters = sparkCounter; + } + + public void setDisableCounter(boolean disableCounter) { + this.disableCounter = disableCounter; + } + } + + private FromTupleFunction buildFromTupleFunction(POStore op) { + 
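+        // Counters are wired into the map function only for real stores with
+        // counters enabled; temporary stores and stores with counters disabled
+        // skip the per-record increment (see the isTmpStore/disableCounter check below).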
FromTupleFunction ftf = new FromTupleFunction(); + boolean disableCounter = op.disableCounter(); + if (!op.isTmpStore() && !disableCounter) { + ftf.setDisableCounter(disableCounter); + ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP); + ftf.setCounterName(SparkStatsUtil.getCounterName(op)); + SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance(); + ftf.setSparkCounters(counterReporter.getCounters()); + } + return ftf; + } +}
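Stripped of Pig's plumbing, the store path above amounts to pairing each record with a shared empty key and writing the pairs through a Hadoop OutputFormat with the new-API save call. A minimal, self-contained sketch of that pattern (illustrative only; it substitutes plain Text values, Hadoop's TextOutputFormat and a made-up output path for Pig's tuples, PigOutputFormat and the POStore wiring):

    import java.util.Arrays;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class StoreSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "store-sketch");
            JavaRDD<String> records = sc.parallelize(Arrays.asList("a", "b", "c"));
            // Pair every record with an empty key, mirroring FromTupleFunction above.
            JavaPairRDD<Text, Text> pairs =
                    records.mapToPair(r -> new Tuple2<>(new Text(), new Text(r)));
            // Write through the new Hadoop API, as the converter does above;
            // the target directory must not already exist.
            pairs.saveAsNewAPIHadoopFile("/tmp/store-sketch-out", Text.class,
                    Text.class, TextOutputFormat.class);
            sc.stop();
        }
    }

The real converter differs mainly in bookkeeping: Spark counters, the optional lazy output format, and the POStore/StoreFuncInterface configuration serialized into the JobConf.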
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +public class StreamConverter implements + RDDConverter<Tuple, Tuple, POStream> { + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POStream poStream) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, poStream, 1); + RDD<Tuple> rdd = predecessors.get(0); + StreamFunction streamFunction = new StreamFunction(poStream); + return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd(); + } + + private static class StreamFunction implements + FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { + private POStream poStream; + + private StreamFunction(POStream poStream) { + this.poStream = poStream; + } + + public Iterable<Tuple> call(final Iterator<Tuple> input) { + return new Iterable<Tuple>() { + @Override + public Iterator<Tuple> iterator() { + return new OutputConsumerIterator(input) { + + @Override + protected void attach(Tuple tuple) { + poStream.setInputs(null); + poStream.attachInput(tuple); + } + + @Override + protected Result getNextResult() throws ExecException { + Result result = poStream.getNextTuple(); + return result; + } + + @Override + protected void endOfInput() { + poStream.setFetchable(true); + } + }; + } + }; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java?rev=1796639&view=auto 
============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.util.List; + +import scala.collection.JavaConversions; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.SparkContext; +import org.apache.spark.rdd.RDD; +import org.apache.spark.rdd.UnionRDD; + +public class UnionConverter implements RDDConverter<Tuple, Tuple, POUnion> { + + private final SparkContext sc; + + public UnionConverter(SparkContext sc) { + this.sc = sc; + } + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POUnion physicalOperator) throws IOException { + SparkUtil.assertPredecessorSizeGreaterThan(predecessors, + physicalOperator, 0); + UnionRDD<Tuple> unionRDD = new UnionRDD<Tuple>(sc, + JavaConversions.asScalaBuffer(predecessors), + SparkUtil.getManifest(Tuple.class)); + return unionRDD; + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java Mon May 29 15:00:39 2017 @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.hadoop.util.RunJar;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.RunJarSecurityManager;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+
+/**
+ * NativeSparkOperator: runs a native Spark job packaged in a jar through RunJar.
+ */
+public class NativeSparkOperator extends SparkOperator {
+    private static final long serialVersionUID = 1L;
+    private static int countJobs = 0;
+    private String nativeSparkJar;
+    private String[] params;
+    private String jobId;
+
+    public NativeSparkOperator(OperatorKey k, String sparkJar, String[] parameters) {
+        super(k);
+        nativeSparkJar = sparkJar;
+        params = parameters;
+        jobId = sparkJar + "_" + getJobNumber();
+    }
+
+    private static int getJobNumber() {
+        countJobs++;
+        return countJobs;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void runJob() throws JobCreationException {
+        RunJarSecurityManager secMan = new RunJarSecurityManager();
+        try {
+            RunJar.main(getNativeMRParams());
+            SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+        } catch (SecurityException se) { //java.lang.reflect.InvocationTargetException
+            if (secMan.getExitInvoked()) {
+                if (secMan.getExitCode() != 0) {
+                    JobCreationException e = new JobCreationException("Native job returned with non-zero return code");
+                    SparkStatsUtil.addFailedNativeJobStats(PigStats.get(), this, e);
+                } else {
+                    SparkStatsUtil.addNativeJobStats(PigStats.get(), this);
+                }
+            }
+        } catch (Throwable t) {
+            JobCreationException e = new JobCreationException(
+                    "Cannot run native spark job " + t.getMessage(), t);
+            SparkStatsUtil.addFailedNativeJobStats(PigStats.get(), this, e);
+            throw e;
+        } finally {
+            secMan.retire();
+        }
+    }
+
+    private String[] getNativeMRParams() {
+        String[] paramArr = new String[params.length + 1];
+        paramArr[0] = nativeSparkJar;
+        for (int i = 0; i < params.length; i++) {
+            paramArr[i + 1] = params[i];
+        }
+        return paramArr;
+    }
+
+    public String getCommandString() {
+        StringBuilder sb = new StringBuilder("hadoop jar ");
+        sb.append(nativeSparkJar);
+        for (String pr : params) {
+            sb.append(" ");
+            sb.append(pr);
+        }
+        return sb.toString();
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.operator; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; + +/** + * POGlobalRearrange for spark mode + */ +public class POGlobalRearrangeSpark extends POGlobalRearrange { + // Use secondary key + private boolean useSecondaryKey; + // Sort order for secondary keys; + private boolean[] secondarySortOrder; + + public POGlobalRearrangeSpark(POGlobalRearrange copy) + throws ExecException { + super(copy); + } + + public boolean isUseSecondaryKey() { + return useSecondaryKey; + } + + public void setUseSecondaryKey(boolean useSecondaryKey) { + this.useSecondaryKey = useSecondaryKey; + } + + public boolean[] getSecondarySortOrder() { + return secondarySortOrder; + } + + public void setSecondarySortOrder(boolean[] secondarySortOrder) { + this.secondarySortOrder = secondarySortOrder; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java Mon May 29 15:00:39 2017 @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Collapse POLocalRearrange, POGlobalRearrange and POPackage into POJoinGroupSpark
+ * to reduce unnecessary map operations in join/group.
+ */
+public class POJoinGroupSpark extends PhysicalOperator {
+    private List<POLocalRearrange> lraOps;
+    private POGlobalRearrangeSpark glaOp;
+    private POPackage pkgOp;
+    private List<PhysicalOperator> predecessors;
+
+    public POJoinGroupSpark(List<POLocalRearrange> lraOps, POGlobalRearrangeSpark glaOp, POPackage pkgOp){
+        super(glaOp.getOperatorKey());
+        this.lraOps = lraOps;
+        this.glaOp = glaOp;
+        this.pkgOp = pkgOp;
+    }
+
+    public List<POLocalRearrange> getLROps() {
+        return lraOps;
+    }
+
+    public POGlobalRearrangeSpark getGROp() {
+        return glaOp;
+    }
+
+    public POPackage getPkgOp() {
+        return pkgOp;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POJoinGroupSpark" + "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+
+    public void setPredecessors(List<PhysicalOperator> predecessors) {
+        this.predecessors = predecessors;
+    }
+
+    public List<PhysicalOperator> getPredecessors() {
+        return predecessors;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POPoissonSampleSpark extends POPoissonSample {
+    private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class);
+    // Only for Spark
+    private transient boolean endOfInput = false;
+
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+
+    public void setEndOfInput(boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
+    public POPoissonSampleSpark(OperatorKey k, int rp, int sr, float hp, long tm) {
+        super(k, rp, sr, hp, tm);
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        if (!initialized) {
+            numRowsSampled = 0;
+            avgTupleMemSz = 0;
+            rowNum = 0;
+            skipInterval = -1;
+            if (totalMemory == null) {
+                // Initialize in backend to get memory of task
+                totalMemory = Runtime.getRuntime().maxMemory();
+            }
+            long availRedMem = (long) (totalMemory * heapPerc);
+            memToSkipPerSample = availRedMem/sampleRate;
+            initialized = true;
+        }
+        if (numRowSplTupleReturned) {
+            // the special row-number tuple has been returned after all inputs
+            // were read; nothing more to read
+            return RESULT_EOP;
+        }
+
+        Result res;
+        res = processInput();
+
+        // if we have reached the end of the input, pick a record and return it
+        if (this.isEndOfInput()) {
+            // if we have skipped enough records and the current record is OK, use it.
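+            // Otherwise fall back to the last retained sample. In both cases
+            // createNumRowTuple attaches the row-count marker so the consumer
+            // of the sample can recover the total number of input rows.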
+            if (numSkipped == skipInterval
+                    && res.returnStatus == POStatus.STATUS_OK) {
+                return createNumRowTuple((Tuple) res.result);
+            } else if (newSample != null) {
+                return createNumRowTuple((Tuple) newSample.result);
+            }
+        }
+
+        // just return to read the next record from the input
+        if (res.returnStatus == POStatus.STATUS_NULL) {
+            return new Result(POStatus.STATUS_NULL, null);
+        } else if (res.returnStatus == POStatus.STATUS_EOP
+                || res.returnStatus == POStatus.STATUS_ERR) {
+            return res;
+        }
+
+        // got an 'OK' record
+        rowNum++;
+
+        if (numSkipped < skipInterval) {
+            numSkipped++;
+
+            // skip this tuple, and continue to read from input
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        // pick this record as sampled
+        newSample = res;
+        numSkipped = 0;
+        Result pickedSample = newSample;
+        updateSkipInterval((Tuple) pickedSample.result);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("pickedSample:");
+            if (pickedSample.result != null) {
+                for (int i = 0; i < ((Tuple) pickedSample.result).size(); i++) {
+                    LOG.debug("the " + i + " ele:" + ((Tuple) pickedSample.result).get(i));
+                }
+            }
+        }
+        return pickedSample;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "PoissonSampleSpark - " + mKey.toString();
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * ReduceBy operator that maps to Spark's reduceByKey.
+ * Extends ForEach and adds packager, secondary sort and partitioner support.
+ */ +public class POReduceBySpark extends POForEach { + private String customPartitioner; + protected POLocalRearrange lr; + protected POPackage pkg; + + public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened, POPackage + pkg, POLocalRearrange lr){ + super(k, rp, inp, isToBeFlattened); + this.pkg = pkg; + this.lr = lr; + this.addOriginalLocation(lr.getAlias(), lr.getOriginalLocations()); + } + + public POPackage getPKGOp() { + return pkg; + } + + @Override + public String name() { + return getAliasString() + "Reduce By" + "(" + getFlatStr() + ")" + "[" + + DataType.findTypeName(resultType) + "]" + " - " + + mKey.toString(); + } + + protected String getFlatStr() { + if(isToBeFlattenedArray ==null) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (Boolean b : isToBeFlattenedArray) { + sb.append(b); + sb.append(','); + } + if(sb.length()>0){ + sb.deleteCharAt(sb.length()-1); + } + return sb.toString(); + } + + // Use secondary key + private boolean useSecondaryKey; + // Sort order for secondary keys; + private boolean[] secondarySortOrder; + + public boolean isUseSecondaryKey() { + return useSecondaryKey; + } + + public void setUseSecondaryKey(boolean useSecondaryKey) { + this.useSecondaryKey = useSecondaryKey; + } + + public boolean[] getSecondarySortOrder() { + return secondarySortOrder; + } + + public void setSecondarySortOrder(boolean[] secondarySortOrder) { + this.secondarySortOrder = secondarySortOrder; + } + + public String getCustomPartitioner() { + return customPartitioner; + } + + public void setCustomPartitioner(String customPartitioner) { + this.customPartitioner = customPartitioner; + } + + public POLocalRearrange getLROp() { + return lr; + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java Mon May 29 15:00:39 2017 @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.operator; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.plan.VisitorException; + +public class POSampleSortSpark extends POSort { + + public POSampleSortSpark(POSort sort){ + super(sort.getOperatorKey(), sort.getRequestedParallelism(), null, sort.getSortPlans(), sort.getMAscCols(), sort + .getMSortFunc()); + } + + @Override + public void visit(PhyPlanVisitor v) throws VisitorException { + v.visit(this); + } + + @Override + public boolean supportsMultipleInputs() { + return false; + } + + @Override + public boolean supportsMultipleOutputs() { + return false; + } + + @Override + public String name() { + return getAliasString() + "POSparkSort" + "[" + + DataType.findTypeName(resultType) + "]" + "(" + + (super.getMSortFunc() != null ? super.getMSortFunc().getFuncSpec() : "") + ")" + + " - " + mKey.toString(); + } + + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + return null; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java Mon May 29 15:00:39 2017 @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.optimizer; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.VisitorException; + +import java.util.List; + +/** + * A visitor to optimize plans that determines if a vertex plan can run in + * accumulative mode. + */ +public class AccumulatorOptimizer extends SparkOpPlanVisitor { + + public AccumulatorOptimizer(SparkOperPlan plan) { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + } + + @Override + public void visitSparkOp(SparkOperator sparkOperator) throws + VisitorException { + PhysicalPlan plan = sparkOperator.physicalPlan; + List<PhysicalOperator> pos = plan.getRoots(); + if (pos == null || pos.size() == 0) { + return; + } + + List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(plan, + POGlobalRearrange.class); + + for (POGlobalRearrange glr : glrs) { + List<PhysicalOperator> successors = plan.getSuccessors(glr); + AccumulatorOptimizerUtil.addAccumulator(plan, successors); + } + } +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Mon May 29 15:00:39 2017 @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;

+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+import com.google.common.collect.Maps;
+
+/**
+ * This class goes through the physical plan and replaces GlobalRearrange with ReduceBy
+ * where there are algebraic operations.
+ */
+public class CombinerOptimizer extends SparkOpPlanVisitor {
+
+    private static Log LOG = LogFactory.getLog(CombinerOptimizer.class);
+
+    public CombinerOptimizer(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        try {
+            addCombiner(sparkOp.physicalPlan);
+        } catch (Exception e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    // Checks for algebraic operations and, if they exist, replaces the global
+    // rearrange (cogroup) with reduceBy, as follows:
+    // Input:
+    //   foreach (using algebraicOp)
+    //     -> packager
+    //       -> globalRearrange
+    //         -> localRearrange
+    // Output:
+    //   foreach (using algebraicOp.Final)
+    //     -> reduceBy (uses algebraicOp.Intermediate)
+    //       -> foreach (using algebraicOp.Initial)
+    //         -> CombinerRearrange
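+    // For example, for a script using an algebraic UDF such as COUNT:
+    //   B = GROUP A BY name;
+    //   C = FOREACH B GENERATE group, COUNT(A);
+    // COUNT.Initial runs in the map-side foreach, COUNT.Intermediate inside
+    // reduceBy, and COUNT.Final in the post-reduce foreach.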
+    private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
+
+        List<PhysicalOperator> leaves = phyPlan.getLeaves();
+        if (leaves == null || leaves.size() != 1) {
+            return;
+        }
+
+        // Ensure there is grouping.
+        List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(phyPlan, POGlobalRearrange.class);
+        if (glrs == null || glrs.size() == 0) {
+            return;
+        }
+        for (POGlobalRearrange glr : glrs) {
+            List<PhysicalOperator> glrSuccessors = phyPlan.getSuccessors(glr);
+            if (glrSuccessors == null || glrSuccessors.isEmpty()) {
+                continue;
+            }
+
+            if (!(glrSuccessors.get(0) instanceof POPackage)) {
+                continue;
+            }
+            POPackage poPackage = (POPackage) glrSuccessors.get(0);
+
+            List<PhysicalOperator> poPackageSuccessors = phyPlan.getSuccessors(poPackage);
+            if (poPackageSuccessors == null || poPackageSuccessors.size() != 1) {
+                continue;
+            }
+            PhysicalOperator successor = poPackageSuccessors.get(0);
+
+            // Retaining the original successor to be used later in modifying the plan.
+            PhysicalOperator packageSuccessor = successor;
+
+            if (successor instanceof POLimit) {
+                // POLimit is acceptable, as long as it has a single foreach as
+                // successor
+                List<PhysicalOperator> limitSucs = phyPlan.getSuccessors(successor);
+                if (limitSucs != null && limitSucs.size() == 1 &&
+                        limitSucs.get(0) instanceof POForEach) {
+                    // the code below will now further examine the foreach
+                    successor = limitSucs.get(0);
+                }
+            }
+            if (successor instanceof POForEach) {
+                POForEach foreach = (POForEach) successor;
+                List<PhysicalOperator> foreachSuccessors = phyPlan.getSuccessors(foreach);
+                // multi-query
+                if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
+                    continue;
+                }
+                // Clone foreach so it can be modified to a post-reduce foreach.
+                POForEach postReduceFE = foreach.clone();
+                List<PhysicalPlan> feInners = postReduceFE.getInputPlans();
+
+                // find algebraic operators and also check if the foreach statement
+                // is suitable for combiner use
+                List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = CombinerOptimizerUtil.findAlgebraicOps(feInners);
+                if (algebraicOps == null || algebraicOps.size() == 0) {
+                    // the plan is not combinable or there is nothing to combine;
+                    // we're done
+                    continue;
+                }
+                try {
+                    List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
+                    // Exclude co-group from optimization
+                    if (glrPredecessors == null || glrPredecessors.size() != 1) {
+                        continue;
+                    }
+
+                    if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
+                        continue;
+                    }
+
+                    POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
+
+                    LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
+
+                    // Trim the global rearrange and the package that follows it.
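+                    // (convertToMapSideForEach below removes both operators via
+                    // removeAndReconnect, leaving the local rearrange wired
+                    // directly to the package's old successor.)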
+                    convertToMapSideForEach(phyPlan, poPackage);
+
+                    // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
+                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+                        if (!(op2plan.first instanceof PODistinct)) {
+                            continue;
+                        }
+                        CombinerOptimizerUtil.DistinctPatcher distinctPatcher
+                                = new CombinerOptimizerUtil.DistinctPatcher(op2plan.second);
+                        distinctPatcher.visit();
+                        if (distinctPatcher.getDistinct() == null) {
+                            int errCode = 2073;
+                            String msg = "Problem with replacing distinct operator with distinct built-in function.";
+                            throw new PlanException(msg, errCode, PigException.BUG);
+                        }
+                        op2plan.first = distinctPatcher.getDistinct();
+                    }
+
+                    // create the new map-side foreach
+                    POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
+                            .getKeyType());
+                    Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
+                    Integer pos = 1;
+                    // create a plan for each algebraic udf and add it as an inner plan in the map foreach
+                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+                        PhysicalPlan udfPlan = CombinerOptimizerUtil.createPlanWithPredecessors(op2plan.first,
+                                op2plan.second);
+                        mfe.addInputPlan(udfPlan, false);
+                        op2newpos.put(op2plan.first, pos++);
+                    }
+                    CombinerOptimizerUtil.changeFunc(mfe, POUserFunc.INITIAL);
+
+                    // since we will only be creating SingleTupleBag as input to
+                    // the map foreach, we should flag the POProjects in the map
+                    // foreach inner plans to also use SingleTupleBag
+                    for (PhysicalPlan mpl : mfe.getInputPlans()) {
+                        try {
+                            new CombinerOptimizerUtil.fixMapProjects(mpl).visit();
+                        } catch (VisitorException e) {
+                            int errCode = 2089;
+                            String msg = "Unable to flag project operator to use single tuple bag.";
+                            throw new PlanException(msg, errCode, PigException.BUG, e);
+                        }
+                    }
+
+                    // create the new combine foreach
+                    POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
+                            .getKeyType());
+                    // add algebraic functions with the appropriate projection
+                    CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
+
+                    // we have modified the foreach inner plans - so set them again
+                    // for the foreach so that foreach can do any re-initialization
+                    // around them.
+                    mfe.setInputPlans(mfe.getInputPlans());
+                    cfe.setInputPlans(cfe.getInputPlans());
+
+                    // tell the CombinerPackager which fields need to be projected and
+                    // which placed in bags. The first field is a simple project; the
+                    // rest need to go into bags.
+                    int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
+                    boolean[] bags = new boolean[numFields];
+                    bags[0] = false;
+                    for (int i = 1; i < numFields; i++) {
+                        bags[i] = true;
+                    }
+
+                    // Use the combiner packager in the combine plan,
+                    // as it needs to act differently than the regular
+                    // package operator.
+                    CombinerPackager pkgr = new CombinerPackager(poPackage.getPkgr(), bags);
+                    POPackage combinePack = poPackage.clone();
+                    combinePack.setPkgr(pkgr);
+
+                    // A specialized local rearrange operator will replace
+                    // the normal local rearrange in the map plan.
+                    POLocalRearrange newRearrange = CombinerOptimizerUtil.getNewRearrange(rearrange);
+                    POPreCombinerLocalRearrange combinerLocalRearrange = CombinerOptimizerUtil.getPreCombinerLR(rearrange);
+                    phyPlan.replace(rearrange, combinerLocalRearrange);
+
+                    // Create a reduceBy operator.
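+                    // It bundles the combine foreach's inner plans, the combiner
+                    // packager and the specialized rearrange into one operator, so
+                    // the Intermediate step of each algebraic UDF can run as a
+                    // reduceByKey-style combine on both sides of the shuffle.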
+                    POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), combinerLocalRearrange
+                            .getRequestedParallelism(), cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack,
+                            newRearrange);
+                    reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
+                    fixReduceSideFE(postReduceFE, algebraicOps);
+                    CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
+                    updatePackager(reduceOperator, newRearrange);
+
+                    // Add the new operators
+                    phyPlan.add(reduceOperator);
+                    phyPlan.add(mfe);
+                    // Connect the new operators as follows:
+                    // reduceBy (using algebraicOp.Intermediate)
+                    //   -> foreach (using algebraicOp.Initial)
+                    phyPlan.connect(mfe, reduceOperator);
+
+                    // Insert the reduce stage between the combiner rearrange and its successor.
+                    phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
+                    phyPlan.connect(reduceOperator, packageSuccessor);
+                    phyPlan.connect(combinerLocalRearrange, mfe);
+
+                    // Replace the foreach with the post-reduce foreach
+                    phyPlan.add(postReduceFE);
+                    phyPlan.replace(foreach, postReduceFE);
+                } catch (Exception e) {
+                    int errCode = 2018;
+                    String msg = "Internal error. Unable to introduce the combiner for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
+                }
+            }
+        }
+    }
+
+    // Modifies the input plans of the post-reduce foreach to match the output of the reduce stage.
+    private void fixReduceSideFE(POForEach postReduceFE, List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps)
+            throws ExecException, PlanException {
+        int i = 1;
+        for (Pair<PhysicalOperator, PhysicalPlan> algebraicOp : algebraicOps) {
+            POUserFunc combineUdf = (POUserFunc) algebraicOp.first;
+            PhysicalPlan pplan = algebraicOp.second;
+            combineUdf.setAlgebraicFunction(POUserFunc.FINAL);
+
+            POProject newProj = new POProject(
+                    CombinerOptimizerUtil.createOperatorKey(postReduceFE.getOperatorKey().getScope()),
+                    1, i
+            );
+            newProj.setResultType(DataType.BAG);
+
+            PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);
+            pplan.disconnect(udfInput, combineUdf);
+            pplan.add(newProj);
+            pplan.connect(newProj, combineUdf);
+            i++;
+        }
+        postReduceFE.setResultType(DataType.TUPLE);
+    }
+
+    // Modifies the map side of the plan (before the reduce).
+    private void convertToMapSideForEach(PhysicalPlan physicalPlan, POPackage poPackage)
+            throws PlanException {
+        LinkedList<PhysicalOperator> operatorsToRemove = new LinkedList<>();
+        for (PhysicalOperator physicalOperator : physicalPlan.getPredecessors(poPackage)) {
+            if (physicalOperator instanceof POGlobalRearrangeSpark) {
+                operatorsToRemove.add(physicalOperator);
+                break;
+            }
+        }
+        // Remove the global rearrange preceding POPackage
+        for (PhysicalOperator po : operatorsToRemove) {
+            physicalPlan.removeAndReconnect(po);
+        }
+        // Remove POPackage itself.
+        physicalPlan.removeAndReconnect(poPackage);
+    }
+
+    // Update the ReduceBy operator with the packaging used by the local rearrange.
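+    // The packager needs the rearrange's key metadata (project-star flag,
+    // projected-columns map, tuple/compound key flags) to rebuild values
+    // correctly after the shuffle.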
+    private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
+        Packager pkgr = reduceOperator.getPKGOp().getPkgr();
+        // annotate the packager with information from the local rearrange;
+        // update the keyInfo information if already present in the POPackage
+        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
+        if (keyInfo == null)
+            keyInfo = new HashMap<>();
+
+        if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+            // something is wrong - we should not be getting key info
+            // for the same index from two different local rearranges
+            int errCode = 2087;
+            String msg = "Unexpected problem during optimization." +
+                    " Found index:" + lrearrange.getIndex() +
+                    " in multiple LocalRearrange operators.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+
+        }
+        keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+                new Pair<Boolean, Map<Integer, Integer>>(
+                        lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+        pkgr.setKeyInfo(keyInfo);
+        pkgr.setKeyTuple(lrearrange.isKeyTuple());
+        pkgr.setKeyCompound(lrearrange.isKeyCompound());
+    }
+
+    /**
+     * Look for an algebraic POUserFunc that is the leaf of an input plan.
+     *
+     * @param pplan physical plan
+     * @return null if any operator other than a POProject, or a non-algebraic
+     * POUserFunc, is found while going down the plan; otherwise the algebraic
+     * POUserFunc is returned
+     */
+    private static POUserFunc getAlgebraicSuccessor(PhysicalPlan pplan) {
+        // check if it ends in a UDF
+        List<PhysicalOperator> leaves = pplan.getLeaves();
+        if (leaves == null || leaves.size() != 1) {
+            return null;
+        }
+
+        PhysicalOperator succ = leaves.get(0);
+        if (succ instanceof POUserFunc && ((POUserFunc) succ).combinable()) {
+            return (POUserFunc) succ;
+        }
+
+        // some other operator? can't combine
+        return null;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * Collapse LocalRearrange, GlobalRearrange and Package into POJoinGroupSpark to reduce
+ * unnecessary map operations and optimize join/group. See PIG-4797 for details.
+ */
+public class JoinGroupOptimizerSpark extends SparkOpPlanVisitor {
+    private static final Log LOG = LogFactory.getLog(JoinGroupOptimizerSpark.class);
+
+    public JoinGroupOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        if (sparkOp.physicalPlan != null) {
+            GlobalRearrangeDiscover glrDiscover = new GlobalRearrangeDiscover(sparkOp.physicalPlan);
+            glrDiscover.visit();
+            List<PhysicalPlan> plans = glrDiscover.getPlansWithJoinAndGroup();
+            handlePlans(plans, sparkOp);
+        }
+    }
+
+    private void handlePlans(List<PhysicalPlan> plans, SparkOperator sparkOp) throws VisitorException {
+        for (int i = 0; i < plans.size(); i++) {
+            PhysicalPlan planWithJoinAndGroup = plans.get(i);
+            POGlobalRearrangeSpark glrSpark = PlanHelper.getPhysicalOperators(planWithJoinAndGroup, POGlobalRearrangeSpark.class).get(0);
+            if (verifyJoinOrGroupCase(plans.get(i), glrSpark)) {
+                try {
+                    restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp);
+                } catch (PlanException e) {
+                    throw new VisitorException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
+                }
+            }
+        }
+    }
+
+    static class GlobalRearrangeDiscover extends PhyPlanVisitor {
+        private List<PhysicalPlan> plansWithJoinAndGroup = new ArrayList<PhysicalPlan>();
+        public GlobalRearrangeDiscover(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+        }
+
+        @Override
+        public void visitGlobalRearrange(POGlobalRearrange glr) throws VisitorException {
+            // If there are POSplits we need to traverse POSplit.getPlans(),
+            // so use mCurrentWalker.getPlan()
+            PhysicalPlan currentPlan = this.mCurrentWalker.getPlan();
+        public List<PhysicalPlan> getPlansWithJoinAndGroup() {
+            return plansWithJoinAndGroup;
+        }
+    }
+
+    // collapse LocalRearrange, GlobalRearrange and Package into POJoinGroupSpark
+    private void restructSparkOp(PhysicalPlan plan, POGlobalRearrangeSpark glaOp, SparkOperator sparkOp) throws PlanException {
+
+        List<PhysicalOperator> predes = plan.getPredecessors(glaOp);
+        if (predes != null) {
+            List<POLocalRearrange> lraOps = new ArrayList<POLocalRearrange>();
+            List<PhysicalOperator> allPredsOfLRA = new ArrayList<PhysicalOperator>();
+
+            // Get the predecessors of POJoinGroupSpark in the correct order after
+            // JoinGroupOptimizerSpark has run. For other PhysicalOperators we usually
+            // call OperatorPlan#getPredecessors(op) and then sort the result (see
+            // JobGraphBuilder#getPredecessors): in the common case a PhysicalOperator
+            // with a smaller OperatorKey must be executed before one with a bigger
+            // OperatorKey. That is not suitable for POJoinGroupSpark. An example:
+            // original:
+            // POLoad(scope-1)               POLoad(scope-2)
+            //       |                             |
+            // POForEach(scope-3)                  |
+            //       |                             |
+            // POLocalRearrange(scope-4)   POLocalRearrange(scope-5)
+            //             \                      /
+            //           POGlobalRearrange(scope-6)
+            //                      |
+            //              POPackage(scope-7)
+            // after JoinGroupOptimizerSpark:
+            // POLoad(scope-1)               POLoad(scope-2)
+            //       |                             |
+            // POForEach(scope-3)                  |
+            //             \                      /
+            //            POJoinGroupSpark(scope-8)
+            //
+            // The predecessors of POJoinGroupSpark(scope-8) must be POForEach(scope-3)
+            // and POLoad(scope-2), in that order, because they are the predecessors of
+            // POLocalRearrange(scope-4) and POLocalRearrange(scope-5) respectively,
+            // whereas we would get POLoad(scope-2) and POForEach(scope-3) if we used
+            // OperatorPlan#getPredecessors(op) and sorted the result.
+            Collections.sort(predes);
+            for (PhysicalOperator lra : predes) {
+                lraOps.add((POLocalRearrange) lra);
+                List<PhysicalOperator> predOfLRAList = plan.getPredecessors(lra);
+                if (predOfLRAList != null && predOfLRAList.size() == 1) {
+                    PhysicalOperator predOfLRA = predOfLRAList.get(0);
+                    plan.disconnect(predOfLRA, lra);
+                    allPredsOfLRA.add(predOfLRA);
+                }
+            }
+
+            POPackage pkgOp = (POPackage) plan.getSuccessors(glaOp).get(0);
+            PhysicalOperator pkgSuccessor = plan.getSuccessors(pkgOp).get(0);
+            POJoinGroupSpark joinSpark = new POJoinGroupSpark(lraOps, glaOp, pkgOp);
+            if (allPredsOfLRA.size() > 0) {
+                joinSpark.setPredecessors(allPredsOfLRA);
+            }
+            plan.add(joinSpark);
+
+            for (PhysicalOperator predOfLRA : allPredsOfLRA) {
+                plan.connect(predOfLRA, joinSpark);
+            }
+
+            plan.disconnect(pkgOp, pkgSuccessor);
+            plan.connect(joinSpark, pkgSuccessor);
+            for (POLocalRearrange lra : lraOps) {
+                replaceMultiqueryMapping(sparkOp, lra, joinSpark);
+                plan.remove(lra);
+            }
+            plan.remove(glaOp);
+            plan.remove(pkgOp);
+        }
+    }
+
+    private void replaceMultiqueryMapping(SparkOperator sparkOperator, PhysicalOperator from, PhysicalOperator to) {
+        MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionItems = sparkOperator.getMultiQueryOptimizeConnectionItem();
+        if (multiQueryOptimizeConnectionItems.containsKey(from.getOperatorKey())) {
+            List<OperatorKey> value = multiQueryOptimizeConnectionItems.get(from.getOperatorKey());
+            multiQueryOptimizeConnectionItems.removeKey(from.getOperatorKey());
+            multiQueryOptimizeConnectionItems.put(to.getOperatorKey(), value);
+        }
+    }
+    private boolean verifyJoinOrGroupCase(PhysicalPlan plan, POGlobalRearrangeSpark glaOp) {
+        List<PhysicalOperator> lraOps = plan.getPredecessors(glaOp);
+        List<PhysicalOperator> pkgOps = plan.getSuccessors(glaOp);
+        return isAllPredecessorLRA(lraOps) && isSuccessorPKG(pkgOps);
+    }
+
+    private boolean isSuccessorPKG(List<PhysicalOperator> pkgOps) {
+        // the only successor of the GlobalRearrange must be a POPackage
+        return pkgOps != null && pkgOps.size() == 1 && pkgOps.get(0) instanceof POPackage;
+    }
+
+    private boolean isAllPredecessorLRA(List<PhysicalOperator> lraOps) {
+        // every predecessor of the GlobalRearrange must be a POLocalRearrange
+        if (lraOps == null) {
+            return false;
+        }
+        for (PhysicalOperator lraOp : lraOps) {
+            if (!(lraOp instanceof POLocalRearrange)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
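The ordering trick in restructSparkOp() leans on the natural ordering of
OperatorKey, which (as its use in Collections.sort above implies) compares the
scope first and then the id. A self-contained sketch, assuming that Comparable
behavior:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.pig.impl.plan.OperatorKey;

public class KeyOrderSketch {
    public static void main(String[] args) {
        List<OperatorKey> keys = new ArrayList<OperatorKey>();
        keys.add(new OperatorKey("scope", 5L));
        keys.add(new OperatorKey("scope", 4L));
        Collections.sort(keys);   // natural order: scope, then id
        System.out.println(keys); // [scope-4, scope-5]
    }
}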
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+/**
+ * MultiQueryOptimizer for Spark.
+ */
+public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
+
+    private static final Log LOG = LogFactory.getLog(MultiQueryOptimizerSpark.class);
+
+    private String scope;
+    private NodeIdGenerator nig;
+
+    public MultiQueryOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+        nig = NodeIdGenerator.getGenerator();
+        List<SparkOperator> roots = plan.getRoots();
+        scope = roots.get(0).getOperatorKey().getScope();
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        try {
+            if (!sparkOp.isSplitter()) {
+                return;
+            }
+
+            List<SparkOperator> splittees = getPlan().getSuccessors(sparkOp);
+
+            if (splittees == null) {
+                return;
+            }
+
+            // if a splittee has more than one predecessor, skip the multiquery
+            // optimization, see TestMultiQueryBasic#testMultiQueryWithFJ_2
+            for (SparkOperator splittee : splittees) {
+                if (getPlan().getPredecessors(splittee).size() > 1) {
+                    return;
+                }
+            }
+
+            if (splittees.size() == 1) {
+                // no POSplit is needed here, we can merge the single splittee
+                // into the splitter
+                SparkOperator splitter = sparkOp;
+                SparkOperator singleSplittee = splittees.get(0);
+                List<PhysicalOperator> roots = singleSplittee.physicalPlan.getRoots();
+                List<PhysicalOperator> rootCopies = new ArrayList<PhysicalOperator>(roots);
+                // sort the roots by OperatorKey:
+                // for the first root, merge the splitter's physical plan into the splittee;
+                // for the other roots, merge a clone of the splitter's physical plan.
+                // A cloned plan contains the same kinds of physical operators but with
+                // bigger OperatorKeys, so its operators are executed after those with
+                // smaller OperatorKeys (see JobGraphBuilder.sortPredecessorRDDs()).
+                Collections.sort(rootCopies);
+                List<PhysicalPlan> splitterPhysicalPlans = getPhysicalPlans(splitter.physicalPlan, rootCopies.size());
+                int i = 0;
+                for (PhysicalOperator root : rootCopies) {
+                    if (root instanceof POLoad) {
+                        POLoad load = (POLoad) root;
+                        PhysicalPlan plClone = splitterPhysicalPlans.get(i);
+                        POStore store = (POStore) plClone.getLeaves().get(0);
+                        if (load.getLFile().getFileName().equals(store.getSFile().getFileName())) {
+                            plClone.remove(store);
+                            PhysicalOperator succOfLoad = singleSplittee.physicalPlan.getSuccessors(load).get(0);
+                            singleSplittee.physicalPlan.remove(load);
+                            mergePlanAWithPlanB(singleSplittee.physicalPlan, plClone, succOfLoad);
+                            i++;
+                        }
+                    }
+                }
+
+                addSubPlanPropertiesToParent(singleSplittee, splitter);
+                removeSplitter(getPlan(), splitter, singleSplittee);
+            } else {
+                // if there is more than one splittee, create a POSplit, merge all
+                // the splittees' physical plans into it and remove the splittees
+                List<PhysicalOperator> firstNodeLeaves = sparkOp.physicalPlan.getLeaves();
+                PhysicalOperator firstNodeLeaf = firstNodeLeaves.size() > 0 ? firstNodeLeaves.get(0) : null;
+                POStore poStore = null;
+                if (firstNodeLeaf != null && firstNodeLeaf instanceof POStore) {
+                    poStore = (POStore) firstNodeLeaf;
+                    PhysicalOperator predOfPoStore = sparkOp.physicalPlan.getPredecessors(poStore).get(0);
+                    sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
+                    POSplit poSplit = createSplit();
+                    ArrayList<SparkOperator> splitteesCopy = new ArrayList<SparkOperator>(splittees);
+                    for (SparkOperator splittee : splitteesCopy) {
+                        List<PhysicalOperator> rootsOfSplittee = new ArrayList<PhysicalOperator>(splittee.physicalPlan.getRoots());
+                        for (int i = 0; i < rootsOfSplittee.size(); i++) {
+                            if (rootsOfSplittee.get(i) instanceof POLoad) {
+                                POLoad poLoad = (POLoad) rootsOfSplittee.get(i);
+                                if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
+                                    List<PhysicalOperator> successorsOfPoLoad = splittee.physicalPlan.getSuccessors(poLoad);
+                                    List<PhysicalOperator> successorsOfPoLoadCopy = new ArrayList<PhysicalOperator>(successorsOfPoLoad);
+                                    splittee.physicalPlan.remove(poLoad); // remove unnecessary load
+                                    for (PhysicalOperator successorOfPoLoad : successorsOfPoLoadCopy) {
+                                        // record the to/from relationship in
+                                        // SparkOperator#multiQueryOptimizeConnectionItem
+                                        sparkOp.addMultiQueryOptimizeConnectionItem(successorOfPoLoad.getOperatorKey(), predOfPoStore.getOperatorKey());
+                                        LOG.debug(String.format("add multiQueryOptimize connection item: to:%s, from:%s for %s",
+                                                successorOfPoLoad.getOperatorKey().toString(),
+                                                predOfPoStore.getOperatorKey().toString(), splittee.getOperatorKey()));
+                                    }
+                                }
+                            }
+                        }
+                        poSplit.addPlan(splittee.physicalPlan);
+                        addSubPlanPropertiesToParent(sparkOp, splittee);
+                        removeSplittee(getPlan(), sparkOp, splittee);
+                    }
+                    sparkOp.physicalPlan.addAsLeaf(poSplit);
+                }
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    private List<PhysicalPlan> getPhysicalPlans(PhysicalPlan physicalPlan, int size) throws OptimizerException {
+        List<PhysicalPlan> ppList = new ArrayList<PhysicalPlan>();
+        try {
+            ppList.add(physicalPlan);
+            for (int i = 1; i < size; i++) {
+                ppList.add(physicalPlan.clone());
+            }
+        } catch (CloneNotSupportedException e) {
+            int errCode = 2127;
+            String msg = "Internal Error: Cloning of plan failed for optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+        return ppList;
+    }
+
+    // merge every operator in planB into planA and connect planB's leaf to
+    // the operator "to" of planA
+    private void mergePlanAWithPlanB(PhysicalPlan planA, PhysicalPlan planB, PhysicalOperator to) throws PlanException {
+        PhysicalOperator leafOfPlanB = planB.getLeaves().get(0);
+        planA.merge(planB);
+        planA.connect(leafOfPlanB, to);
+    }
+    private void removeSplitter(SparkOperPlan plan, SparkOperator splitter, SparkOperator splittee) throws PlanException {
+        if (plan.getPredecessors(splitter) != null) {
+            List<SparkOperator> preds = new ArrayList<SparkOperator>(plan.getPredecessors(splitter));
+            plan.disconnect(splitter, splittee);
+            for (SparkOperator pred : preds) {
+                plan.disconnect(pred, splitter);
+                plan.connect(pred, splittee);
+            }
+        }
+        plan.remove(splitter);
+    }
+
+    private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
+                                SparkOperator splittee) throws PlanException {
+        if (plan.getSuccessors(splittee) != null) {
+            List<SparkOperator> succs = new ArrayList<SparkOperator>(plan.getSuccessors(splittee));
+            plan.disconnect(splitter, splittee);
+            for (SparkOperator succSparkOperator : succs) {
+                plan.disconnect(splittee, succSparkOperator);
+                plan.connect(splitter, succSparkOperator);
+            }
+        }
+        plan.remove(splittee);
+    }
+
+    private POSplit createSplit() {
+        return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+    }
+
+    public static void addSubPlanPropertiesToParent(SparkOperator parentOper, SparkOperator subPlanOper) {
+        // copy only map side properties, e.g. crossKeys;
+        // do not copy reduce side specific properties, e.g. useSecondaryKey,
+        // segmentBelow, sortOrder, etc.
+        if (subPlanOper.getCrossKeys() != null) {
+            for (String key : subPlanOper.getCrossKeys()) {
+                parentOper.addCrossKey(key);
+            }
+        }
+        parentOper.copyFeatures(subPlanOper, null);
+
+        if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
+            parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
+        }
+        subPlanOper.setRequestedParallelismByReference(parentOper);
+        parentOper.UDFs.addAll(subPlanOper.UDFs);
+        parentOper.scalars.addAll(subPlanOper.scalars);
+    }
+}
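The multi-splittee branch of visitSparkOp() reduces to hanging every splittee
plan off a single POSplit leaf of the splitter. A stripped-down sketch of just
that wiring (the helper name and the OperatorKey values are illustrative):

import java.util.List;

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;

public class SplitMergeSketch {
    // Hang each splittee's physical plan off one POSplit leaf of the
    // splitter plan, mirroring the else branch of visitSparkOp().
    static void mergeUnderSplit(PhysicalPlan splitterPlan,
                                List<PhysicalPlan> splitteePlans) throws PlanException {
        POSplit split = new POSplit(new OperatorKey("scope", 100L)); // illustrative key
        for (PhysicalPlan splitteePlan : splitteePlans) {
            split.addPlan(splitteePlan);
        }
        splitterPlan.addAsLeaf(split);
    }
}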
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java Mon May 29 15:00:39 2017
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopFilterRemoverUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons splits will always produce filters that pass
+ * everything through unchanged. This optimizer removes these.
+ * <p/>
+ * The condition we look for is a POFilter with a constant boolean
+ * (true) expression as its plan.
+ */
+public class NoopFilterRemover extends SparkOpPlanVisitor {
+    private Log log = LogFactory.getLog(NoopFilterRemover.class);
+
+    public NoopFilterRemover(SparkOperPlan plan) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        List<POFilter> filters = PlanHelper.getPhysicalOperators(sparkOp.physicalPlan, POFilter.class);
+        for (POFilter filter : filters) {
+            PhysicalPlan filterPlan = filter.getPlan();
+            if (filterPlan.size() == 1) {
+                PhysicalOperator fp = filterPlan.getRoots().get(0);
+                if (fp instanceof ConstantExpression) {
+                    ConstantExpression exp = (ConstantExpression) fp;
+                    Object value = exp.getValue();
+                    if (Boolean.TRUE.equals(value)) {
+                        NoopFilterRemoverUtil.removeFilter(filter, sparkOp.physicalPlan);
+                    }
+                }
+            }
+        }
+    }
+}
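For reference, the shape this pass removes can be constructed in a few lines.
A sketch of the matched pattern, a POFilter whose inner plan is a single
boolean-true ConstantExpression (the keys are illustrative, and
ConstantExpression#setValue is assumed as the counterpart of the getValue()
call above):

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.impl.plan.OperatorKey;

public class NoopFilterSketch {
    // Build the no-op shape: a POFilter whose plan is one constant TRUE;
    // NoopFilterRemover strips exactly this from a Spark operator's plan.
    static POFilter noopFilter() {
        ConstantExpression alwaysTrue = new ConstantExpression(new OperatorKey("scope", 1L));
        alwaysTrue.setValue(Boolean.TRUE);
        PhysicalPlan inner = new PhysicalPlan();
        inner.add(alwaysTrue);
        POFilter filter = new POFilter(new OperatorKey("scope", 2L));
        filter.setPlan(inner);
        return filter;
    }
}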