[ https://issues.apache.org/jira/browse/SPARK-31784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
wuyi updated SPARK-31784: ------------------------- Description: {code:java} test("share messages with allGather() call") { val conf = new SparkConf() .setMaster("local-cluster[4, 1, 1024]") .setAppName("test-cluster") sc = new SparkContext(conf) val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() // Sleep for a random time before global sync. Thread.sleep(Random.nextInt(1000)) // Pass partitionId message in val message: String = context.partitionId().toString val messages: Array[String] = context.allGather(message) messages.toList.iterator } // Take a sorted list of all the partitionId messages val messages = rdd2.collect().head // All the task partitionIds are shared for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString) } {code} In this test, `messages` is expected to be ["0", "1", "2", "3"], but in reality it is just "0". Because `mapPartitions` returns `messages.toList.iterator`, each gathered array is flattened into individual records, so `rdd2.collect().head` is the single string "0" rather than the whole array; the assertion loop therefore checks only one character and passes vacuously. > Fix test BarrierTaskContextSuite."share messages with allGather() call" > ----------------------------------------------------------------------- > > Key: SPARK-31784 > URL: https://issues.apache.org/jira/browse/SPARK-31784 > Project: Spark > Issue Type: Test > Components: Spark Core > Affects Versions: 3.1.0 > Environment: > {code:java} > {code} > > Reporter: wuyi > Priority: Major > > {code:java} > test("share messages with allGather() call") { > val conf = new SparkConf() > .setMaster("local-cluster[4, 1, 1024]") > .setAppName("test-cluster") > sc = new SparkContext(conf) > val rdd = sc.makeRDD(1 to 10, 4) > val rdd2 = rdd.barrier().mapPartitions { it => > val context = BarrierTaskContext.get() > // Sleep for a random time before global sync. 
> Thread.sleep(Random.nextInt(1000)) > // Pass partitionId message in > val message: String = context.partitionId().toString > val messages: Array[String] = context.allGather(message) > messages.toList.iterator > } > // Take a sorted list of all the partitionId messages > val messages = rdd2.collect().head > // All the task partitionIds are shared > for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString) > } > {code} > In this test, `messages` is expected to be ["0", "1", "2", "3"], but in > reality it is just "0". -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org