Author: szita
Date: Fri Oct  6 12:02:22 2017
New Revision: 1811322

URL: http://svn.apache.org/viewvc?rev=1811322&view=rev
Log:
PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct  6 12:02:22 2017
@@ -26,6 +26,8 @@ PIG-5282: Upgrade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-5305: Enable yarn-client mode execution of tests in Spark (1) mode (szita)
+
 PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin 
(satishsaley via rohini)
 
 PIG-5306: REGEX_EXTRACT() logs every line that doesn't match (satishsaley via 
rohini)

Modified: pig/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Oct  6 12:02:22 2017
@@ -118,6 +118,7 @@
     <property name="smoke.tests.jarfile" 
value="${build.dir}/${final.name}-smoketests.jar" />
     <property name="test.pigunit.src.dir" 
value="${test.src.dir}/org/apache/pig/test/pigunit" />
     <property name="test.pigunit.file" value="${test.src.dir}/pigunit-tests"/>
+    <property name="pigtest.jarfile" value="pigtest.jar" />
 
 
     <!-- test configuration, use ${user.home}/build.properties to configure 
values  -->
@@ -160,7 +161,6 @@
         <propertyreset name="isHadoop2" value="true" />
         <propertyreset name="src.shims.dir" 
value="${basedir}/shims/src/hadoop${hadoopversion}" />
         <propertyreset name="src.shims.test.dir" 
value="${basedir}/shims/test/hadoop${hadoopversion}" />
-        <propertyreset name="src.exclude.dir" value="" />
         <propertyreset name="test.exec.type" value="tez" />
     </target>
 
@@ -278,6 +278,7 @@
               
value="${mvnrepo}/org/codehaus/jackson/jackson-core-asl/${jackson-pig-3039-test.version}/jackson-core-asl-${jackson-pig-3039-test.version}.jar"/>
     <property name="jackson_mapper_repo_url"
               
value="${mvnrepo}/org/codehaus/jackson/jackson-mapper-asl/${jackson-pig-3039-test.version}/jackson-mapper-asl-${jackson-pig-3039-test.version}.jar"/>
+    <property name="test.spark.spark_master" value="yarn-client" />
 
     <!--this is the naming policy for artifacts we want pulled down-->
     <property name="ivy.artifact.retrieve.pattern" 
value="${ant.project.name}/[artifact]-[revision](-[classifier]).[ext]"/>
@@ -893,32 +894,32 @@
     <!-- ================================================================== -->
     <!-- Run unit tests                                                     -->
     <!-- ================================================================== -->
-    <target name="test-core" 
depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download"
 description="Run full set of unit tests">
+    <target name="test-core" 
depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check,jackson-pig-3039-test-download"
 description="Run full set of unit tests">
         <macro-test-runner test.file="${test.all.file}" 
tests.failed="test-core.failed" />
         <fail if="test-core.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-commit" 
depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" 
description="Run approximate 10-minute set of unit tests prior to commiting">
+    <target name="test-commit" 
depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" 
description="Run approximate 10-minute set of unit tests prior to commiting">
         <macro-test-runner test.file="${test.commit.file}" 
tests.failed="test-commit.failed"/>
         <fail if="test-commit.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-unit" 
depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" 
description="Run all true unit tests">
+    <target name="test-unit" 
depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" 
description="Run all true unit tests">
         <macro-test-runner test.file="${test.unit.file}" 
tests.failed="test-unit.failed"/>
         <fail if="test-unit.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-smoke" 
depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check" 
description="Run 30 min smoke tests">
+    <target name="test-smoke" 
depends="setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check" 
description="Run 30 min smoke tests">
         <macro-test-runner test.file="${test.smoke.file}" 
tests.failed="test-smoke.failed"/>
         <fail if="test-smoke.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-tez" 
depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download"
 description="Run tez unit tests">
