[ https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15021187#comment-15021187 ]
Richard W. Eggert II commented on SPARK-4514:
---------------------------------------------

This test, however, still fails:

{code}
test("getJobIdsForGroup() with takeAsync() across multiple partitions") {
  sc = new SparkContext("local", "test", new SparkConf(false))
  sc.setJobGroup("my-job-group2", "description")
  sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
  val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)
  val firstJobId = eventually(timeout(10 seconds)) {
    firstJobFuture.jobIds.head
  }
  eventually(timeout(10 seconds)) {
    sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2
  }
}
{code}

> SparkContext localProperties does not inherit property updates across thread reuse
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-4514
>                 URL: https://issues.apache.org/jira/browse/SPARK-4514
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0, 1.1.1, 1.2.0
>            Reporter: Erik Erlandson
>            Assignee: Josh Rosen
>            Priority: Critical
>
> The current job group id of a Spark context is stored in the
> {{localProperties}} member value. This data structure is designed to be
> thread local, and its settings are not preserved when {{ComplexFutureAction}}
> instantiates a new {{Future}}.
> One consequence of this is that {{takeAsync()}} does not behave in the same
> way as other async actions, e.g. {{countAsync()}}.
> For example, this test (if copied into StatusTrackerSuite.scala), will fail,
> because {{"my-job-group2"}} is not propagated to the Future which actually
> instantiates the job:
> {code:java}
> test("getJobIdsForGroup() with takeAsync()") {
>   sc = new SparkContext("local", "test", new SparkConf(false))
>   sc.setJobGroup("my-job-group2", "description")
>   sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty)
>   val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
>   val firstJobId = eventually(timeout(10 seconds)) {
>     firstJobFuture.jobIds.head
>   }
>   eventually(timeout(10 seconds)) {
>     sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId))
>   }
> }
> {code}
> It also impacts current PR for SPARK-1021, which involves additional uses of
> {{ComplexFutureAction}}.
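
The "thread reuse" part of the issue title can be reproduced outside Spark. The following is a minimal sketch, not Spark code: {{jobGroup}}, {{readOnPool}} and {{InheritedLocalDemo}} are hypothetical names, and the {{InheritableThreadLocal}} is only a stand-in for {{localProperties}}. It assumes a pool thread copies the value once, when the thread is created, so an update made by the caller afterwards is not visible to a reused worker.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

object InheritedLocalDemo {
  // Hypothetical stand-in for SparkContext.localProperties:
  // a value that child threads copy from their parent at creation time.
  val jobGroup: InheritableThreadLocal[String] = new InheritableThreadLocal[String] {
    override def initialValue(): String = "none"
  }

  // Read the value as seen from inside a pool thread.
  def readOnPool(pool: ExecutorService): String =
    pool.submit(new Callable[String] { def call(): String = jobGroup.get() }).get()

  def run(): (String, String) = {
    val pool = Executors.newFixedThreadPool(1)
    try {
      jobGroup.set("group-1")
      // The single worker thread is created on this first submit
      // and inherits "group-1" from the submitting thread.
      val first = readOnPool(pool)

      jobGroup.set("group-2")
      // The same worker is reused; it still holds the copy made at creation.
      val second = readOnPool(pool)
      (first, second)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = run()
    println(s"first=$first second=$second") // prints "first=group-1 second=group-1"
  }
}
```

If the same pattern applies to the threads that run the {{Future}} bodies here, a job group set after those threads exist would not reach the jobs they submit, which would be consistent with the failing assertions above.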