Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon May 29 15:00:39 2017 @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.TransformerException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.PigConfiguration; +import org.apache.pig.PigException; +import org.apache.pig.PigWarning; +import org.apache.pig.backend.BackendException; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; +import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.PoissonSampleConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.SparkSampleSortConverter; +import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter; +import org.apache.pig.data.SchemaTupleBackend; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.JarManager; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.tools.pigstats.OutputStats; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.spark.SparkCounterGroup; +import org.apache.pig.tools.pigstats.spark.SparkCounters; +import org.apache.pig.tools.pigstats.spark.SparkPigStats; +import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.scheduler.JobLogger; +import org.apache.spark.scheduler.StatsReportListener; + +import com.google.common.base.Joiner; + +/** + * Main class that launches pig for Spark + */ +public class SparkLauncher extends Launcher { + + private static final Log LOG = LogFactory.getLog(SparkLauncher.class); + + // Our connection to Spark. It needs to be static so that it can be reused + // across jobs, because a + // new SparkLauncher gets created for each job. 
+ private static JavaSparkContext sparkContext = null; + private static JobMetricsListener jobMetricsListener = new JobMetricsListener(); + private String jobGroupID; + private PigContext pigContext = null; + private JobConf jobConf = null; + private String currentDirectoryPath = null; + private SparkEngineConf sparkEngineConf = new SparkEngineConf(); + private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName(); + + @Override + public PigStats launchPig(PhysicalPlan physicalPlan, String grpName, + PigContext pigContext) throws Exception { + if (LOG.isDebugEnabled()) + LOG.debug(physicalPlan); + this.pigContext = pigContext; + initialize(physicalPlan); + SparkOperPlan sparkplan = compile(physicalPlan, pigContext); + if (LOG.isDebugEnabled()) { + LOG.debug(sparkplan); + } + SparkPigStats sparkStats = (SparkPigStats) pigContext + .getExecutionEngine().instantiatePigStats(); + sparkStats.initialize(pigContext, sparkplan, jobConf); + PigStats.start(sparkStats); + + startSparkIfNeeded(pigContext); + + jobGroupID = String.format("%s-%s",sparkContext.getConf().getAppId(), + UUID.randomUUID().toString()); + jobConf.set(MRConfiguration.JOB_ID,jobGroupID); + + sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", + false); + jobMetricsListener.reset(); + + this.currentDirectoryPath = Paths.get(".").toAbsolutePath() + .normalize().toString() + + "/"; + + new ParallelismSetter(sparkplan, jobConf).visit(); + + prepareSparkCounters(jobConf); + + // Create conversion map, mapping between pig operator and spark convertor + Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap + = new HashMap<Class<? extends PhysicalOperator>, RDDConverter>(); + convertMap.put(POLoad.class, new LoadConverter(pigContext, + physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf)); + convertMap.put(POStore.class, new StoreConverter(jobConf)); + convertMap.put(POForEach.class, new ForEachConverter(jobConf)); + convertMap.put(POFilter.class, new FilterConverter()); + convertMap.put(POPackage.class, new PackageConverter()); + convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter()); + convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter()); + convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter()); + convertMap.put(POLimit.class, new LimitConverter()); + convertMap.put(PODistinct.class, new DistinctConverter()); + convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc())); + convertMap.put(POSort.class, new SortConverter()); + convertMap.put(POSplit.class, new SplitConverter()); + convertMap.put(POSkewedJoin.class, new SkewedJoinConverter()); + convertMap.put(POMergeJoin.class, new MergeJoinConverter()); + convertMap.put(POCollectedGroup.class, new CollectedGroupConverter()); + convertMap.put(POCounter.class, new CounterConverter()); + convertMap.put(PORank.class, new RankConverter()); + convertMap.put(POStream.class, new StreamConverter()); + convertMap.put(POFRJoinSpark.class, new FRJoinConverter()); + convertMap.put(POMergeCogroup.class, new MergeCogroupConverter()); + convertMap.put(POReduceBySpark.class, new ReduceByConverter()); + convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter()); + convertMap.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext)); + convertMap.put(POSampleSortSpark.class, new SparkSampleSortConverter()); + convertMap.put(POPoissonSampleSpark.class, new PoissonSampleConverter()); + //Print SPARK plan before launching if needed + Configuration conf = 
ConfigurationUtil.toConfiguration(pigContext.getProperties()); + if (conf.getBoolean(PigConfiguration.PIG_PRINT_EXEC_PLAN, false)) { + LOG.info(sparkplan); + } + uploadResources(sparkplan); + + new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit(); + cleanUpSparkJob(sparkStats); + sparkStats.finish(); + resetUDFContext(); + return sparkStats; + } + + private void resetUDFContext() { + UDFContext.getUDFContext().addJobConf(null); + } + + private void uploadResources(SparkOperPlan sparkPlan) throws IOException { + addFilesToSparkJob(sparkPlan); + addJarsToSparkJob(sparkPlan); + } + + private void optimize(SparkOperPlan plan, PigContext pigContext) throws IOException { + + Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); + + // Should be the first optimizer as it introduces new operators to the plan. + boolean noCombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false); + if (!pigContext.inIllustrator && !noCombiner) { + CombinerOptimizer combinerOptimizer = new CombinerOptimizer(plan); + combinerOptimizer.visit(); + if (LOG.isDebugEnabled()) { + LOG.debug("After combiner optimization:"); + LOG.debug(plan); + } + } + + boolean noSecondaryKey = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, false); + if (!pigContext.inIllustrator && !noSecondaryKey) { + SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan); + skOptimizer.visit(); + } + + boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true); + if (isAccum) { + AccumulatorOptimizer accum = new AccumulatorOptimizer(plan); + accum.visit(); + } + + // removes the filter(constant(true)) operators introduced by + // splits. + NoopFilterRemover fRem = new NoopFilterRemover(plan); + fRem.visit(); + + boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true); + + if (LOG.isDebugEnabled()) { + LOG.debug("Before multiquery optimization:"); + LOG.debug(plan); + } + + if (isMultiQuery) { + // reduces the number of SparkOpers in the Spark plan generated + // by multi-query (multi-store) script. + MultiQueryOptimizerSpark mqOptimizer = new MultiQueryOptimizerSpark(plan); + mqOptimizer.visit(); + } + + //since JoinGroupOptimizerSpark modifies the plan and collapses LRA+GLA+PKG into POJoinGroupSpark while + //CombinerOptimizer collapses GLA+PKG into ReduceBy, so if JoinGroupOptimizerSpark first, the spark plan will be + //changed and not suitable for CombinerOptimizer.More detail see PIG-4797 + JoinGroupOptimizerSpark joinOptimizer = new JoinGroupOptimizerSpark(plan); + joinOptimizer.visit(); + + if (LOG.isDebugEnabled()) { + LOG.debug("After multiquery optimization:"); + LOG.debug(plan); + } + } + + private void cleanUpSparkJob(SparkPigStats sparkStats) throws ExecException { + LOG.info("Clean up Spark Job"); + boolean isLocal = System.getenv("SPARK_MASTER") != null ? 
System + .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true; + if (isLocal) { + String shipFiles = pigContext.getProperties().getProperty( + "pig.streaming.ship.files"); + if (shipFiles != null) { + for (String file : shipFiles.split(",")) { + File shipFile = new File(file); + File deleteFile = new File(currentDirectoryPath + "/" + + shipFile.getName()); + if (deleteFile.exists()) { + LOG.info(String.format("Delete ship file result: %b", + deleteFile.delete())); + } + } + } + String cacheFiles = pigContext.getProperties().getProperty( + "pig.streaming.cache.files"); + if (cacheFiles != null) { + for (String file : cacheFiles.split(",")) { + String fileName = extractFileName(file.trim()); + File deleteFile = new File(currentDirectoryPath + "/" + + fileName); + if (deleteFile.exists()) { + LOG.info(String.format("Delete cache file result: %b", + deleteFile.delete())); + } + } + } + } + + // run cleanup for all of the stores + for (OutputStats output : sparkStats.getOutputStats()) { + POStore store = output.getPOStore(); + try { + if (!output.isSuccessful()) { + store.getStoreFunc().cleanupOnFailure( + store.getSFile().getFileName(), + Job.getInstance(output.getConf())); + } else { + store.getStoreFunc().cleanupOnSuccess( + store.getSFile().getFileName(), + Job.getInstance(output.getConf())); + } + } catch (IOException e) { + throw new ExecException(e); + } catch (AbstractMethodError nsme) { + // Just swallow it. This means we're running against an + // older instance of a StoreFunc that doesn't implement + // this method. + } + } + } + + private void addFilesToSparkJob(SparkOperPlan sparkPlan) throws IOException { + LOG.info("Add files Spark Job"); + String shipFiles = pigContext.getProperties().getProperty( + "pig.streaming.ship.files"); + shipFiles(shipFiles); + String cacheFiles = pigContext.getProperties().getProperty( + "pig.streaming.cache.files"); + cacheFiles(cacheFiles); + addUdfResourcesToSparkJob(sparkPlan); + } + + private void addUdfResourcesToSparkJob(SparkOperPlan sparkPlan) throws IOException { + SparkPOUserFuncVisitor sparkPOUserFuncVisitor = new SparkPOUserFuncVisitor(sparkPlan); + sparkPOUserFuncVisitor.visit(); + Joiner joiner = Joiner.on(","); + String shipFiles = joiner.join(sparkPOUserFuncVisitor.getShipFiles()); + shipFiles(shipFiles); + String cacheFiles = joiner.join(sparkPOUserFuncVisitor.getCacheFiles()); + cacheFiles(cacheFiles); + } + + private void shipFiles(String shipFiles) + throws IOException { + if (shipFiles != null && !shipFiles.isEmpty()) { + for (String file : shipFiles.split(",")) { + File shipFile = new File(file.trim()); + if (shipFile.exists()) { + addResourceToSparkJobWorkingDirectory(shipFile, + shipFile.getName(), ResourceType.FILE); + } + } + } + } + + private void cacheFiles(String cacheFiles) throws IOException { + if (cacheFiles != null && !cacheFiles.isEmpty()) { + File tmpFolder = Files.createTempDirectory("cache").toFile(); + tmpFolder.deleteOnExit(); + for (String file : cacheFiles.split(",")) { + String fileName = extractFileName(file.trim()); + if( fileName != null) { + String fileUrl = extractFileUrl(file.trim()); + if( fileUrl != null) { + Path src = new Path(fileUrl); + File tmpFile = new File(tmpFolder, fileName); + Path tmpFilePath = new Path(tmpFile.getAbsolutePath()); + FileSystem fs = tmpFilePath.getFileSystem(jobConf); + //TODO:PIG-5241 Specify the hdfs path directly to spark and avoid the unnecessary download and upload in SparkLauncher.java + fs.copyToLocalFile(src, tmpFilePath); + tmpFile.deleteOnExit(); + 
LOG.info(String.format("CacheFile:%s", fileName)); + addResourceToSparkJobWorkingDirectory(tmpFile, fileName, + ResourceType.FILE); + } + } + } + } + } + + public static enum ResourceType { + JAR, + FILE + } + + + private void addJarsToSparkJob(SparkOperPlan sparkPlan) throws IOException { + Set<String> allJars = new HashSet<String>(); + LOG.info("Add default jars to Spark Job"); + allJars.addAll(JarManager.getDefaultJars()); + LOG.info("Add extra jars to Spark Job"); + for (String scriptJar : pigContext.scriptJars) { + allJars.add(scriptJar); + } + + LOG.info("Add udf jars to Spark Job"); + UDFJarsFinder udfJarsFinder = new UDFJarsFinder(sparkPlan, pigContext); + udfJarsFinder.visit(); + Set<String> udfJars = udfJarsFinder.getUdfJars(); + for (String udfJar : udfJars) { + allJars.add(udfJar); + } + + File scriptUDFJarFile = JarManager.createPigScriptUDFJar(pigContext); + if (scriptUDFJarFile != null) { + LOG.info("Add script udf jar to Spark job"); + allJars.add(scriptUDFJarFile.getAbsolutePath().toString()); + } + + //Upload all jars to spark working directory + for (String jar : allJars) { + File jarFile = new File(jar); + addResourceToSparkJobWorkingDirectory(jarFile, jarFile.getName(), + ResourceType.JAR); + } + } + + private void addResourceToSparkJobWorkingDirectory(File resourcePath, + String resourceName, ResourceType resourceType) throws IOException { + if (resourceType == ResourceType.JAR) { + LOG.info("Added jar " + resourceName); + } else { + LOG.info("Added file " + resourceName); + } + boolean isLocal = System.getenv("SPARK_MASTER") != null ? System + .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true; + if (isLocal) { + File localFile = new File(currentDirectoryPath + "/" + + resourceName); + if (resourcePath.getAbsolutePath().equals(localFile.getAbsolutePath()) + && resourcePath.exists()) { + return; + } + // When multiple threads start SparkLauncher, delete/copy actions should be in a critical section + synchronized(SparkLauncher.class) { + if (localFile.exists()) { + LOG.info(String.format( + "Jar file %s exists, ready to delete", + localFile.getAbsolutePath())); + localFile.delete(); + } else { + LOG.info(String.format("Jar file %s not exists,", + localFile.getAbsolutePath())); + } + Files.copy(Paths.get(new Path(resourcePath.getAbsolutePath()).toString()), + Paths.get(localFile.getAbsolutePath())); + } + } else { + if(resourceType == ResourceType.JAR){ + sparkContext.addJar(resourcePath.toURI().toURL() + .toExternalForm()); + }else if( resourceType == ResourceType.FILE){ + sparkContext.addFile(resourcePath.toURI().toURL() + .toExternalForm()); + } + } + } + + private String extractFileName(String cacheFileUrl) { + String[] tmpAry = cacheFileUrl.split("#"); + String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1] + : null; + return fileName; + } + + private String extractFileUrl(String cacheFileUrl) { + String[] tmpAry = cacheFileUrl.split("#"); + String fileName = tmpAry != null && tmpAry.length == 2 ? 
tmpAry[0] + : null; + return fileName; + } + + public SparkOperPlan compile(PhysicalPlan physicalPlan, + PigContext pigContext) throws PlanException, IOException, + VisitorException { + SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan, + pigContext); + sparkCompiler.compile(); + sparkCompiler.connectSoftLink(); + SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan(); + + // optimize key - value handling in package + SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator( + sparkPlan); + pkgAnnotator.visit(); + + optimize(sparkPlan, pigContext); + return sparkPlan; + } + + /** + * Only one SparkContext may be active per JVM (SPARK-2243). When multiple threads start SparkLaucher, + * the static member sparkContext should be initialized only once + */ + private static synchronized void startSparkIfNeeded(PigContext pc) throws PigException { + if (sparkContext == null) { + String master = null; + if (pc.getExecType().isLocal()) { + master = "local"; + } else { + master = System.getenv("SPARK_MASTER"); + if (master == null) { + LOG.info("SPARK_MASTER not specified, using \"local\""); + master = "local"; + } + } + + String sparkHome = System.getenv("SPARK_HOME"); + if (!master.startsWith("local") && !master.equals("yarn-client")) { + // Check that we have the Mesos native library and Spark home + // are set + if (sparkHome == null) { + System.err + .println("You need to set SPARK_HOME to run on a Mesos cluster!"); + throw new PigException("SPARK_HOME is not set"); + } + } + + SparkConf sparkConf = new SparkConf(); + Properties pigCtxtProperties = pc.getProperties(); + + sparkConf.setMaster(master); + sparkConf.setAppName(pigCtxtProperties.getProperty(PigContext.JOB_NAME,"pig")); + // On Spark 1.6, Netty file server doesn't allow adding the same file with the same name twice + // This is a problem for streaming using a script + explicit ship the same script combination (PIG-5134) + // HTTP file server doesn't have this restriction, it overwrites the file if added twice + String useNettyFileServer = pigCtxtProperties.getProperty(PigConfiguration.PIG_SPARK_USE_NETTY_FILESERVER, "false"); + sparkConf.set("spark.rpc.useNettyFileServer", useNettyFileServer); + + if (sparkHome != null && !sparkHome.isEmpty()) { + sparkConf.setSparkHome(sparkHome); + } else { + LOG.warn("SPARK_HOME is not set"); + } + + //Copy all spark.* properties to SparkConf + for (String key : pigCtxtProperties.stringPropertyNames()) { + if (key.startsWith("spark.")) { + LOG.debug("Copying key " + key + " with value " + + pigCtxtProperties.getProperty(key) + " to SparkConf"); + sparkConf.set(key, pigCtxtProperties.getProperty(key)); + } + } + + //see PIG-5200 why need to set spark.executor.userClassPathFirst as true + sparkConf.set("spark.executor.userClassPathFirst", "true"); + checkAndConfigureDynamicAllocation(master, sparkConf); + + sparkContext = new JavaSparkContext(sparkConf); + sparkContext.sc().addSparkListener(new StatsReportListener()); + sparkContext.sc().addSparkListener(new JobLogger()); + sparkContext.sc().addSparkListener(jobMetricsListener); + } + } + + private static void checkAndConfigureDynamicAllocation(String master, SparkConf sparkConf) { + if (sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) { + if (!master.startsWith("yarn")) { + LOG.warn("Dynamic allocation is enabled, but " + + "script isn't running on yarn. 
Ignoring ..."); + } + if (!sparkConf.getBoolean("spark.shuffle.service.enabled", false)) { + LOG.info("Spark shuffle service is being enabled as dynamic " + + "allocation is enabled"); + sparkConf.set("spark.shuffle.service.enabled", "true"); + } + } + } + + // You can use this in unit tests to stop the SparkContext between tests. + static void stopSpark() { + if (sparkContext != null) { + sparkContext.stop(); + sparkContext = null; + } + } + + + @Override + public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps, + String format, boolean verbose) throws IOException { + SparkOperPlan sparkPlan = compile(pp, pc); + explain(sparkPlan, ps, format, verbose); + } + + private void explain(SparkOperPlan sparkPlan, PrintStream ps, + String format, boolean verbose) + throws IOException { + Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys(); + List<OperatorKey> operKeyList = new ArrayList<>(allOperKeys.keySet()); + Collections.sort(operKeyList); + + if (format.equals("text")) { + for (OperatorKey operatorKey : operKeyList) { + SparkOperator op = sparkPlan.getOperator(operatorKey); + ps.print(op.getOperatorKey()); + List<SparkOperator> successors = sparkPlan.getSuccessors(op); + if (successors != null) { + ps.print("->"); + for (SparkOperator suc : successors) { + ps.print(suc.getOperatorKey() + " "); + } + } + ps.println(); + } + SparkPrinter printer = new SparkPrinter(ps, sparkPlan); + printer.setVerbose(verbose); + printer.visit(); + } else if (format.equals("dot")) { + ps.println("#--------------------------------------------------"); + ps.println("# Spark Plan"); + ps.println("#--------------------------------------------------"); + + DotSparkPrinter printer = new DotSparkPrinter(sparkPlan, ps); + printer.setVerbose(verbose); + printer.dump(); + ps.println(""); + } else if (format.equals("xml")) { + try { + XMLSparkPrinter printer = new XMLSparkPrinter(ps, sparkPlan); + printer.visit(); + printer.closePlan(); + } catch (ParserConfigurationException e) { + e.printStackTrace(); + } catch (TransformerException e) { + e.printStackTrace(); + } + } + else { + throw new IOException( + "Unsupported explain format. Supported formats are: text, dot, xml"); + } + } + + @Override + public void kill() throws BackendException { + if (sparkContext != null) { + sparkContext.stop(); + sparkContext = null; + } + } + + @Override + public void killJob(String jobID, Configuration conf) + throws BackendException { + if (sparkContext != null) { + sparkContext.stop(); + sparkContext = null; + } + } + + /** + * We store the value of udf.import.list in SparkEngineConf#properties + * Later we will serialize it in SparkEngineConf#writeObject and deserialize in SparkEngineConf#readObject. 
More + * detail see PIG-4920 + */ + private void saveUdfImportList() { + String udfImportList = Joiner.on(",").join(PigContext.getPackageImportList()); + sparkEngineConf.setSparkUdfImportListStr(udfImportList); + } + + private void initialize(PhysicalPlan physicalPlan) throws IOException { + saveUdfImportList(); + jobConf = SparkUtil.newJobConf(pigContext, physicalPlan, sparkEngineConf); + SchemaTupleBackend.initialize(jobConf, pigContext); + Utils.setDefaultTimeZone(jobConf); + PigMapReduce.sJobConfInternal.set(jobConf); + String parallelism = pigContext.getProperties().getProperty("spark.default.parallelism"); + if (parallelism != null) { + SparkPigContext.get().setDefaultParallelism(Integer.parseInt(parallelism)); + } + } + + /** + * Creates new SparkCounters instance for the job, initializes aggregate warning counters if required + * @param jobConf + * @throws IOException + */ + private static void prepareSparkCounters(JobConf jobConf) throws IOException { + SparkPigStatusReporter statusReporter = SparkPigStatusReporter.getInstance(); + SparkCounters counters = new SparkCounters(sparkContext); + + if ("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))) { + SparkCounterGroup pigWarningGroup = new SparkCounterGroup.MapSparkCounterGroup( + PIG_WARNING_FQCN, PIG_WARNING_FQCN,sparkContext + ); + pigWarningGroup.createCounter(PigWarning.SPARK_WARN.name(), new HashMap<String,Long>()); + pigWarningGroup.createCounter(PigWarning.SPARK_CUSTOM_WARN.name(), new HashMap<String,Long>()); + counters.getSparkCounterGroups().put(PIG_WARNING_FQCN, pigWarningGroup); + } + statusReporter.setCounters(counters); + jobConf.set("pig.spark.counters", ObjectSerializer.serialize(counters)); + } +}
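For context, a minimal driver sketch of how this launcher is reached from client code — a hypothetical example, not part of this commit; the class name SparkLocalDriver and the property values are illustrative only. A PigServer created with the SPARK_LOCAL exec type (matched case-insensitively by SparkLocalExecType#accepts) compiles the script into a PhysicalPlan and hands it to SparkLauncher#launchPig, and any spark.* property on the PigContext is copied into the SparkConf by startSparkIfNeeded, assuming the usual PigServer API:

import org.apache.pig.PigServer;

public class SparkLocalDriver {
    public static void main(String[] args) throws Exception {
        // "spark_local" resolves to SparkLocalExecType; plain "spark" would select cluster mode
        PigServer pig = new PigServer("spark_local");
        // spark.* keys are forwarded into SparkConf when the JavaSparkContext is created
        pig.getPigContext().getProperties().setProperty("spark.executor.memory", "1g");
        pig.registerQuery("a = LOAD 'input.txt' AS (line:chararray);");
        // STORE triggers plan compilation and SparkLauncher#launchPig
        pig.store("a", "output");
        pig.shutdown();
    }
}
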
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java Mon May 29 15:00:39 2017 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.util.Properties; + +/** + * SparkLocalExecType is the ExecType for local mode in Spark. + */ +public class SparkLocalExecType extends SparkExecType { + + private static final long serialVersionUID = 1L; + private static final String mode = "SPARK_LOCAL"; + + @Override + public boolean accepts(Properties properties) { + String execTypeSpecified = properties.getProperty("exectype", "") + .toUpperCase(); + if (execTypeSpecified.equals(mode)) + return true; + return false; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String name() { + return "SPARK_LOCAL"; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java Mon May 29 15:00:39 2017 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.VisitorException; + +import java.util.HashSet; +import java.util.Set; + +public class SparkPOUserFuncVisitor extends SparkOpPlanVisitor { + private Set<String> cacheFiles = new HashSet<>(); + private Set<String> shipFiles = new HashSet<>(); + + public SparkPOUserFuncVisitor(SparkOperPlan plan) { + super(plan, new DepthFirstWalker<>(plan)); + } + + @Override + public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException { + if(!sparkOperator.physicalPlan.isEmpty()) { + UdfCacheShipFilesVisitor udfCacheFileVisitor = new UdfCacheShipFilesVisitor(sparkOperator.physicalPlan); + udfCacheFileVisitor.visit(); + cacheFiles.addAll(udfCacheFileVisitor.getCacheFiles()); + shipFiles.addAll(udfCacheFileVisitor.getShipFiles()); + } + } + + public Set<String> getCacheFiles() { + return cacheFiles; + } + + public Set<String> getShipFiles() { + return shipFiles; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java Mon May 29 15:00:39 2017 @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.data.Tuple; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.rdd.RDD; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +/** + * singleton class like PigContext + */ +public class SparkPigContext { + + private static SparkPigContext context = null; + private static ThreadLocal<Integer> defaultParallelism = null; + private static ConcurrentHashMap<String, Broadcast<List<Tuple>>> broadcastedVars = new ConcurrentHashMap() ; + + public static SparkPigContext get(){ + if( context == null){ + context = new SparkPigContext(); + } + return context; + } + public static int getDefaultParallelism() { + return defaultParallelism.get(); + } + + + public static int getParallelism(List<RDD<Tuple>> predecessors, + PhysicalOperator physicalOperator) { + if (defaultParallelism != null) { + return getDefaultParallelism(); + } + + int parallelism = physicalOperator.getRequestedParallelism(); + if (parallelism <= 0) { + //Spark automatically sets the number of "map" tasks to run on each file according to its size (though + // you can control it through optional parameters to SparkContext.textFile, etc), and for distributed + //"reduce" operations, such as groupByKey and reduceByKey, it uses the largest parent RDD's number of + // partitions. + int maxParallism = 0; + for (int i = 0; i < predecessors.size(); i++) { + int tmpParallelism = predecessors.get(i).getNumPartitions(); + if (tmpParallelism > maxParallism) { + maxParallism = tmpParallelism; + } + } + parallelism = maxParallism; + } + return parallelism; + } + + public static void setDefaultParallelism(int defaultParallelism) { + SparkPigContext.defaultParallelism.set(defaultParallelism); + } + + public static ConcurrentHashMap<String, Broadcast<List<Tuple>>> getBroadcastedVars() { + return broadcastedVars; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java Mon May 29 15:00:39 2017 @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; + +/** + * Record reader for Spark mode - handles SparkPigSplit + */ +public class SparkPigRecordReader extends PigRecordReader { + + + /** + * @param inputformat + * @param pigSplit + * @param loadFunc + * @param context + * @param limit + */ + public SparkPigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit, LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException { + super(inputformat, pigSplit, loadFunc, context, limit); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + SparkPigSplit sparkPigSplit = (SparkPigSplit)split; + super.initialize(sparkPigSplit.getWrappedPigSplit(), context); + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java Mon May 29 15:00:39 2017 @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; + +/** + * Wrapper class for PigSplits in Spark mode + * + * Spark only counts HDFS bytes read if we provide a FileSplit, so we have to wrap PigSplits and have the wrapper + * class extend FileSplit + */ +public interface SparkPigSplit extends Writable, Configurable, Serializable { + + InputSplit getWrappedSplit(); + + InputSplit getWrappedSplit(int idx); + + SplitLocationInfo[] getLocationInfo() throws IOException; + + long getLength(int idx) throws IOException, InterruptedException; + + int getSplitIndex(); + + void setMultiInputs(boolean b); + + boolean isMultiInputs(); + + int getNumPaths(); + + void setDisableCounter(boolean disableCounter); + + boolean disableCounter(); + + void setCurrentIdx(int idx); + + PigSplit getWrappedPigSplit(); + + public static class FileSparkPigSplit extends FileSplit implements SparkPigSplit { + + private PigSplit pigSplit; + + /** + * Spark executor's deserializer calls this, and we have to instantiate a default wrapped object + */ + public FileSparkPigSplit () { + pigSplit = new PigSplit(); + } + + public FileSparkPigSplit(PigSplit pigSplit) { + this.pigSplit = pigSplit; + } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + return pigSplit.getLocationInfo(); + } + + @Override + public String toString() { + return pigSplit.toString(); + } + + @Override + public long getLength() { + try { + return pigSplit.getLength(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public String[] getLocations() throws IOException { + try { + return pigSplit.getLocations(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public InputSplit getWrappedSplit() { + return pigSplit.getWrappedSplit(); + } + + @Override + public InputSplit getWrappedSplit(int idx) { + return pigSplit.getWrappedSplit(idx); + } + + @Override + public long getLength(int idx) throws IOException, InterruptedException { + return pigSplit.getLength(idx); + } + + @Override + public void readFields(DataInput is) throws IOException { + pigSplit.readFields(is); + } + + @Override + public void write(DataOutput os) throws IOException { + pigSplit.write(os); + } + + @Override + public int getSplitIndex() { + return pigSplit.getSplitIndex(); + } + + @Override + public void setMultiInputs(boolean b) { + pigSplit.setMultiInputs(b); + } + + @Override + public boolean isMultiInputs() { + return pigSplit.isMultiInputs(); + } + + @Override + public Configuration getConf() { + return pigSplit.getConf(); + } + + @Override + public void setConf(Configuration conf) { + pigSplit.setConf(conf); + } + + @Override + public int getNumPaths() { + return pigSplit.getNumPaths(); + } + + @Override + public void setDisableCounter(boolean disableCounter) { + pigSplit.setDisableCounter(disableCounter); + } + + @Override + public boolean disableCounter() { + return pigSplit.disableCounter(); + } + + @Override + public 
void setCurrentIdx(int idx) { + pigSplit.setCurrentIdx(idx); + } + + @Override + public PigSplit getWrappedPigSplit() { + return this.pigSplit; + } + + @Override + public Path getPath() { + return ((FileSplit)getWrappedPigSplit().getWrappedSplit()).getPath(); + } + } + + public static class GenericSparkPigSplit extends InputSplit implements SparkPigSplit { + + private static final long serialVersionUID = 1L; + + private PigSplit pigSplit; + + /** + * Spark executor's deserializer calls this, and we have to instantiate a default wrapped object + */ + public GenericSparkPigSplit() { + pigSplit = new PigSplit(); + } + + public GenericSparkPigSplit(PigSplit pigSplit) { + this.pigSplit = pigSplit; + } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + return pigSplit.getLocationInfo(); + } + + @Override + public String toString() { + return pigSplit.toString(); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return pigSplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return pigSplit.getLocations(); + } + + @Override + public InputSplit getWrappedSplit() { + return pigSplit.getWrappedSplit(); + } + + @Override + public InputSplit getWrappedSplit(int idx) { + return pigSplit.getWrappedSplit(idx); + } + + @Override + public long getLength(int idx) throws IOException, InterruptedException { + return pigSplit.getLength(idx); + } + + @Override + public void readFields(DataInput is) throws IOException { + pigSplit.readFields(is); + } + + @Override + public void write(DataOutput os) throws IOException { + pigSplit.write(os); + } + + @Override + public int getSplitIndex() { + return pigSplit.getSplitIndex(); + } + + @Override + public void setMultiInputs(boolean b) { + pigSplit.setMultiInputs(b); + } + + @Override + public boolean isMultiInputs() { + return pigSplit.isMultiInputs(); + } + + @Override + public Configuration getConf() { + return pigSplit.getConf(); + } + + @Override + public void setConf(Configuration conf) { + pigSplit.setConf(conf); + } + + @Override + public int getNumPaths() { + return pigSplit.getNumPaths(); + } + + @Override + public void setDisableCounter(boolean disableCounter) { + pigSplit.setDisableCounter(disableCounter); + } + + @Override + public boolean disableCounter() { + return pigSplit.disableCounter(); + } + + @Override + public void setCurrentIdx(int idx) { + pigSplit.setCurrentIdx(idx); + } + + @Override + public PigSplit getWrappedPigSplit() { + return this.pigSplit; + } + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Mon May 29 15:00:39 2017 @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Product2; +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.PigConstants; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.spark.HashPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.rdd.RDD; + +public class SparkUtil { + public static <T> ClassTag<T> getManifest(Class<T> clazz) { + return ClassTag$.MODULE$.apply(clazz); + } + + @SuppressWarnings("unchecked") + public static <K, V> ClassTag<Tuple2<K, V>> getTuple2Manifest() { + return (ClassTag<Tuple2<K, V>>) (Object) getManifest(Tuple2.class); + } + + @SuppressWarnings("unchecked") + public static <K, V> ClassTag<Product2<K, V>> getProduct2Manifest() { + return (ClassTag<Product2<K, V>>) (Object) getManifest(Product2.class); + } + + public static JobConf newJobConf(PigContext pigContext, PhysicalPlan physicalPlan, SparkEngineConf sparkEngineConf) throws + IOException { + JobConf jobConf = new JobConf( + ConfigurationUtil.toConfiguration(pigContext.getProperties())); + //Serialize the thread local variable UDFContext#udfConfs, UDFContext#clientSysProps and PigContext#packageImportList + //inside SparkEngineConf separately + jobConf.set("spark.engine.conf",ObjectSerializer.serialize(sparkEngineConf)); + //Serialize the PigContext so it's available in Executor thread. 
+ jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext)); + // Serialize the thread local variable inside PigContext separately + // Although after PIG-4920, we store udf.import.list in SparkEngineConf + // but we still need store it in jobConf because it will be used in PigOutputFormat#setupUdfEnvAndStores + jobConf.set("udf.import.list", + ObjectSerializer.serialize(PigContext.getPackageImportList())); + + Random rand = new Random(); + jobConf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, Integer.toString(rand.nextInt())); + jobConf.set(PigConstants.LOCAL_CODE_DIR, + System.getProperty("java.io.tmpdir")); + jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString()); + + LinkedList<POStore> stores = PlanHelper.getPhysicalOperators( + physicalPlan, POStore.class); + POStore firstStore = stores.getFirst(); + if (firstStore != null) { + MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext, + jobConf); + } + return jobConf; + } + + public static <T> Seq<T> toScalaSeq(List<T> list) { + return JavaConversions.asScalaBuffer(list); + } + + public static void assertPredecessorSize(List<RDD<Tuple>> predecessors, + PhysicalOperator physicalOperator, int size) { + if (predecessors.size() != size) { + throw new RuntimeException("Should have " + size + + " predecessors for " + physicalOperator.getClass() + + ". Got : " + predecessors.size()); + } + } + + public static void assertPredecessorSizeGreaterThan( + List<RDD<Tuple>> predecessors, PhysicalOperator physicalOperator, + int size) { + if (predecessors.size() <= size) { + throw new RuntimeException("Should have greater than" + size + + " predecessors for " + physicalOperator.getClass() + + ". Got : " + predecessors.size()); + } + } + + public static Partitioner getPartitioner(String customPartitioner, int parallelism) { + if (customPartitioner == null) { + return new HashPartitioner(parallelism); + } else { + return new MapReducePartitionerWrapper(customPartitioner, parallelism); + } + } + + // createIndexerSparkNode is a utility to create an indexer spark node with baseSparkOp + static public void createIndexerSparkNode(SparkOperator baseSparkOp, String scope, NodeIdGenerator nig) throws PlanException, ExecException { + List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>(); + PhysicalPlan ep = new PhysicalPlan(); + POProject prj = new POProject(new OperatorKey(scope, + nig.getNextNodeId(scope))); + prj.setStar(true); + prj.setOverloaded(false); + prj.setResultType(DataType.TUPLE); + ep.add(prj); + eps.add(ep); + + List<Boolean> ascCol = new ArrayList<Boolean>(); + ascCol.add(true); + + int requestedParallelism = baseSparkOp.requestedParallelism; + POSort sort = new POSort(new OperatorKey(scope, nig.getNextNodeId(scope)), requestedParallelism, null, eps, ascCol, null); + //POSort is added to sort the index tuples genereated by MergeJoinIndexer.More detail, see PIG-4601 + baseSparkOp.physicalPlan.addAsLeaf(sort); + } + + + +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java Mon May 29 15:00:39 2017 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.JarManager; + +//Find the UDF jars which will be shipped with the Spark job to every node +public class UDFJarsFinder extends SparkOpPlanVisitor { + private PigContext pigContext = null; + private Set<String> udfJars = new HashSet<String>(); + + public UDFJarsFinder(SparkOperPlan plan, PigContext pigContext) { + super(plan, new DependencyOrderWalker(plan)); + this.pigContext = pigContext; + } + + public void visitSparkOp(SparkOperator sparkOp) + throws VisitorException { + for (String udf : sparkOp.UDFs) { + try { + Class<?> clazz = this.pigContext.getClassForAlias(udf); + if (clazz != null) { + String jar = JarManager.findContainingJar(clazz); + if (jar != null) { + this.udfJars.add(jar); + } + } + } catch (IOException e) { + throw new VisitorException(e); + } + } + } + + public Set<String> getUdfJars() { + return this.udfJars; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.rdd.RDD; + +import java.util.List; + +public class BroadcastConverter implements RDDConverter<Tuple, Tuple, POBroadcastSpark> { + + private final JavaSparkContext sc; + + public BroadcastConverter(JavaSparkContext sc) { + this.sc = sc; + } + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POBroadcastSpark po) { + SparkUtil.assertPredecessorSize(predecessors, po, 1); + RDD<Tuple> rdd = predecessors.get(0); + + // Just collect the predecessor RDD, and broadcast it + JavaRDD<Tuple> javaRDD = new JavaRDD<>(rdd, SparkUtil.getManifest(Tuple.class)); + Broadcast<List<Tuple>> broadcastedRDD = sc.broadcast(javaRDD.collect()); + + // Save the broadcast variable to broadcastedVars map, so that this + // broadcasted variable can be referenced by the driver client. + SparkPigContext.get().getBroadcastedVars().put(po.getBroadcastedVariableName(), broadcastedRDD); + + return rdd; + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.rdd.RDD; + +@SuppressWarnings({"serial"}) +public class CollectedGroupConverter implements RDDConverter<Tuple, Tuple, POCollectedGroup> { + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POCollectedGroup physicalOperator) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1); + RDD<Tuple> rdd = predecessors.get(0); + CollectedGroupFunction collectedGroupFunction + = new CollectedGroupFunction(physicalOperator); + return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true) + .rdd(); + } + + private static class CollectedGroupFunction + implements FlatMapFunction<Iterator<Tuple>, Tuple> { + + private POCollectedGroup poCollectedGroup; + + public long current_val; + public boolean proceed; + + private CollectedGroupFunction(POCollectedGroup poCollectedGroup) { + this.poCollectedGroup = poCollectedGroup; + this.current_val = 0; + } + + public Iterable<Tuple> call(final Iterator<Tuple> input) { + + return new Iterable<Tuple>() { + + @Override + public Iterator<Tuple> iterator() { + + return new OutputConsumerIterator(input) { + + @Override + protected void attach(Tuple tuple) { + poCollectedGroup.setInputs(null); + poCollectedGroup.attachInput(tuple); + } + + @Override + protected Result getNextResult() throws ExecException { + return poCollectedGroup.getNextTuple(); + } + + @Override + protected void endOfInput() { + poCollectedGroup.setEndOfInput(true); + } + }; + } + }; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.rdd.RDD; + +public class CounterConverter implements RDDConverter<Tuple, Tuple, POCounter> { + + private static final Log LOG = LogFactory.getLog(CounterConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + POCounter poCounter) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, poCounter, 1); + RDD<Tuple> rdd = predecessors.get(0); + CounterConverterFunction f = new CounterConverterFunction(poCounter); + JavaRDD<Tuple> jRdd = rdd.toJavaRDD().mapPartitionsWithIndex(f, true); +// jRdd = jRdd.cache(); + return jRdd.rdd(); + } + + @SuppressWarnings("serial") + private static class CounterConverterFunction implements + Function2<Integer, Iterator<Tuple>, Iterator<Tuple>>, Serializable { + + private final POCounter poCounter; + private long localCount = 1L; + private long sparkCount = 0L; + + private CounterConverterFunction(POCounter poCounter) { + this.poCounter = poCounter; + } + + @Override + public Iterator<Tuple> call(Integer index, final + Iterator<Tuple> input) { + Tuple inp = null; + Tuple output = null; + long sizeBag = 0L; + + List<Tuple> listOutput = new ArrayList<Tuple>(); + + try { + while (input.hasNext()) { + inp = input.next(); + output = TupleFactory.getInstance() + .newTuple(inp.getAll().size() + 3); + + for (int i = 0; i < inp.getAll().size(); i++) { + output.set(i + 3, inp.get(i)); + } + + if (poCounter.isRowNumber() || poCounter.isDenseRank()) { + output.set(2, getLocalCounter()); + incrementSparkCounter(); + incrementLocalCounter(); + } else if (!poCounter.isDenseRank()) { + int positionBag = inp.getAll().size()-1; + if (inp.getType(positionBag) == DataType.BAG) { + sizeBag = ((org.apache.pig.data.DefaultAbstractBag) + inp.get(positionBag)).size(); + } + + output.set(2, getLocalCounter()); + + addToSparkCounter(sizeBag); + addToLocalCounter(sizeBag); + } + + output.set(0, index); + output.set(1, getSparkCounter()); + listOutput.add(output); + } + } catch(ExecException e) { + throw new RuntimeException(e); + } + + + return listOutput.iterator(); + } + + private long getLocalCounter() { + return localCount; + } + + private long incrementLocalCounter() { + return localCount++; + } + + private long addToLocalCounter(long amount) { + return localCount += amount; + } + + private long getSparkCounter() { + return sparkCount; + } + + private long incrementSparkCounter() { + return sparkCount++; + } + + private long addToSparkCounter(long amount) { + return sparkCount += amount; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1796639&view=auto 
============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.converter; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.data.Tuple; +import org.apache.spark.rdd.PairRDDFunctions; +import org.apache.spark.rdd.RDD; + +import scala.Tuple2; +import scala.runtime.AbstractFunction1; +import scala.runtime.AbstractFunction2; + +@SuppressWarnings({ "serial" }) +public class DistinctConverter implements RDDConverter<Tuple, Tuple, PODistinct> { + private static final Log LOG = LogFactory.getLog(DistinctConverter.class); + + @Override + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, + PODistinct op) throws IOException { + SparkUtil.assertPredecessorSize(predecessors, op, 1); + RDD<Tuple> rdd = predecessors.get(0); + + // In DISTINCT operation, the key is the entire tuple. 
+ // RDD<Tuple> -> RDD<Tuple2<Tuple, null>> + RDD<Tuple2<Tuple, Object>> keyValRDD = rdd.map(new ToKeyValueFunction(), + SparkUtil.<Tuple, Object> getTuple2Manifest()); + PairRDDFunctions<Tuple, Object> pairRDDFunctions + = new PairRDDFunctions<Tuple, Object>(keyValRDD, + SparkUtil.getManifest(Tuple.class), + SparkUtil.getManifest(Object.class), null); + int parallelism = SparkPigContext.get().getParallelism(predecessors, op); + return pairRDDFunctions.reduceByKey( + SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism), + new MergeValuesFunction()) + .map(new ToValueFunction(), SparkUtil.getManifest(Tuple.class)); + } + + /** + * Tuple -> Tuple2<Tuple, null> + */ + private static final class ToKeyValueFunction extends + AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements + Serializable { + @Override + public Tuple2<Tuple, Object> apply(Tuple t) { + Tuple key = t; + Object value = null; + Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value); + return out; + } + } + + /** + * No-op + */ + private static final class MergeValuesFunction extends + AbstractFunction2<Object, Object, Object> implements Serializable { + @Override + public Object apply(Object arg0, Object arg1) { + return null; + } + } + + /** + * Tuple2<Tuple, null> -> Tuple + */ + private static final class ToValueFunction extends + AbstractFunction1<Tuple2<Tuple, Object>, Tuple> implements + Serializable { + @Override + public Tuple apply(Tuple2<Tuple, Object> input) { + return input._1; + } + } +} \ No newline at end of file
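Note on the DISTINCT strategy above: DistinctConverter implements DISTINCT as a key-only shuffle in three steps, keying each tuple by itself with a null placeholder value, collapsing duplicate keys with a no-op reduceByKey, and then projecting the key back out. The standalone sketch below (not part of this commit) illustrates the same pattern with the plain Java RDD API and String records in place of Pig Tuples; the class name DistinctSketch, the local[*] master, the sample data, and the hard-coded parallelism of 4 are invented for the example, the last standing in for the partitioner that SparkUtil.getPartitioner would supply.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class DistinctSketch {
    public static void main(String[] args) {
        // Hypothetical local setup for illustration only.
        SparkConf conf = new SparkConf().setAppName("distinct-sketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> records = sc.parallelize(Arrays.asList("a", "b", "a", "c", "b"));

            // 1. Key every record by itself with a null placeholder value
            //    (the role of ToKeyValueFunction).
            JavaPairRDD<String, Object> keyed =
                    records.mapToPair(r -> new Tuple2<String, Object>(r, null));

            // 2. Collapse duplicate keys with a no-op merge (MergeValuesFunction);
            //    4 partitions stand in for the parallelism/partitioner chosen
            //    via SparkUtil.getPartitioner in the real converter.
            JavaPairRDD<String, Object> deduped = keyed.reduceByKey((a, b) -> null, 4);

            // 3. Drop the placeholder and keep only the key (ToValueFunction).
            JavaRDD<String> distinct = deduped.map(t -> t._1());

            System.out.println(distinct.collect()); // e.g. [a, b, c]
        }
    }
}

In the committed converter the same three steps are expressed against Scala's PairRDDFunctions with Pig Tuple keys, and the partitioner honours any custom partitioner configured on the PODistinct operator.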