+    <target name="test-tez" 
depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,debugger.check,jackson-pig-3039-test-download"
 description="Run tez unit tests">
         <macro-test-runner test.file="${test.all.file}" 
tests.failed="test-tez.failed"/>
         <fail if="test-tez.failed">Tests failed!</fail>
     </target>
 
-    <target name="test-spark" 
depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download"
 description="Run Spark unit tests in Spark cluster-local mode">
+    <target name="test-spark" 
depends="setSparkEnv,setWindowsPath,setLinuxPath,compile-test,pigtest-jar,debugger.check,jackson-pig-3039-test-download"
 description="Run Spark unit tests in Spark cluster-local mode">
         <macro-test-runner test.file="${test.all.file}" 
tests.failed="test-spark.failed"/>
         <fail if="test-spark.failed">Tests failed!</fail>
     </target>
@@ -957,6 +958,7 @@
             <sysproperty key="java.security.krb5.kdc" value="" />
             <sysproperty key="log4j.configuration" 
value="file:${basedir}/conf/test-log4j.properties"/>
             <env key="MALLOC_ARENA_MAX" value="4"/>
+            <env key="SPARK_MASTER" value="${test.spark.spark_master}"/>
             <env key="PATH" path="${build.path}"/>
             <classpath>
                 <pathelement location="${output.jarfile.core}" />
@@ -999,7 +1001,7 @@
     </target>
 
     <target name="test-core-mrtez" description="run core tests on both mr and 
tez mode"
-            
depends="setWindowsPath,setLinuxPath,compile-test,jar-simple,debugger.check,jackson-pig-3039-test-download">
+            
depends="setWindowsPath,setLinuxPath,compile-test,debugger.check,jackson-pig-3039-test-download">
         <fail message="hadoopversion must be set to 2 when invoking 
test-core-mrtez">
           <condition>
             <not>
@@ -1072,6 +1074,16 @@
         <ant dir="${pigmix.dir}" target="test"/>
     </target>
 
+    <target name="pigtest-jar" depends="compile-test, ivy-test" 
description="create the pigtest jar file">
+        <echo> *** Creating pigtest.jar ***</echo>
+        <jar destfile="${pigtest.jarfile}">
+            <fileset dir="${test.build.classes}">
+                <include name="**/org/apache/pig/test/**"/>
+            </fileset>
+            <zipfileset 
src="${ivy.lib.dir}/commons-lang-${commons-lang.version}.jar" />
+        </jar>
+    </target>
+
     <!-- ================================================================== -->
     <!-- Pigunit                                                            -->
     <!-- ================================================================== -->

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Fri Oct  6 12:02:22 2017
@@ -431,6 +431,7 @@ public class SparkLauncher extends Launc
         Set<String> allJars = new HashSet<String>();
         LOG.info("Add default jars to Spark Job");
         allJars.addAll(JarManager.getDefaultJars());
+        JarManager.addPigTestJarIfPresent(allJars);
         LOG.info("Add script jars to Spark Job");
         for (String scriptJar : pigContext.scriptJars) {
             allJars.add(scriptJar);
@@ -536,23 +537,35 @@ public class SparkLauncher extends Launc
         return sparkPlan;
     }
 
