[ 
https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15021187#comment-15021187
 ] 

Richard W. Eggert II commented on SPARK-4514:
---------------------------------------------

This test, however, still fails:

{code}
 test("getJobIdsForGroup() with takeAsync() across multiple partitions") {
    sc = new SparkContext("local", "test", new SparkConf(false))
    sc.setJobGroup("my-job-group2", "description")
    sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
    val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)
    val firstJobId = eventually(timeout(10 seconds)) {
      firstJobFuture.jobIds.head
    }
    eventually(timeout(10 seconds)) {
      sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2
    }
  }
{code}

> SparkContext localProperties does not inherit property updates across thread 
> reuse
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-4514
>                 URL: https://issues.apache.org/jira/browse/SPARK-4514
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0, 1.1.1, 1.2.0
>            Reporter: Erik Erlandson
>            Assignee: Josh Rosen
>            Priority: Critical
>
> The current job group id of a Spark context is stored in the 
> {{localProperties}} member value.   This data structure is designed to be 
> thread local, and its settings are not preserved when {{ComplexFutureAction}} 
> instantiates a new {{Future}}.  
> One consequence of this is that {{takeAsync()}} does not behave in the same 
> way as other async actions, e.g. {{countAsync()}}.  For example, this test 
> (if copied into StatusTrackerSuite.scala), will fail, because 
> {{"my-job-group2"}} is not propagated to the Future which actually 
> instantiates the job:
> {code:java}
>   test("getJobIdsForGroup() with takeAsync()") {
>     sc = new SparkContext("local", "test", new SparkConf(false))
>     sc.setJobGroup("my-job-group2", "description")
>     sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty)
>     val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
>     val firstJobId = eventually(timeout(10 seconds)) {
>       firstJobFuture.jobIds.head
>     }
>     eventually(timeout(10 seconds)) {
>       sc.statusTracker.getJobIdsForGroup("my-job-group2") should be 
> (Seq(firstJobId))
>     }
>   }
> {code}
> It also impacts current PR for SPARK-1021, which involves additional uses of 
> {{ComplexFutureAction}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to