Author: szita
Date: Fri Oct 6 12:02:22 2017
New Revision: 1811322

URL: http://svn.apache.org/viewvc?rev=1811322&view=rev
Log:
PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita)
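As an aid to reading the patch below: a minimal standalone sketch (illustrative only, not part of this commit) of the master-selection fallback that the new SparkLauncher.getMaster() implements. For non-local exec types the launcher reads the SPARK_MASTER environment variable, which build.xml now exports to the forked test JVM from the ant property test.spark.spark_master (default "yarn-client"), and falls back to "local" when it is unset; local exec types always use "local".

    // Sketch only: the class and method names here are made up for illustration.
    public class SparkMasterSketch {
        public static void main(String[] args) {
            // SPARK_MASTER is exported to the test JVM by build.xml
            String master = System.getenv("SPARK_MASTER");
            if (master == null) {
                master = "local";   // same default the launcher logs and falls back to
            }
            System.out.println("Would start SparkContext with master: " + master);
        }
    }

Since test.spark.spark_master is a plain ant property, it can presumably be overridden on the command line (for example, ant -Dtest.spark.spark_master=local test-spark) to keep the Spark unit tests running in local mode.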
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct 6 12:02:22 2017
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
 
 IMPROVEMENTS
 
+PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita)
+
 PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin (satishsaley via rohini)
 
 PIG-5306: REGEX_EXTRACT() logs every line that doesn't match (satishsaley via rohini)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Oct 6 12:02:22 2017
@@ -118,6 +118,7 @@
     <property name="smoke.tests.jarfile" value="${build.dir}/${final.name}-smoketests.jar" />
     <property name="test.pigunit.src.dir" value="${test.src.dir}/org/apache/pig/test/pigunit" />
     <property name="test.pigunit.file" value="${test.src.dir}/pigunit-tests"/>
+    <property name="pigtest.jarfile" value="pigtest.jar" />
 
     <!-- test configuration, use ${user.home}/build.properties to configure values -->
@@ -160,7 +161,6 @@
     <propertyreset name="isHadoop2" value="true" />
     <propertyreset name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
     <propertyreset name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
-    <propertyreset name="src.exclude.dir" value="" />
     <propertyreset name="test.exec.type" value="tez" />
   </target>
@@ -278,6 +278,7 @@
           value="${mvnrepo}/org/codehaus/jackson/jackson-core-asl/${jackson-pig-3039-test.version}/jackson-core-asl-${jackson-pig-3039-test.version}.jar"/>
     <property name="jackson_mapper_repo_url"
           value="${mvnrepo}/org/codehaus/jackson/jackson-mapper-asl/${jackson-pig-3039-test.version}/jackson-mapper-asl-${jackson-pig-3039-test.version}.jar"/>
+    <property name="test.spark.spark_master" value="yarn-client" />
 
     <!--this is the naming policy for artifacts we want pulled down-->
     <property name="ivy.artifact.retrieve.pattern" value="${ant.project.name}/[artifact]-[revision](-[classifier]).[ext]"/>
@@ -893,32 +894,32 @@
     <!-- ================================================================== -->
     <!-- Run unit tests -->
     <!-- ================================================================== -->
-    <target name="test-core" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download" description="Run full set of unit tests">
+    <target name="test-core" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check,jackson-pig-3039-test-download" description="Run full set of unit tests">
         <macro-test-runner test.file="${test.all.file}" tests.failed="test-core.failed" />
         <fail if="test-core.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-commit" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" description="Run approximate 10-minute set of unit tests prior to commiting">
+    <target name="test-commit" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" description="Run approximate 10-minute set of unit tests prior to commiting">
        <macro-test-runner test.file="${test.commit.file}" tests.failed="test-commit.failed"/>
        <fail if="test-commit.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-unit" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" description="Run all true unit tests">
+    <target name="test-unit" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" description="Run all true unit tests">
        <macro-test-runner test.file="${test.unit.file}" tests.failed="test-unit.failed"/>
        <fail if="test-unit.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-smoke" depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" description="Run 30 min smoke tests">
+    <target name="test-smoke" depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" description="Run 30 min smoke tests">
        <macro-test-runner test.file="${test.smoke.file}" tests.failed="test-smoke.failed"/>
        <fail if="test-smoke.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download" description="Run tez unit tests">
+    <target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,debugger.check,jackson-pig-3039-test-download" description="Run tez unit tests">
        <macro-test-runner test.file="${test.all.file}" tests.failed="test-tez.failed"/>
        <fail if="test-tez.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-spark" depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download" description="Run Spark unit tests in Spark cluster-local mode">
+    <target name="test-spark" depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check,jackson-pig-3039-test-download" description="Run Spark unit tests in Spark cluster-local mode">
        <macro-test-runner test.file="${test.all.file}" tests.failed="test-spark.failed"/>
        <fail if="test-spark.failed">Tests failed!</fail>
     </target>
@@ -957,6 +958,7 @@
         <sysproperty key="java.security.krb5.kdc" value="" />
         <sysproperty key="log4j.configuration" value="file:${basedir}/conf/test-log4j.properties"/>
         <env key="MALLOC_ARENA_MAX" value="4"/>
+        <env key="SPARK_MASTER" value="${test.spark.spark_master}"/>
         <env key="PATH" path="${build.path}"/>
         <classpath>
           <pathelement location="${output.jarfile.core}" />
@@ -999,7 +1001,7 @@
   </target>
 
   <target name="test-core-mrtez" description="run core tests on both mr and tez mode"
-          depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download">
+          depends="setWindowsPath,setLinuxPath,compile-test,debugger.check,jackson-pig-3039-test-download">
     <fail message="hadoopversion must be set to 2 when invoking test-core-mrtez">
       <condition>
         <not>
@@ -1072,6 +1074,16 @@
     <ant dir="${pigmix.dir}" target="test"/>
   </target>
 
+  <target name="pigtest-jar" depends="compile-test, ivy-test" description="create the pigtest jar file">
+    <echo> *** Creating pigtest.jar ***</echo>
+    <jar destfile="${pigtest.jarfile}">
+      <fileset dir="${test.build.classes}">
+        <include name="**/org/apache/pig/test/**"/>
+      </fileset>
+      <zipfileset src="${ivy.lib.dir}/commons-lang-${commons-lang.version}.jar" />
+    </jar>
+  </target>
+
   <!-- ================================================================== -->
   <!-- Pigunit -->
   <!-- ================================================================== -->

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Oct 6 12:02:22 2017
@@ -431,6 +431,7 @@ public class SparkLauncher extends Launc
         Set<String> allJars = new HashSet<String>();
         LOG.info("Add default jars to Spark Job");
         allJars.addAll(JarManager.getDefaultJars());
+        JarManager.addPigTestJarIfPresent(allJars);
         LOG.info("Add script jars to Spark Job");
         for (String scriptJar : pigContext.scriptJars) {
             allJars.add(scriptJar);
@@ -536,23 +537,35 @@ public class SparkLauncher extends Launc
         return sparkPlan;
     }
 
+
+    private static String getMaster(PigContext pc){
+        String master = null;
+        if (pc.getExecType().isLocal()) {
+            master = "local";
+        } else {
+            master = System.getenv("SPARK_MASTER");
+            if (master == null) {
+                LOG.info("SPARK_MASTER not specified, using \"local\"");
+                master = "local";
+            }
+        }
+        return master;
+    }
+
     /**
      * Only one SparkContext may be active per JVM (SPARK-2243). When multiple threads start SparkLaucher,
-     * the static member sparkContext should be initialized only once
+     * the static member sparkContext should be initialized only by either local or cluster mode at a time.
+     *
+     * In case it was already initialized with a different mode than what the new pigContext instance wants, it will
+     * close down the existing SparkContext and re-initalize it with the new mode.
      */
     private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext pc) throws PigException {
+        String master = getMaster(pc);
+        if (sparkContext != null && !master.equals(sparkContext.master())){
+            sparkContext.close();
+            sparkContext = null;
+        }
         if (sparkContext == null) {
-            String master = null;
-            if (pc.getExecType().isLocal()) {
-                master = "local";
-            } else {
-                master = System.getenv("SPARK_MASTER");
-                if (master == null) {
-                    LOG.info("SPARK_MASTER not specified, using \"local\"");
-                    master = "local";
-                }
-            }
-
             String sparkHome = System.getenv("SPARK_HOME");
             if (!master.startsWith("local") && !master.equals("yarn-client")) {
                 // Check that we have the Mesos native library and Spark home
@@ -590,8 +603,10 @@ public class SparkLauncher extends Launc
                 }
             }
 
-            //see PIG-5200 why need to set spark.executor.userClassPathFirst as true
-            sparkConf.set("spark.executor.userClassPathFirst", "true");
+            //see PIG-5200 why need to set spark.executor.userClassPathFirst as true on cluster modes
+            if (! "local".equals(master)) {
+                sparkConf.set("spark.executor.userClassPathFirst", "true");
+            }
             checkAndConfigureDynamicAllocation(master, sparkConf);
             sparkContext = new JavaSparkContext(sparkConf);

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Fri Oct 6 12:02:22 2017
@@ -58,6 +58,7 @@ import dk.brics.automaton.Automaton;
 
 public class JarManager {
 
     private static Log log = LogFactory.getLog(JarManager.class);
+    private static final String PIGTEST_JAR = "pigtest.jar";
 
     private static enum DefaultPigPackages {
@@ -306,4 +307,15 @@ public class JarManager {
         }
     }
 
+    /**
+     * Adds jar file where pig test classes are packed (build/test/classes)
+     * @param jars Set of jars to append pig tests jar into
+     */
+    public static void addPigTestJarIfPresent(Set<String> jars) {
+        File file = new File(PIGTEST_JAR);
+        if (file.exists()) {
+            jars.add(file.getAbsolutePath());
+        }
+    }
+
 }

Modified: pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java Fri Oct 6 12:02:22 2017
@@ -119,7 +119,7 @@ public class TestOrcStoragePushdown {
     }
 
     private static void createInputData() throws Exception {
-        pigServer = new PigServer(Util.getLocalTestMode());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
 
         new File(inpbasedir).mkdirs();
         new File(outbasedir).mkdirs();
@@ -162,10 +162,11 @@ public class TestOrcStoragePushdown {
         }
         bw.close();
 
+        Util.copyFromLocalToCluster(cluster, inputTxtFile, inputTxtFile);
+
         // Store only 1000 rows in each row block (MIN_ROW_INDEX_STRIDE is 1000. So can't use less than that)
         pigServer.registerQuery("A = load '" + Util.generateURI(inputTxtFile, pigServer.getPigContext()) + "' as (f1:boolean, f2:int, f3:int, f4:int, f5:long, f6:float, f7:double, f8:bytearray, f9:chararray, f10:datetime, f11:bigdecimal);");
-        pigServer.registerQuery("store A into '" + Util.generateURI(INPUT, pigServer.getPigContext()) +"' using OrcStorage('-r 1000 -s 100000');");
-        Util.copyFromLocalToCluster(cluster, INPUT, INPUT);
+        pigServer.registerQuery("store A into '" + INPUT +"' using OrcStorage('-r 1000 -s 100000');");
     }
 
     @AfterClass