[ 
https://issues.apache.org/jira/browse/SPARK-31784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-31784:
-------------------------
    Description: 
{code:java}
test("share messages with allGather() call") {  
  val conf = new SparkConf()
      .setMaster("local-cluster[4, 1, 1024]")
      .setAppName("test-cluster")
  sc = new SparkContext(conf)
  val rdd = sc.makeRDD(1 to 10, 4)
  val rdd2 = rdd.barrier().mapPartitions { it =>  
    val context = BarrierTaskContext.get()      
    // Sleep for a random time before global sync.     
Thread.sleep(Random.nextInt(1000))  
    // Pass partitionId message in
    val message: String = context.partitionId().toString
    val messages: Array[String] = context.allGather(message)        
messages.toList.iterator 
   } 
   // Take a sorted list of all the partitionId messages
   val messages = rdd2.collect().head
   // All the task partitionIds are shared
   for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString)
 }
{code}
In this test, the desired `messages` should be ["0", "1", "2", "3"], but only 
"0" in reality.

> Fix test BarrierTaskContextSuite."share messages with allGather() call"
> -----------------------------------------------------------------------
>
>                 Key: SPARK-31784
>                 URL: https://issues.apache.org/jira/browse/SPARK-31784
>             Project: Spark
>          Issue Type: Test
>          Components: Spark Core
>    Affects Versions: 3.1.0
>         Environment:  
> {code:java}
>  {code}
>  
>            Reporter: wuyi
>            Priority: Major
>
> {code:java}
> test("share messages with allGather() call") {  
>   val conf = new SparkConf()
>       .setMaster("local-cluster[4, 1, 1024]")
>       .setAppName("test-cluster")
>   sc = new SparkContext(conf)
>   val rdd = sc.makeRDD(1 to 10, 4)
>   val rdd2 = rdd.barrier().mapPartitions { it =>  
>     val context = BarrierTaskContext.get()      
>     // Sleep for a random time before global sync.     
> Thread.sleep(Random.nextInt(1000))  
>     // Pass partitionId message in
>     val message: String = context.partitionId().toString
>     val messages: Array[String] = context.allGather(message)        
> messages.toList.iterator 
>    } 
>    // Take a sorted list of all the partitionId messages
>    val messages = rdd2.collect().head
>    // All the task partitionIds are shared
>    for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString)
>  }
> {code}
> In this test, the desired `messages` should be ["0", "1", "2", "3"], but only 
> "0" in reality.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to