Author: szita Date: Mon May 29 15:00:39 2017 New Revision: 1796639 URL: http://svn.apache.org/viewvc?rev=1796639&view=rev Log: PIG-4059: Pig On Spark
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java 
pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java pig/trunk/test/e2e/pig/conf/spark.conf pig/trunk/test/excluded-tests-spark pig/trunk/test/org/apache/pig/spark/ pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld Modified: pig/trunk/CHANGES.txt pig/trunk/bin/pig pig/trunk/build.xml pig/trunk/ivy.xml pig/trunk/ivy/libraries.properties pig/trunk/src/META-INF/services/org.apache.pig.ExecType pig/trunk/src/docs/src/documentation/content/xdocs/start.xml pig/trunk/src/org/apache/pig/PigConfiguration.java pig/trunk/src/org/apache/pig/PigWarning.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java pig/trunk/src/org/apache/pig/data/SelfSpillBag.java pig/trunk/src/org/apache/pig/impl/PigContext.java pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java pig/trunk/src/org/apache/pig/impl/util/UDFContext.java pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java pig/trunk/test/e2e/pig/build.xml pig/trunk/test/e2e/pig/drivers/TestDriverPig.pm pig/trunk/test/e2e/pig/tests/bigdata.conf pig/trunk/test/e2e/pig/tests/cmdline.conf pig/trunk/test/e2e/pig/tests/grunt.conf pig/trunk/test/e2e/pig/tests/hcat.conf pig/trunk/test/e2e/pig/tests/multiquery.conf pig/trunk/test/e2e/pig/tests/negative.conf pig/trunk/test/e2e/pig/tests/nightly.conf pig/trunk/test/e2e/pig/tests/orc.conf pig/trunk/test/e2e/pig/tests/streaming.conf pig/trunk/test/e2e/pig/tests/turing_jython.conf pig/trunk/test/e2e/pig/tools/test/floatpostprocessor.pl pig/trunk/test/excluded-tests-mr pig/trunk/test/excluded-tests-tez pig/trunk/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java pig/trunk/test/org/apache/pig/pigunit/PigTest.java pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java pig/trunk/test/org/apache/pig/test/TestAssert.java pig/trunk/test/org/apache/pig/test/TestCase.java pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java pig/trunk/test/org/apache/pig/test/TestCombiner.java pig/trunk/test/org/apache/pig/test/TestCubeOperator.java 
pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java pig/trunk/test/org/apache/pig/test/TestFinish.java pig/trunk/test/org/apache/pig/test/TestFlatten.java pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java pig/trunk/test/org/apache/pig/test/TestGrunt.java pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java pig/trunk/test/org/apache/pig/test/TestLimitVariable.java pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java pig/trunk/test/org/apache/pig/test/TestMultiQuery.java pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java pig/trunk/test/org/apache/pig/test/TestNullConstant.java pig/trunk/test/org/apache/pig/test/TestPigRunner.java pig/trunk/test/org/apache/pig/test/TestPigServer.java pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java pig/trunk/test/org/apache/pig/test/TestProjectRange.java pig/trunk/test/org/apache/pig/test/TestPruneColumn.java pig/trunk/test/org/apache/pig/test/TestRank1.java pig/trunk/test/org/apache/pig/test/TestRank2.java pig/trunk/test/org/apache/pig/test/TestRank3.java pig/trunk/test/org/apache/pig/test/TestSecondarySort.java pig/trunk/test/org/apache/pig/test/TestStoreBase.java pig/trunk/test/org/apache/pig/test/TezMiniCluster.java pig/trunk/test/org/apache/pig/test/Util.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon May 29 15:00:39 2017 @@ -36,6 +36,8 @@ PIG-5067: Revisit union on numeric type  IMPROVEMENTS 
+PIG-4059: Pig On Spark + PIG-5188: Review pig-index.xml (szita) PIG-4924: Translate failures.maxpercent MR setting to Tez Tez (rohini) Modified: pig/trunk/bin/pig URL: http://svn.apache.org/viewvc/pig/trunk/bin/pig?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/bin/pig (original) +++ pig/trunk/bin/pig Mon May 29 15:00:39 2017 @@ -57,6 +57,21 @@ remaining=() includeHCatalog=""; addJarString=-Dpig.additional.jars.uris\=; additionalJars=""; +prevArgExecType=false; +isSparkMode=false; +isSparkLocalMode=false; + +#verify the execType is SPARK or SPARK_LOCAL or not +function processExecType(){ + execType=$1 + execTypeUpperCase=$(echo $execType |tr [a-z] [A-Z]) + if [[ "$execTypeUpperCase" == "SPARK" ]]; then + isSparkMode=true + elif [[ "$execTypeUpperCase" == "SPARK_LOCAL" ]]; then + isSparkLocalMode=true + fi +} + # filter command line parameter for f in "$@"; do if [[ $f == "-secretDebugCmd" || $f == "-printCmdDebug" ]]; then @@ -70,6 +85,13 @@ for f in "$@"; do includeHCatalog=true; elif [[ "$includeHCatalog" == "true" && $f == $addJarString* ]]; then additionalJars=`echo $f | sed s/$addJarString//` + elif [[ "$f" == "-x" || "$f" == "-exectype" ]]; then + prevArgExecType=true; + remaining[${#remaining[@]}]="$f" + elif [[ "$prevArgExecType" == "true" ]]; then + prevArgExecType=false; + processExecType $f + remaining[${#remaining[@]}]="$f" else remaining[${#remaining[@]}]="$f" fi @@ -362,6 +384,44 @@ if [ "$includeHCatalog" == "true" ]; the PIG_OPTS="$PIG_OPTS -Dpig.additional.jars.uris=$ADDITIONAL_CLASSPATHS" fi +################# ADDING SPARK DEPENDENCIES ################## +# For spark_local mode: +if [ "$isSparkLocalMode" == "true" ]; then +#SPARK_MASTER is forced to be "local" in spark_local mode + SPARK_MASTER="local" + for f in $PIG_HOME/lib/spark/*.jar; do + CLASSPATH=${CLASSPATH}:$f; + done +fi + +# For spark mode: +# Please specify SPARK_HOME first so that we can locate 
$SPARK_HOME/lib/spark-assembly*.jar, +# we will add spark-assembly*.jar to the classpath. +if [ "$isSparkMode" == "true" ]; then + if [ -z "$SPARK_HOME" ]; then + echo "Error: SPARK_HOME is not set!" + exit 1 + fi + + # Please specify SPARK_JAR which is the hdfs path of spark-assembly*.jar to allow YARN to cache spark-assembly*.jar on nodes so that it doesn't need to be distributed each time an application runs. + if [ -z "$SPARK_JAR" ]; then + echo "Error: SPARK_JAR is not set, SPARK_JAR stands for the hdfs location of spark-assembly*.jar. This allows YARN to cache spark-assembly*.jar on nodes so that it doesn't need to be distributed each time an application runs." + exit 1 + fi + + if [ -n "$SPARK_HOME" ]; then + echo "Using Spark Home: " ${SPARK_HOME} + SPARK_ASSEMBLY_JAR=`ls ${SPARK_HOME}/lib/spark-assembly*` + CLASSPATH=${CLASSPATH}:$SPARK_ASSEMBLY_JAR + fi +fi + +#spark-assembly.jar contains jcl-over-slf4j which would create a LogFactory implementation that is incompatible +if [ "$isSparkMode" == "true" ]; then + PIG_OPTS="$PIG_OPTS -Dorg.apache.commons.logging.LogFactory=org.apache.commons.logging.impl.LogFactoryImpl" +fi +################# ADDING SPARK DEPENDENCIES ################## + # run it if [ -n "$HADOOP_BIN" ]; then if [ "$debug" == "true" ]; then Modified: pig/trunk/build.xml URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/build.xml (original) +++ pig/trunk/build.xml Mon May 29 15:00:39 2017 @@ -46,6 +46,7 @@ <!-- source properties --> <property name="lib.dir" value="${basedir}/lib" /> + <property name="spark.lib.dir" value="${basedir}/lib/spark" /> <property name="src.dir" value="${basedir}/src" /> <property name="python.src.dir" value="${src.dir}/python" /> <property name="src.lib.dir" value="${basedir}/lib-src" /> @@ -106,9 +107,12 @@ <property name="test.unit.file" value="${test.src.dir}/unit-tests"/> 
<property name="test.smoke.file" value="${test.src.dir}/smoke-tests"/> <property name="test.all.file" value="${test.src.dir}/all-tests"/> + <property name="test.spark.file" value="${test.src.dir}/spark-tests"/> + <property name="test.spark_local.file" value="${test.src.dir}/spark-local-tests"/> <property name="test.exclude.file" value="${test.src.dir}/excluded-tests"/> <property name="test.exclude.file.mr" value="${test.src.dir}/excluded-tests-mr"/> <property name="test.exclude.file.tez" value="${test.src.dir}/excluded-tests-tez"/> + <property name="test.exclude.file.spark" value="${test.src.dir}/excluded-tests-spark"/> <property name="pigunit.jarfile" value="pigunit.jar" /> <property name="piggybank.jarfile" value="${basedir}/contrib/piggybank/java/piggybank.jar" /> <property name="smoke.tests.jarfile" value="${build.dir}/${final.name}-smoketests.jar" /> @@ -160,6 +164,10 @@ <propertyreset name="test.exec.type" value="tez" /> </target> + <target name="setSparkEnv"> + <propertyreset name="test.exec.type" value="spark" /> + </target> + <target name="setWindowsPath" if="${isWindows}"> <property name="build.path" value="${env.Path};${hadoop.root}\bin" /> </target> @@ -253,6 +261,7 @@ <property name="build.ivy.dir" location="${build.dir}/ivy" /> <property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" /> <property name="ivy.lib.dir" location="${build.ivy.lib.dir}/${ant.project.name}"/> + <property name="ivy.lib.dir.spark" location="${ivy.lib.dir}/spark" /> <property name="build.ivy.report.dir" location="${build.ivy.dir}/report" /> <property name="build.ivy.maven.dir" location="${build.ivy.dir}/maven" /> <property name="pom.xml" location="${build.ivy.maven.dir}/pom.xml"/> @@ -322,6 +331,9 @@ <fileset dir="${ivy.lib.dir}"> <include name="**.*jar"/> </fileset> + <fileset dir="${ivy.lib.dir.spark}"> + <include name="**.*jar"/> + </fileset> </path> <taskdef name="eclipse" classname="prantl.ant.eclipse.EclipseTask" @@ -352,6 +364,7 @@ <path id="classpath"> <fileset 
file="${ivy.lib.dir}/${zookeeper.jarfile}"/> <fileset dir="${ivy.lib.dir}" includes="*.jar"/> + <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/> </path> <!-- javadoc-classpath --> @@ -370,6 +383,7 @@ <fileset dir="${ivy.lib.dir}" id="core.dependencies.jar"> <exclude name="**.*jar"/> + <exclude name="spark/**.*jar"/> </fileset> <fileset dir="${ivy.lib.dir}" id="runtime.dependencies-withouthadoop.jar"> @@ -678,6 +692,7 @@ <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.core}" includedJars="core.dependencies.jar"/> <buildJar svnString="${svn.revision}" outputFile="${output.jarfile.withouthadoop}" includedJars="runtime.dependencies-withouthadoop.jar"/> <antcall target="copyCommonDependencies"/> + <antcall target="copySparkDependencies"/> <antcall target="copyh2Dependencies"/> <antcall target="copyHadoop2LocalRuntimeDependencies" /> </target> @@ -715,9 +730,17 @@ <fileset dir="${ivy.lib.dir}" includes="httpdlog-*-${basjes-httpdlog-pigloader.version}.jar"/> <fileset dir="${ivy.lib.dir}" includes="parser-core-${basjes-httpdlog-pigloader.version}.jar"/> <fileset dir="${ivy.lib.dir}" includes="ivy-*.jar"/> + <fileset dir="${ivy.lib.dir}" includes="commons-logging-*.jar"/> </copy> </target> + <target name="copySparkDependencies"> + <mkdir dir="${spark.lib.dir}" /> + <copy todir="${spark.lib.dir}"> + <fileset dir="${ivy.lib.dir.spark}" includes="*.jar"/> + </copy> + </target> + <target name="copyh2Dependencies" if="isHadoop2"> <mkdir dir="${lib.dir}/h2" /> <copy todir="${lib.dir}/h2"> @@ -856,7 +879,12 @@ <macro-test-runner test.file="${test.all.file}" tests.failed="test-tez.failed"/> <fail if="test-tez.failed">Tests failed!</fail> </target> - + + <target name="test-spark" depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download" description="Run Spark unit tests in Spark cluster-local mode"> + <macro-test-runner test.file="${test.all.file}" tests.failed="test-spark.failed"/> + <fail 
if="test-spark.failed">Tests failed!</fail> + </target> + <target name="debugger.check" depends="debugger.set,debugger.unset"/> <target name="debugger.set" if="debugPort"> <property name="debugArgs" value="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=${debugPort}"/> @@ -881,9 +909,6 @@ <sysproperty key="test.exec.type" value="${test.exec.type}" /> <sysproperty key="ssh.gateway" value="${ssh.gateway}" /> <sysproperty key="hod.server" value="${hod.server}" /> - <sysproperty key="build.classes" value="${build.classes}" /> - <sysproperty key="test.build.classes" value="${test.build.classes}" /> - <sysproperty key="ivy.lib.dir" value="${ivy.lib.dir}" /> <sysproperty key="java.io.tmpdir" value="${junit.tmp.dir}" /> <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/> <jvmarg line="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=128M ${debugArgs} -Djava.library.path=${hadoop.root}\bin"/> @@ -978,6 +1003,10 @@ <ant dir="${test.e2e.dir}" target="test-tez"/> </target> + <target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in spark mode"> + <ant dir="${test.e2e.dir}" target="test-spark"/> + </target> + <target name="test-e2e-deploy" depends="jar" description="deploy end-to-end tests to existing cluster"> <ant dir="${test.e2e.dir}" target="deploy"/> </target> @@ -1624,6 +1653,8 @@ <target name="ivy-compile" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for compile configuration"> <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/> + <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}" + pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark"/> <ivy:cachepath pathid="compile.classpath" conf="compile"/> </target> Modified: pig/trunk/ivy.xml URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1796639&r1=1796638&r2=1796639&view=diff 
============================================================================== --- pig/trunk/ivy.xml (original) +++ pig/trunk/ivy.xml Mon May 29 15:00:39 2017 @@ -40,6 +40,7 @@ <conf name="buildJar" extends="compile,test" visibility="private"/> <conf name="hadoop2" visibility="private"/> <conf name="hbase1" visibility="private"/> + <conf name="spark" visibility="private" /> </configurations> <publications> <artifact name="pig" conf="master"/> @@ -406,6 +407,24 @@ <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/> + <!-- for Spark integration --> + <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark.version}" conf="spark->default"> + <exclude org="org.eclipse.jetty.orbit" module="javax.servlet"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.transaction"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.mail.glassfish"/> + <exclude org="org.eclipse.jetty.orbit" module="javax.activation"/> + <exclude org="org.apache.hadoop" /> + <exclude org="com.esotericsoftware.kryo" /> + <exclude org="jline" module="jline"/> + <exclude org="com.google.guava" /> + </dependency> + <dependency org="org.apache.spark" name="spark-yarn_2.11" rev="${spark.version}" conf="spark->default"> + <exclude org="org.apache.hadoop" /> + </dependency> + <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/> + <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark->default"/> + <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark->default"/> + <!-- for Tez integration --> <dependency org="org.apache.tez" name="tez" rev="${tez.version}" conf="hadoop2->master"/> Modified: pig/trunk/ivy/libraries.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- 
pig/trunk/ivy/libraries.properties (original) +++ pig/trunk/ivy/libraries.properties Mon May 29 15:00:39 2017 @@ -73,6 +73,7 @@ netty-all.version=4.0.23.Final rats-lib.version=0.5.1 slf4j-api.version=1.6.1 slf4j-log4j12.version=1.6.1 +spark.version=1.6.1 xerces.version=2.10.0 xalan.version=2.7.1 wagon-http.version=1.0-beta-2 @@ -88,7 +89,7 @@ jsr311-api.version=1.1.1 mockito.version=1.8.4 jansi.version=1.9 asm.version=3.3.1 -snappy-java.version=1.1.0.1 +snappy-java.version=1.1.1.3 tez.version=0.7.0 parquet-pig-bundle.version=1.2.3 snappy.version=0.2 @@ -96,3 +97,4 @@ leveldbjni.version=1.8 curator.version=2.6.0 htrace.version=3.1.0-incubating commons-lang3.version=3.1 +scala-xml.version=1.0.5 Modified: pig/trunk/src/META-INF/services/org.apache.pig.ExecType URL: http://svn.apache.org/viewvc/pig/trunk/src/META-INF/services/org.apache.pig.ExecType?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/META-INF/services/org.apache.pig.ExecType (original) +++ pig/trunk/src/META-INF/services/org.apache.pig.ExecType Mon May 29 15:00:39 2017 @@ -15,4 +15,5 @@ org.apache.pig.backend.hadoop.executione org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType org.apache.pig.backend.hadoop.executionengine.tez.TezExecType - +org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType +org.apache.pig.backend.hadoop.executionengine.spark.SparkLocalExecType Modified: pig/trunk/src/docs/src/documentation/content/xdocs/start.xml URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/start.xml?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/docs/src/documentation/content/xdocs/start.xml (original) +++ pig/trunk/src/docs/src/documentation/content/xdocs/start.xml Mon May 29 15:00:39 2017 @@ 
-26,45 +26,45 @@ <!-- SET UP PIG --> <section> - <title>Pig Setup</title> - + <title>Pig Setup</title> + <!-- ++++++++++++++++++++++++++++++++++ --> <section id="req"> <title>Requirements</title> <p><strong>Mandatory</strong></p> <p>Unix and Windows users need the following:</p> - <ul> - <li> <strong>Hadoop 2.X</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 2.7.3.)</li> - <li> <strong>Java 1.7</strong> - <a href="http://java.sun.com/javase/downloads/index.jsp">http://java.sun.com/javase/downloads/index.jsp</a> (set JAVA_HOME to the root of your Java installation)</li> - </ul> - <p></p> + <ul> + <li> <strong>Hadoop 2.X</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. 
If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 2.7.3.)</li> + <li> <strong>Java 1.7</strong> - <a href="http://java.sun.com/javase/downloads/index.jsp">http://java.sun.com/javase/downloads/index.jsp</a> (set JAVA_HOME to the root of your Java installation)</li> + </ul> + <p></p> <p><strong>Optional</strong></p> - <ul> + <ul> <li> <strong>Python 2.7</strong> - <a href="http://jython.org/downloads.html">https://www.python.org</a> (when using Streaming Python UDFs) </li> <li> <strong>Ant 1.8</strong> - <a href="http://ant.apache.org/">http://ant.apache.org/</a> (for builds) </li> - </ul> + </ul> </section> <!-- ++++++++++++++++++++++++++++++++++ --> <section id="download"> <title>Download Pig</title> - <p>To get a Pig distribution, do the following:</p> - - <ol> - <li>Download a recent stable release from one of the Apache Download Mirrors - (see <a href="http://hadoop.apache.org/pig/releases.html"> Pig Releases</a>).</li> - + <p>To get a Pig distribution, do the following:</p> + + <ol> + <li>Download a recent stable release from one of the Apache Download Mirrors + (see <a href="http://hadoop.apache.org/pig/releases.html"> Pig Releases</a>).</li> + <li>Unpack the downloaded Pig distribution, and then note the following: - <ul> - <li>The Pig script file, pig, is located in the bin directory (/pig-n.n.n/bin/pig). - The Pig environment variables are described in the Pig script file.</li> - <li>The Pig properties file, pig.properties, is located in the conf directory (/pig-n.n.n/conf/pig.properties). - You can specify an alternate location using the PIG_CONF_DIR environment variable.</li> - </ul> - </li> - <li>Add /pig-n.n.n/bin to your path. Use export (bash,sh,ksh) or setenv (tcsh,csh). For example: <br></br> - <code>$ export PATH=/<my-path-to-pig>/pig-n.n.n/bin:$PATH</code> + <ul> + <li>The Pig script file, pig, is located in the bin directory (/pig-n.n.n/bin/pig). 
+ The Pig environment variables are described in the Pig script file.</li> + <li>The Pig properties file, pig.properties, is located in the conf directory (/pig-n.n.n/conf/pig.properties). + You can specify an alternate location using the PIG_CONF_DIR environment variable.</li> + </ul> + </li> + <li>Add /pig-n.n.n/bin to your path. Use export (bash,sh,ksh) or setenv (tcsh,csh). For example: <br></br> + <code>$ export PATH=/<my-path-to-pig>/pig-n.n.n/bin:$PATH</code> </li> <li> Test the Pig installation with this simple command: <code>$ pig -help</code> @@ -78,10 +78,10 @@ Test the Pig installation with this simp <title>Build Pig</title> <p>To build pig, do the following:</p> <ol> - <li> Check out the Pig code from SVN: <code>svn co http://svn.apache.org/repos/asf/pig/trunk</code> </li> - <li> Build the code from the top directory: <code>ant</code> <br></br> - If the build is successful, you should see the pig.jar file created in that directory. </li> - <li> Validate the pig.jar by running a unit test: <code>ant test</code></li> + <li> Check out the Pig code from SVN: <code>svn co http://svn.apache.org/repos/asf/pig/trunk</code> </li> + <li> Build the code from the top directory: <code>ant</code> <br></br> + If the build is successful, you should see the pig.jar file created in that directory. 
</li> + <li> Validate the pig.jar by running a unit test: <code>ant test</code></li> </ol> </section> </section> @@ -90,46 +90,53 @@ Test the Pig installation with this simp <!-- RUNNING PIG --> <section id="run"> - <title>Running Pig </title> - <p>You can run Pig (execute Pig Latin statements and Pig commands) using various modes.</p> - <table> - <tr> - <td></td> + <title>Running Pig </title> + <p>You can run Pig (execute Pig Latin statements and Pig commands) using various modes.</p> + <table> + <tr> + <td></td> <td><strong>Local Mode</strong></td> <td><strong>Tez Local Mode</strong></td> + <td><strong>Spark Local Mode</strong></td> <td><strong>Mapreduce Mode</strong></td> <td><strong>Tez Mode</strong></td> - </tr> - <tr> - <td><strong>Interactive Mode </strong></td> + <td><strong>Spark Mode</strong></td> + </tr> + <tr> + <td><strong>Interactive Mode </strong></td> <td>yes</td> <td>experimental</td> <td>yes</td> <td>yes</td> - </tr> - <tr> - <td><strong>Batch Mode</strong> </td> + </tr> + <tr> + <td><strong>Batch Mode</strong> </td> <td>yes</td> <td>experimental</td> <td>yes</td> <td>yes</td> - </tr> - </table> - - <!-- ++++++++++++++++++++++++++++++++++ --> - <section id="execution-modes"> - <title>Execution Modes</title> -<p>Pig has two execution modes or exectypes: </p> + </tr> + </table> + + <!-- ++++++++++++++++++++++++++++++++++ --> + <section id="execution-modes"> + <title>Execution Modes</title> +<p>Pig has six execution modes or exectypes: </p> <ul> <li><strong>Local Mode</strong> - To run Pig in local mode, you need access to a single machine; all files are installed and run using your local host and file system. Specify local mode using the -x flag (pig -x local). </li> <li><strong>Tez Local Mode</strong> - To run Pig in tez local mode. It is similar to local mode, except internally Pig will invoke tez runtime engine. Specify Tez local mode using the -x flag (pig -x tez_local). <p><strong>Note:</strong> Tez local mode is experimental. 
There are some queries which just error out on bigger data in local mode.</p> </li> +<li><strong>Spark Local Mode</strong> - To run Pig in spark local mode. It is similar to local mode, except internally Pig will invoke spark runtime engine. Specify Spark local mode using the -x flag (pig -x spark_local). +<p><strong>Note:</strong> Spark local mode is experimental. There are some queries which just error out on bigger data in local mode.</p> +</li> <li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR pig -x mapreduce). </li> <li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access to a Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x tez). </li> +<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using the -x flag (-x spark). In Spark execution mode, it is necessary to set env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client - yarn-client mode, mesos://host:port - spark on mesos or spark://host:port - spark cluster. For more information refer to spark documentation on Master URLs, <em>yarn-cluster mode is currently not supported</em>). Pig scripts run on Spark can take advantage of the <a href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation">dynamic allocation</a> feature. The feature can be enabled by simply enabling <em>spark.dynamicAllocation.enabled</em>. Refer to spark <a href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation">configuration</a> for additional configuration details. In general all properties in the pig script prefixed with <em>spark.</em> are copied to the Spark Application Configuration. 
Please note that Yarn auxiliary service needs to be enabled on Spark for this to work. See Spark documentation for additional details. +</li> </ul> <p></p> @@ -148,6 +155,9 @@ $ pig -x local ... /* Tez local mode */ $ pig -x tez_local ... +/* Spark local mode */ +$ pig -x spark_local ... + /* mapreduce mode */ $ pig ... or @@ -155,6 +165,9 @@ $ pig -x mapreduce ... /* Tez mode */ $ pig -x tez ... + +/* Spark mode */ +$ pig -x spark ... </source> </section> @@ -179,7 +192,7 @@ grunt> dump B; <source> $ pig -x local ... - Connecting to ... -grunt> +grunt> </source> <p><strong>Tez Local Mode</strong></p> @@ -189,6 +202,13 @@ $ pig -x tez_local grunt> </source> +<p><strong>Spark Local Mode</strong></p> +<source> +$ pig -x spark_local +... - Connecting to ... +grunt> +</source> + <p><strong>Mapreduce Mode</strong> </p> <source> $ pig -x mapreduce @@ -208,6 +228,14 @@ $ pig -x tez ... - Connecting to ... grunt> </source> + +<p><strong>Spark Mode</strong> </p> +<source> +$ pig -x spark +... - Connecting to ... +grunt> +</source> + </section> </section> @@ -237,6 +265,10 @@ $ pig -x local id.pig <source> $ pig -x tez_local id.pig </source> +<p><strong>Spark Local Mode</strong></p> +<source> +$ pig -x spark_local id.pig +</source> <p><strong>Mapreduce Mode</strong> </p> <source> $ pig id.pig @@ -247,22 +279,26 @@ $ pig -x mapreduce id.pig <source> $ pig -x tez id.pig </source> +<p><strong>Spark Mode</strong> </p> +<source> +$ pig -x spark id.pig +</source> </section> <!-- ==================================================================== --> - + <!-- PIG SCRIPTS --> <section id="pig-scripts"> - <title>Pig Scripts</title> - -<p>Use Pig scripts to place Pig Latin statements and Pig commands in a single file. While not required, it is good practice to identify the file using the *.pig extension.</p> - + <title>Pig Scripts</title> + +<p>Use Pig scripts to place Pig Latin statements and Pig commands in a single file. 
While not required, it is good practice to identify the file using the *.pig extension.</p> + <p>You can run Pig scripts from the command line and from the Grunt shell (see the <a href="cmds.html#run">run</a> and <a href="cmds.html#exec">exec</a> commands). </p> - + <p>Pig scripts allow you to pass values to parameters using <a href="cont.html#Parameter-Sub">parameter substitution</a>. </p> -<!-- +++++++++++++++++++++++++++++++++++++++++++ --> +<!-- +++++++++++++++++++++++++++++++++++++++++++ --> <p id="comments"><strong>Comments in Scripts</strong></p> <p>You can include comments in Pig scripts:</p> @@ -284,8 +320,8 @@ A = LOAD 'student' USING PigStorage() AS B = FOREACH A GENERATE name; -- transforming data DUMP B; -- retrieving results </source> - -<!-- +++++++++++++++++++++++++++++++++++++++++++ --> + +<!-- +++++++++++++++++++++++++++++++++++++++++++ --> <p id="dfs"><strong>Scripts and Distributed File Systems</strong></p> @@ -293,7 +329,7 @@ DUMP B; -- retrieving results <source> $ pig hdfs://nn.mydomain.com:9020/myscripts/script.pig </source> -</section> +</section> </section> </section> @@ -355,7 +391,7 @@ hadoop.security.krb5.keytab=/home/niels/ <!-- PIG LATIN STATEMENTS --> <section id="pl-statements"> - <title>Pig Latin Statements</title> + <title>Pig Latin Statements</title> <p>Pig Latin statements are the basic constructs you use to process data using Pig. A Pig Latin statement is an operator that takes a <a href="basic.html#relations">relation</a> as input and produces another relation as output. (This definition applies to all Pig Latin operators except LOAD and STORE which read data from and write data to the file system.) 
@@ -496,20 +532,20 @@ However, in a production environment you <p></p> <p id="pig-properties">To specify Pig properties use one of these mechanisms:</p> <ul> - <li>The pig.properties file (add the directory that contains the pig.properties file to the classpath)</li> - <li>The -D and a Pig property in PIG_OPTS environment variable (export PIG_OPTS=-Dpig.tmpfilecompression=true)</li> - <li>The -P command line option and a properties file (pig -P mypig.properties)</li> - <li>The <a href="cmds.html#set">set</a> command (set pig.exec.nocombiner true)</li> + <li>The pig.properties file (add the directory that contains the pig.properties file to the classpath)</li> + <li>The -D and a Pig property in PIG_OPTS environment variable (export PIG_OPTS=-Dpig.tmpfilecompression=true)</li> + <li>The -P command line option and a properties file (pig -P mypig.properties)</li> + <li>The <a href="cmds.html#set">set</a> command (set pig.exec.nocombiner true)</li> </ul> <p><strong>Note:</strong> The properties file uses standard Java property file format.</p> <p>The following precedence order is supported: pig.properties < -D Pig property < -P properties file < set command. 
This means that if the same property is provided using the -D command line option as well as the -P command line option (properties file), the value of the property in the properties file will take precedence.</p> <p id="hadoop-properties">To specify Hadoop properties you can use the same mechanisms:</p> <ul> - <li>Hadoop configuration files (include pig-cluster-hadoop-site.xml)</li> - <li>The -D and a Hadoop property in PIG_OPTS environment variable (export PIG_OPTS=âDmapreduce.task.profile=true) </li> - <li>The -P command line option and a property file (pig -P property_file)</li> - <li>The <a href="cmds.html#set">set</a> command (set mapred.map.tasks.speculative.execution false)</li> + <li>Hadoop configuration files (include pig-cluster-hadoop-site.xml)</li> + <li>The -D and a Hadoop property in PIG_OPTS environment variable (export PIG_OPTS=-Dmapreduce.task.profile=true) </li> + <li>The -P command line option and a property file (pig -P property_file)</li> + <li>The <a href="cmds.html#set">set</a> command (set mapred.map.tasks.speculative.execution false)</li> </ul> <p></p> <p>The same precedence holds: Hadoop configuration files < -D Hadoop property < -P properties_file < set command.</p> @@ -523,7 +559,7 @@ However, in a production environment you <section id="tutorial"> <title>Pig Tutorial </title> -<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode and Tez mode (see <a href="#execution-modes">Execution Modes</a>).</p> +<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode, Tez mode and Spark mode (see <a href="#execution-modes">Execution Modes</a>).</p> <p>To get started, do the following preliminary tasks:</p> @@ -541,8 +577,8 @@ $ export PIG_HOME=/<my-path-to-pig> <li>Create the pigtutorial.tar.gz file: <ul> <li>Move to the Pig tutorial directory (.../pig-0.16.0/tutorial).</li> - <li>Run the "ant" command from the tutorial directory. 
This will create the pigtutorial.tar.gz file. - </li> + <li>Run the "ant" command from the tutorial directory. This will create the pigtutorial.tar.gz file. + </li> </ul> </li> @@ -574,6 +610,10 @@ Or if you are using Tez local mode: <source> $ pig -x tez_local script1-local.pig </source> +Or if you are using Spark local mode: +<source> +$ pig -x spark_local script1-local.pig +</source> </li> <li>Review the result files, located in the script1-local-results.txt directory. <p>The output may contain a few Hadoop warnings which can be ignored:</p> @@ -587,7 +627,7 @@ $ pig -x tez_local script1-local.pig <!-- ++++++++++++++++++++++++++++++++++ --> <section> -<title> Running the Pig Scripts in Mapreduce Mode or Tez Mode</title> +<title> Running the Pig Scripts in Mapreduce Mode, Tez Mode or Spark Mode</title> <p>To run the Pig scripts in mapreduce mode, do the following: </p> <ol> @@ -606,6 +646,8 @@ export PIG_CLASSPATH=/mycluster/conf <source> export PIG_CLASSPATH=/mycluster/conf:/tez/conf </source> +<p>If you are using Spark, you will also need to specify SPARK_HOME and specify SPARK_JAR which is the hdfs location where you uploaded $SPARK_HOME/lib/spark-assembly*.jar:</p> +<source>export SPARK_HOME=/mysparkhome/; export SPARK_JAR=hdfs://example.com:8020/spark-assembly*.jar</source> <p><strong>Note:</strong> The PIG_CLASSPATH can also be used to add any other 3rd party dependencies or resource files a pig script may require. 
If there is also a need to make the added entries take the highest precedence in the Pig JVM's classpath order, one may also set the env-var PIG_USER_CLASSPATH_FIRST to any value, such as 'true' (and unset the env-var to disable).</p></li> <li>Set the HADOOP_CONF_DIR environment variable to the location of the cluster configuration directory: <source> @@ -620,6 +662,10 @@ Or if you are using Tez: <source> $ pig -x tez script1-hadoop.pig </source> +Or if you are using Spark: +<source> +$ pig -x spark script1-hadoop.pig +</source> </li> <li>Review the result files, located in the script1-hadoop-results or script2-hadoop-results HDFS directory: Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/PigConfiguration.java (original) +++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon May 29 15:00:39 2017 @@ -482,6 +482,11 @@ public class PigConfiguration { public static final String PIG_LOG_TRACE_ID = "pig.log.trace.id"; /** + * Use Netty file server for Pig on Spark, true or false, default value is false + */ + public static final String PIG_SPARK_USE_NETTY_FILESERVER = "pig.spark.rpc.useNettyFileServer"; + + /** * @deprecated use {@link #PIG_LOG_TRACE_ID} instead. 
Will be removed in Pig 0.18 */ public static final String CALLER_ID = PIG_LOG_TRACE_ID; Modified: pig/trunk/src/org/apache/pig/PigWarning.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigWarning.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/PigWarning.java (original) +++ pig/trunk/src/org/apache/pig/PigWarning.java Mon May 29 15:00:39 2017 @@ -68,6 +68,8 @@ public enum PigWarning { DELETE_FAILED, PROJECTION_INVALID_RANGE, NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY, - SKIP_UDF_CALL_FOR_NULL + SKIP_UDF_CALL_FOR_NULL, + SPARK_WARN, //bulk collection of warnings under Spark exec engine + SPARK_CUSTOM_WARN // same as above but for custom UDF warnings only, see PIG-2207 ; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Mon May 29 15:00:39 2017 @@ -35,6 +35,6 @@ public class AccumulatorOptimizer extend } public void visitMROp(MapReduceOper mr) throws VisitorException { - AccumulatorOptimizerUtil.addAccumulator(mr.reducePlan); + AccumulatorOptimizerUtil.addAccumulator(mr.reducePlan, mr.reducePlan.getRoots()); } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java?rev=1796639&r1=1796638&r2=1796639&view=diff 
============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java Mon May 29 15:00:39 2017 @@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; -import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.Pair; @@ -69,7 +68,7 @@ class NoopFilterRemover extends MROpPlan public void visit() throws VisitorException { super.visit(); for (Pair<POFilter, PhysicalPlan> pair: removalQ) { - removeFilter(pair.first, pair.second); + NoopFilterRemoverUtil.removeFilter(pair.first, pair.second); } removalQ.clear(); } @@ -91,23 +90,5 @@ class NoopFilterRemover extends MROpPlan } } } - - private void removeFilter(POFilter filter, PhysicalPlan plan) { - if (plan.size() > 1) { - try { - List<PhysicalOperator> fInputs = filter.getInputs(); - List<PhysicalOperator> sucs = plan.getSuccessors(filter); - - plan.removeAndReconnect(filter); - if(sucs!=null && sucs.size()!=0){ - for (PhysicalOperator suc : sucs) { - suc.setInputs(fInputs); - } - } - } catch (PlanException pe) { - log.info("Couldn't remove a filter in optimizer: "+pe.getMessage()); - } - } - } } } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java?rev=1796639&view=auto 
============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java Mon May 29 15:00:39 2017 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; +import org.apache.pig.impl.plan.PlanException; + +import java.util.List; + +public class NoopFilterRemoverUtil { + private static Log log = LogFactory.getLog(NoopFilterRemoverUtil.class); + + public static void removeFilter(POFilter filter, PhysicalPlan plan) { + if (plan.size() > 1) { + try { + List<PhysicalOperator> fInputs = filter.getInputs(); + List<PhysicalOperator> sucs = plan.getSuccessors(filter); + + plan.removeAndReconnect(filter); + if(sucs!=null && sucs.size()!=0){ + for (PhysicalOperator suc : sucs) { + suc.setInputs(fInputs); + } + } + } catch (PlanException pe) { + log.info("Couldn't remove a filter in optimizer: "+pe.getMessage()); + } + } + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Mon May 29 15:00:39 2017 @@ -23,7 +23,7 @@ import org.apache.pig.EvalFunc; import org.apache.pig.LoadFunc; import org.apache.pig.StoreFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; -import org.apache.pig.tools.pigstats.PigStatusReporter; +import 
org.apache.pig.tools.pigstats.PigWarnCounter; /** * @@ -36,7 +36,7 @@ public final class PigHadoopLogger imple private static Log log = LogFactory.getLog(PigHadoopLogger.class); private static PigHadoopLogger logger = null; - private PigStatusReporter reporter = null; + private PigWarnCounter reporter = null; private boolean aggregate = false; private PigHadoopLogger() { @@ -52,7 +52,7 @@ public final class PigHadoopLogger imple return logger; } - public void setReporter(PigStatusReporter reporter) { + public void setReporter(PigWarnCounter reporter) { this.reporter = reporter; } @@ -65,10 +65,10 @@ public final class PigHadoopLogger imple if (getAggregate()) { if (reporter != null) { if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) { - reporter.incrCounter(className, warningEnum.name(), 1); + reporter.incrWarnCounter(className, warningEnum.name(), 1L); } // For backwards compatibility, always report with warningEnum, see PIG-3739 - reporter.incrCounter(warningEnum, 1); + reporter.incrWarnCounter(warningEnum, 1L); } else { //TODO: //in local mode of execution if the PigHadoopLogger is used initially, Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Mon May 29 15:00:39 2017 @@ -61,14 +61,6 @@ public class PigInputFormat extends Inpu public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures"; public static final String PIG_INPUT_LIMITS = "pig.inpLimits"; - /** - * @deprecated Use {@link UDFContext} instead in the 
following way to get - * the job's {@link Configuration}: - * <pre>UdfContext.getUdfContext().getJobConf()</pre> - */ - @Deprecated - public static Configuration sJob; - /* (non-Javadoc) * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) */ @@ -78,43 +70,66 @@ public class PigInputFormat extends Inpu org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - // We need to create a TaskAttemptContext based on the Configuration which - // was used in the getSplits() to produce the split supplied here. For - // this, let's find out the input of the script which produced the split - // supplied here and then get the corresponding Configuration and setup - // TaskAttemptContext based on it and then call the real InputFormat's - // createRecordReader() method - - PigSplit pigSplit = (PigSplit)split; - activeSplit = pigSplit; - // XXX hadoop 20 new API integration: get around a hadoop 20 bug by - // passing total # of splits to each split so it can be retrieved - // here and set it to the configuration object. 
This number is needed - // by PoissonSampleLoader to compute the number of samples - int n = pigSplit.getTotalSplits(); - context.getConfiguration().setInt("pig.mapsplits.count", n); - Configuration conf = context.getConfiguration(); - PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer - .deserialize(conf.get("udf.import.list"))); - MapRedUtil.setupUDFContext(conf); - LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf); - // Pass loader signature to LoadFunc and to InputFormat through - // the conf - passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf); - - // merge entries from split specific conf into the conf we got - PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf); - - // for backward compatibility - PigInputFormat.sJob = conf; + RecordReaderFactory factory = new RecordReaderFactory(split, context); + return factory.createRecordReader(); + } - InputFormat inputFormat = loadFunc.getInputFormat(); - List<Long> inpLimitLists = - (ArrayList<Long>)ObjectSerializer.deserialize( - conf.get(PIG_INPUT_LIMITS)); + /** + * Helper class to create record reader + */ + protected static class RecordReaderFactory { + protected InputFormat inputFormat; + protected PigSplit pigSplit; + protected LoadFunc loadFunc; + protected TaskAttemptContext context; + protected long limit; + + public RecordReaderFactory(org.apache.hadoop.mapreduce.InputSplit split, + TaskAttemptContext context) throws IOException { + + // We need to create a TaskAttemptContext based on the Configuration which + // was used in the getSplits() to produce the split supplied here. 
For + // this, let's find out the input of the script which produced the split + // supplied here and then get the corresponding Configuration and setup + // TaskAttemptContext based on it and then call the real InputFormat's + // createRecordReader() method + + PigSplit pigSplit = (PigSplit)split; + // XXX hadoop 20 new API integration: get around a hadoop 20 bug by + // passing total # of splits to each split so it can be retrieved + // here and set it to the configuration object. This number is needed + // by PoissonSampleLoader to compute the number of samples + int n = pigSplit.getTotalSplits(); + context.getConfiguration().setInt("pig.mapsplits.count", n); + Configuration conf = context.getConfiguration(); + PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer + .deserialize(conf.get("udf.import.list"))); + MapRedUtil.setupUDFContext(conf); + LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf); + // Pass loader signature to LoadFunc and to InputFormat through + // the conf + passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf); + + // merge entries from split specific conf into the conf we got + PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf); + + InputFormat inputFormat = loadFunc.getInputFormat(); + + List<Long> inpLimitLists = + (ArrayList<Long>)ObjectSerializer.deserialize( + conf.get(PIG_INPUT_LIMITS)); + + this.inputFormat = inputFormat; + this.pigSplit = pigSplit; + this.loadFunc = loadFunc; + this.context = context; + this.limit = inpLimitLists.get(pigSplit.getInputIndex()); + } - return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex())); + public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException { + return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, limit); + } } @@ -339,10 +354,4 @@ public class PigInputFormat extends Inpu return pigSplit; } - public static 
PigSplit getActiveSplit() { - return activeSplit; - } - - private static PigSplit activeSplit; - } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Mon May 29 15:00:39 2017 @@ -48,6 +48,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; @@ -125,6 +126,12 @@ public class PigSplit extends InputSplit */ String[] locations = null; + + /** + * overall splitLocationInfos + */ + SplitLocationInfo[] splitLocationInfos = null; + // this seems necessary for Hadoop to instatiate this split on the // backend public PigSplit() {} @@ -201,6 +208,51 @@ public class PigSplit extends InputSplit return locations; } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + if (splitLocationInfos == null) { + HashMap<SplitLocationInfo, Long> locMap = new HashMap<SplitLocationInfo, Long>(); + Long lenInMap; + for (InputSplit split : wrappedSplits) { + SplitLocationInfo[] locs = split.getLocationInfo(); + if( locs != null) { + for (SplitLocationInfo loc : locs) { + try { + if ((lenInMap = locMap.get(loc)) == null) + locMap.put(loc, split.getLength()); + else + locMap.put(loc, lenInMap + split.getLength()); + } catch 
(InterruptedException e) { + throw new IOException("InputSplit.getLength throws exception: ", e); + } + } + } + } + Set<Map.Entry<SplitLocationInfo, Long>> entrySet = locMap.entrySet(); + Map.Entry<SplitLocationInfo, Long>[] hostSize = + entrySet.toArray(new Map.Entry[entrySet.size()]); + Arrays.sort(hostSize, new Comparator<Map.Entry<SplitLocationInfo, Long>>() { + + @Override + public int compare(Entry<SplitLocationInfo, Long> o1, Entry<SplitLocationInfo, Long> o2) { + long diff = o1.getValue() - o2.getValue(); + if (diff < 0) return 1; + if (diff > 0) return -1; + return 0; + } + }); + // maximum 5 locations are in list: refer to PIG-1648 for more details + int nHost = Math.min(hostSize.length, 5); + splitLocationInfos = new SplitLocationInfo[nHost]; + for (int i = 0; i < nHost; ++i) { + splitLocationInfos[i] = hostSize[i].getKey(); + } + } + return splitLocationInfos; + } + + @Override public long getLength() throws IOException, InterruptedException { if (length == -1) { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java Mon May 29 15:00:39 2017 @@ -52,7 +52,8 @@ public class SecondaryKeyOptimizerMR ext if (mr.getCustomPartitioner()!=null) return; - info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan); + SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new SecondaryKeyOptimizerUtil(); + info = secondaryKeyOptUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan); if (info != null && 
info.isUseSecondaryKey()) { mr.setUseSecondaryKey(true); mr.setSecondarySortOrder(info.getSecondarySortOrder()); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Mon May 29 15:00:39 2017 @@ -497,6 +497,10 @@ public abstract class PhysicalOperator e parentPlan = physicalPlan; } + public PhysicalPlan getParentPlan() { + return parentPlan; + } + public Log getLogger() { return log; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon May 29 15:00:39 2017 @@ -120,6 +120,10 @@ public class POUserFunc extends Expressi instantiateFunc(funcSpec); } + public void setFuncInputSchema(){ + setFuncInputSchema(signature); + } + private void instantiateFunc(FuncSpec fSpec) { this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec); this.setSignature(signature); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon May 29 15:00:39 2017 @@ -73,6 +73,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; import org.apache.pig.impl.plan.PlanVisitor; import org.apache.pig.impl.plan.PlanWalker; import org.apache.pig.impl.plan.VisitorException; @@ -363,4 +364,8 @@ public class PhyPlanVisitor extends Plan public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException { } + + public void visitBroadcastSpark(POBroadcastSpark poBroadcastSpark) { + } + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Mon May 29 15:00:39 2017 @@ -143,13 +143,6 @@ public class PhysicalPlan extends Operat 
to.setInputs(getPredecessors(to)); } - /*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{ - if(!to.supportsMultipleInputs()){ - throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs."); - } - - }*/ - @Override public void remove(PhysicalOperator op) { op.setInputs(null); Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java Mon May 29 15:00:39 2017 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; + +public class POBroadcastSpark extends PhysicalOperator { + private static final long serialVersionUID = 1L; + + protected String broadcastedVariableName; + + public POBroadcastSpark(OperatorKey k) { + super(k); + } + + public POBroadcastSpark(POBroadcastSpark copy) + throws ExecException { + super(copy); + } + + /** + * Set your broadcast variable name so that + * BroadcastConverter can put this broadcasted variable in a map + * which can be referenced by other functions / closures in Converters + * + * @param varName + */ + public void setBroadcastedVariableName(String varName) { + broadcastedVariableName = varName; + } + + public String getBroadcastedVariableName() { + return broadcastedVariableName; + } + + @Override + public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { + return null; + } + + @Override + public boolean supportsMultipleInputs() { + return false; + } + + @Override + public boolean supportsMultipleOutputs() { + return false; + } + + @Override + public String name() { + return getAliasString() + "BroadcastSpark - " + mKey.toString(); + } + + @Override + public void visit(PhyPlanVisitor v) throws VisitorException { + v.visitBroadcastSpark(this); + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff 
============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Mon May 29 15:00:39 2017 @@ -70,6 +70,15 @@ public class POCollectedGroup extends Ph private transient boolean useDefaultBag; + //For Spark + private transient boolean endOfInput = false; + public boolean isEndOfInput() { + return endOfInput; + } + public void setEndOfInput (boolean isEndOfInput) { + endOfInput = isEndOfInput; + } + public POCollectedGroup(OperatorKey k) { this(k, -1, null); } @@ -132,7 +141,7 @@ public class POCollectedGroup extends Ph if (inp.returnStatus == POStatus.STATUS_EOP) { // Since the output is buffered, we need to flush the last // set of records when the close method is called by mapper. - if (this.parentPlan.endOfAllInput) { + if (this.parentPlan.endOfAllInput || isEndOfInput()) { return getStreamCloseResult(); } else { break; @@ -257,13 +266,13 @@ public class POCollectedGroup extends Ph leafOps.add(leaf); } } - + private void setIllustratorEquivalenceClasses(Tuple tin) { if (illustrator != null) { illustrator.getEquivalenceClasses().get(0).add(tin); } } - + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { return null; Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Mon May 29 15:00:39 2017 @@ -519,6 +519,10 @@ public class POFRJoin extends PhysicalOp return LRs; } + public boolean isLeftOuterJoin() { + return isLeftOuterJoin; + } + public int getFragment() { return fragment; } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Mon May 29 15:00:39 2017 @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.data.DataType; +import org.apache.pig.data.SchemaTupleBackend; +import org.apache.pig.data.SchemaTupleClassGenerator; +import org.apache.pig.data.SchemaTupleFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +import java.util.List; +import java.util.Map; + +public class POFRJoinSpark extends POFRJoin { + private static final Log log = LogFactory.getLog(POFRJoinSpark.class); + + private Map<String, List<Tuple>> broadcasts; + + public POFRJoinSpark(POFRJoin copy) throws ExecException { + super(copy); + } + + @Override + protected void setUpHashMap() throws ExecException { + log.info("Building replication hash table"); + + SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length]; + SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length]; + for (int i = 0; i < inputSchemas.length; i++) { + addSchemaToFactories(inputSchemas[i], inputSchemaTupleFactories, i); + addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i); + } + + replicates.set(fragment, null); + int i = -1; + long start = System.currentTimeMillis(); + for (int k = 0; k < inputSchemas.length; ++k) { + ++i; + + SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[i]; + SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i]; + + if (i == fragment) { + replicates.set(i, null); + continue; + } + + TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory); + + log.debug("Completed setup. 
Trying to build replication hash table"); + List<Tuple> tuples = broadcasts.get(parentPlan.getPredecessors(this).get(i).getOperatorKey().toString()); + + POLocalRearrange localRearrange = LRs[i]; + + for (Tuple t : tuples) { + localRearrange.attachInput(t); + Result res = localRearrange.getNextTuple(); + if (getReporter() != null) { + getReporter().progress(); + } + Tuple tuple = (Tuple) res.result; + if (isKeyNull(tuple.get(1))) continue; + Object key = tuple.get(1); + Tuple value = getValueTuple(localRearrange, tuple); + + if (replicate.get(key) == null) { + replicate.put(key, new POMergeJoin.TuplesToSchemaTupleList(1, inputSchemaTupleFactory)); + } + + replicate.get(key).add(value); + + } + replicates.set(i, replicate); + } + long end = System.currentTimeMillis(); + log.debug("Hash Table built. Time taken: " + (end - start)); + } + + @Override + public String name() { + return getAliasString() + "FRJoinSpark[" + DataType.findTypeName(resultType) + + "]" + " - " + mKey.toString(); + } + + private void addSchemaToFactories(Schema schema, SchemaTupleFactory[] schemaTupleFactories, int index) { + if (schema != null) { + log.debug("Using SchemaTuple for FR Join Schema: " + schema); + schemaTupleFactories[index] = SchemaTupleBackend.newSchemaTupleFactory(schema, false, SchemaTupleClassGenerator.GenContext.FR_JOIN); + } + } + + public void attachInputs(Map<String, List<Tuple>> broadcasts) { + this.broadcasts = broadcasts; + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon May 29 15:00:39 2017 @@ -828,6 +828,10 @@ public class POForEach extends PhysicalO } } + public PhysicalOperator[] getPlanLeafOps() { + return planLeafOps; + } + public void setMapSideOnly(boolean mapSideOnly) { this.mapSideOnly = mapSideOnly; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java Mon May 29 15:00:39 2017 @@ -74,7 +74,13 @@ public class POGlobalRearrange extends P public POGlobalRearrange(OperatorKey k, int rp, List inp) { super(k, rp, inp); } - + + public POGlobalRearrange(POGlobalRearrange copy) throws ExecException { + super(copy); + this.cross = copy.cross; + this.customPartitioner = copy.customPartitioner; + } + @Override public void visit(PhyPlanVisitor v) throws VisitorException { v.visitGlobalRearrange(this);