+
+    private static String getMaster(PigContext pc){
+        String master = null;
+        if (pc.getExecType().isLocal()) {
+            master = "local";
+        } else {
+            master = System.getenv("SPARK_MASTER");
+            if (master == null) {
+                LOG.info("SPARK_MASTER not specified, using \"local\"");
+                master = "local";
+            }
+        }
+        return master;
+    }
+
     /**
      * Only one SparkContext may be active per JVM (SPARK-2243). When multiple 
threads start SparkLaucher,
-     * the static member sparkContext should be initialized only once
+     * the static member sparkContext should be initialized only by either 
local or cluster mode at a time.
+     *
+     * In case it was already initialized with a different mode than what the 
new pigContext instance wants, it will
+     * close down the existing SparkContext and re-initialize it with the new 
mode.
      */
     private static synchronized void startSparkIfNeeded(JobConf jobConf, 
PigContext pc) throws PigException {
+        String master = getMaster(pc);
+        if (sparkContext != null && !master.equals(sparkContext.master())){
+            sparkContext.close();
+            sparkContext = null;
+        }
         if (sparkContext == null) {
-            String master = null;
-            if (pc.getExecType().isLocal()) {
-                master = "local";
-            } else {
-                master = System.getenv("SPARK_MASTER");
-                if (master == null) {
-                    LOG.info("SPARK_MASTER not specified, using \"local\"");
-                    master = "local";
-                }
-            }
-
             String sparkHome = System.getenv("SPARK_HOME");
             if (!master.startsWith("local") && !master.equals("yarn-client")) {
                 // Check that we have the Mesos native library and Spark home
@@ -590,8 +603,10 @@ public class SparkLauncher extends Launc
                 }
             }
 
-            //see PIG-5200 why need to set spark.executor.userClassPathFirst 
as true
-            sparkConf.set("spark.executor.userClassPathFirst", "true");
+            //see PIG-5200 why need to set spark.executor.userClassPathFirst 
as true on cluster modes
+            if (! "local".equals(master)) {
+                sparkConf.set("spark.executor.userClassPathFirst", "true");
+            }
             checkAndConfigureDynamicAllocation(master, sparkConf);
 
             sparkContext = new JavaSparkContext(sparkConf);

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Fri Oct  6 12:02:22 
2017
@@ -58,6 +58,7 @@ import dk.brics.automaton.Automaton;
 public class JarManager {
 
     private static Log log = LogFactory.getLog(JarManager.class);
+    private static final String PIGTEST_JAR = "pigtest.jar";
 
     private static enum DefaultPigPackages {
 
@@ -306,4 +307,15 @@ public class JarManager {
         }
     }
 
+    /**
+     * Adds jar file where pig test classes are packed (build/test/classes)
+     * @param jars Set of jars to append pig tests jar into
+     */
+    public static void addPigTestJarIfPresent(Set<String> jars) {
+        File file = new File(PIGTEST_JAR);
+        if (file.exists()) {
+            jars.add(file.getAbsolutePath());
+        }
+    }
+
 }

Modified: pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java?rev=1811322&r1=1811321&r2=1811322&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java Fri Oct  
6 12:02:22 2017
@@ -119,7 +119,7 @@ public class TestOrcStoragePushdown {
     }
 
     private static void createInputData() throws Exception {
-        pigServer = new PigServer(Util.getLocalTestMode());
+        pigServer = new PigServer(cluster.getExecType(), 
cluster.getProperties());
 
         new File(inpbasedir).mkdirs();
         new File(outbasedir).mkdirs();
@@ -162,10 +162,11 @@ public class TestOrcStoragePushdown {
         }
         bw.close();
 
+        Util.copyFromLocalToCluster(cluster, inputTxtFile, inputTxtFile);
+
         // Store only 1000 rows in each row block (MIN_ROW_INDEX_STRIDE is 
1000. So can't use less than that)
         pigServer.registerQuery("A = load '" + Util.generateURI(inputTxtFile, 
pigServer.getPigContext()) + "' as (f1:boolean, f2:int, f3:int, f4:int, 
f5:long, f6:float, f7:double, f8:bytearray, f9:chararray, f10:datetime, 
f11:bigdecimal);");
-        pigServer.registerQuery("store A into '" + Util.generateURI(INPUT, 
pigServer.getPigContext()) +"' using OrcStorage('-r 1000 -s 100000');");
-        Util.copyFromLocalToCluster(cluster, INPUT, INPUT);
+        pigServer.registerQuery("store A into '" + INPUT +"' using 
OrcStorage('-r 1000 -s 100000');");
     }
 
     @AfterClass


Reply via email to