Github user choochootrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9367#discussion_r43942556
  
    --- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
    @@ -366,6 +367,72 @@ class SparkSubmitSuite
         }
       }
     
    +  // SPARK-11195
    +  test("classes are correctly loaded when tasks fail") {
    +    // Compile a simple jar that throws a user-defined exception on the driver
    +    val tempDir = Utils.createTempDir()
    +    val srcDir = new File(tempDir, "repro/")
    +    srcDir.mkdirs()
    +    // scalastyle:off line.size.limit
    +    val mainSource = new JavaSourceFromString(new File(srcDir, "MyJob").getAbsolutePath,
    +      """package repro;
    +        |
    +        |import java.util.*;
    +        |import java.util.regex.*;
    +        |import org.apache.spark.*;
    +        |import org.apache.spark.api.java.*;
    +        |import org.apache.spark.api.java.function.*;
    +        |
    +        |public class MyJob {
    +        |  public static class MyException extends Exception {
    +        |  }
    +        |
    +        |  public static void main(String[] args) {
    +        |    SparkConf conf = new SparkConf();
    +        |    JavaSparkContext sc = new JavaSparkContext(conf);
    +        |
    +        |    JavaRDD<Boolean> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}), 1).map(new Function<Integer, Boolean>() {
    +        |      public Boolean call(Integer x) throws MyException {
    +        |        throw new MyException();
    +        |      }
    +        |    });
    +        |
    +        |    try {
    +        |      rdd.collect();
    +        |
    +        |      assert(false); // should be unreachable
    +        |    } catch (Exception e) {
    +        |      // the driver should not have any problems resolving the exception class and determining
    +        |      // why the task failed.
    +        |
    +        |      Pattern unknownFailure = Pattern.compile(".*Lost task.*: UnknownReason.*", Pattern.DOTALL);
    +        |      Pattern expectedFailure = Pattern.compile(".*Lost task.*: repro.MyJob\\$MyException.*", Pattern.DOTALL);
    +        |
    +        |      assert(!unknownFailure.matcher(e.getMessage()).matches());
    +        |      assert(expectedFailure.matcher(e.getMessage()).matches());
    +        |    }
    +        |  }
    +        |}
    +      """.stripMargin)
    +    // scalastyle:on line.size.limit
    +    val sparkJar = "../assembly/target/scala-2.10/spark-assembly-1.5.1-hadoop2.2.0.jar"
    --- End diff --
    
    all this patch does is make `TaskResult` deserialization use `Utils.getContextOrSparkClassLoader` (the classloader which loaded the `spark-submit`ed jar) instead of `Utils.getSparkClassLoader` (the `AppClassLoader`, which only has spark classes in it). without this patch, the driver cannot deserialize a failed task's exception if its class does not exist in `Utils.getSparkClassLoader`.
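
    for context, the change itself is essentially a one-line classloader swap. a sketch of the idea (not the exact diff), using the existing `SerializerInstance.deserialize(bytes, loader)` overload:

    ```scala
    // before the patch, the failure reason was deserialized with
    // Utils.getSparkClassLoader (AppClassLoader, spark's own classes only)
    val loader = Utils.getContextOrSparkClassLoader  // also sees the submitted jar
    val reason = serializer.get().deserialize[TaskEndReason](serializedData, loader)
    ```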
    
    in order to reproduce this issue, i set up a situation where `Utils.getContextOrSparkClassLoader` contains `MyException` but `Utils.getSparkClassLoader` does not (see https://issues.apache.org/jira/browse/SPARK-11195). this is easy to test manually with `spark-submit` and a user-defined exception, but turning it into an automated test is proving to be much trickier. here are the three options:
    
    * :x: if i place all of the code into `SparkSubmitSuite`, the bug won't be hit because `MyException` will be in the root classloader and my patch makes no difference.
    
    * :grey_question: if i place all of the code into an external jar and run `spark-submit`, i can set up the same situation as the repro that found this bug. the issue i am running into is that i need spark on the classpath in order to compile my jar. i can use the assembled jar, but its name changes depending on which maven profiles are enabled, and so on.
    
    * :grey_question: i can try @brkyvz & @yhuai's hybrid approach of putting only the exception into a jar and the rest of the code into `SparkSubmitSuite`. i will have to do the following in order to repro this issue:
      * load the jar with `MyException` in a new classloader whose parent is the root classloader
      * somehow allow this classloader to be used by the driver and the executor *without* changing `Utils.getSparkClassLoader`.
    
    at this point, am i not reimplementing `spark-submit`? :)
    
    the final approach is certainly worth trying; i'll take a look at it later today.
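
    to make those sub-bullets concrete, here's a hypothetical sketch of the setup. i'm assuming `TestUtils.createJarWithClasses` can emit a class extending a chosen base class, and i'm parenting the new loader on the spark classloader (rather than the bootstrap loader) so spark's own classes still resolve; since delegation only flows upward, `Utils.getSparkClassLoader` still cannot see `MyException`:

    ```scala
    import java.net.URLClassLoader

    // compile a jar containing only the user-defined exception
    // (assumption: createJarWithClasses supports a base-class parameter)
    val exceptionJar = TestUtils.createJarWithClasses(
      classNames = Seq.empty,
      classNamesWithBase = Seq(("MyException", "java.lang.Exception")))

    // a child loader: Utils.getSparkClassLoader (the parent) cannot see
    // MyException because parent delegation never goes downward
    val isolatedLoader = new URLClassLoader(Array(exceptionJar), Utils.getSparkClassLoader)

    // install it as the context classloader so Utils.getContextOrSparkClassLoader
    // resolves MyException, mirroring what spark-submit sets up
    val originalLoader = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(isolatedLoader)
    try {
      // run a job whose task throws the reflectively loaded MyException, then
      // assert that the driver names MyException rather than UnknownReason
    } finally {
      Thread.currentThread().setContextClassLoader(originalLoader)
    }
    ```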

