[ https://issues.apache.org/jira/browse/SPARK-40932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-40932.
---------------------------------
    Fix Version/s: 3.3.2
                   3.4.0
       Resolution: Fixed

Issue resolved by pull request 38410
[https://github.com/apache/spark/pull/38410]

> Barrier: messages for allGather will be overridden by the following barrier APIs
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-40932
>                 URL: https://issues.apache.org/jira/browse/SPARK-40932
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.0, 3.3.1
>            Reporter: Bobby Wang
>            Assignee: Bobby Wang
>            Priority: Critical
>             Fix For: 3.3.2, 3.4.0
>
>
> While working on an internal project that has not been open-sourced, I found
> that the messages returned by BarrierTaskContext.allGather may be overridden
> by subsequent barrier API calls, which means the user can't get the correct
> allGather messages.
>
> This issue can be easily reproduced with the following unit test.
>
> {code:java}
> test("SPARK-XXX, messages of allGather should not been overridden " +
>   "by the following barrier APIs") {
>   sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
>   sc.setLogLevel("INFO")
>   val rdd = sc.makeRDD(1 to 10, 2)
>   val rdd2 = rdd.barrier().mapPartitions { it =>
>     val context = BarrierTaskContext.get()
>     // Sleep for a random time before global sync.
>     Thread.sleep(Random.nextInt(1000))
>     // Pass partitionId message in
>     val message: String = context.partitionId().toString
>     val messages: Array[String] = context.allGather(message)
>     context.barrier()
>     Iterator.single(messages.toList)
>   }
>   val messages = rdd2.collect()
>   // All the task partitionIds are shared across all tasks
>   assert(messages.length === 2)
>   messages.foreach(m => println("------- " + m))
>   assert(messages.forall(_ == List("0", "1")))
> }
> {code}
>
> Before the assertion assert(messages.forall(_ == List("0", "1"))) fails,
> the printed output is:
>
> {code:java}
> ------- List(, )
> ------- List(, )
> {code}
>
> As you can see, the messages are empty because they have been overridden by
> the context.barrier() call.
>
> Below is the Spark log:
>
> _22/10/27 17:03:50.236 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Running task 0.0 in stage 0.0 (TID 1)_
> _22/10/27 17:03:50.236 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Running task 1.0 in stage 0.0 (TID 0)_
> _22/10/27 17:03:50.949 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0._
> _22/10/27 17:03:50.964 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0._
> _22/10/27 17:03:50.966 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2._
> _22/10/27 17:03:51.436 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0._
> _22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0._
> _22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from
> Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2._
> _22/10/27 17:03:51.440 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully._
> _22/10/27 17:03:51.958 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1._
> _22/10/27 17:03:51.959 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1._
> _22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1._
> _22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2._
> _22/10/27 17:03:52.437 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1._
> _22/10/27 17:03:52.438 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1._
> _22/10/27 17:03:52.438 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1._
> _22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2._
> _22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully._
> _22/10/27 17:03:52.960 Executor task launch worker for task 0.0 in stage
> 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2._
> _22/10/27 17:03:52.972 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Finished task 0.0 in stage 0.0 (TID 1). 1040 bytes result sent to driver_
> _22/10/27 17:03:52.974 dispatcher-event-loop-1 INFO TaskSchedulerImpl: Skip current round of resource offers for barrier stage 0 because the barrier taskSet requires 2 slots, while the total number of available slots is 1._
> _22/10/27 17:03:52.976 task-result-getter-0 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 2762 ms on 192.168.31.236 (executor driver) (1/2)_
> _22/10/27 17:03:53.439 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2._
> _22/10/27 17:03:53.445 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 1040 bytes result sent to driver_
>
> After debugging, I found that the [messages object|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala#L102]
> (Array[String]) returned to BarrierTaskContext is the same object as the
> [original messages|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala#L107]
> held by BarrierCoordinator, rather than a copy.
>
> I will file a PR for this issue.
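
The aliasing described in the debugging note can be illustrated with a minimal, self-contained sketch. It does not use Spark's classes: ToyCoordinator, syncShared and syncCopied are hypothetical names invented for this illustration, and the sketch assumes that a plain barrier() call submits an empty message per task, which is consistent with the "List(, )" output above. Returning a defensive copy is shown only as one possible remedy, not necessarily what pull request 38410 does.

```scala
// Hypothetical stand-in for coordinator-side state; NOT Spark's actual classes.
final class ToyCoordinator(numTasks: Int) {
  // One slot per task, reused across barrier epochs.
  private val messages: Array[String] = Array.fill(numTasks)("")

  // Record one message per task and hand back the shared array itself.
  def syncShared(incoming: Seq[String]): Array[String] = {
    incoming.zipWithIndex.foreach { case (m, i) => messages(i) = m }
    messages
  }

  // Same as syncShared, but hand back a defensive copy of the array.
  def syncCopied(incoming: Seq[String]): Array[String] =
    syncShared(incoming).clone()
}

object AliasingDemo {
  def main(args: Array[String]): Unit = {
    val coordinator = new ToyCoordinator(2)

    // Plays the role of allGather: the caller keeps a reference to the shared array.
    val shared = coordinator.syncShared(Seq("0", "1"))
    // Plays the role of a following barrier(): assumed to submit empty messages.
    coordinator.syncShared(Seq("", ""))
    println(shared.toList) // prints List(, ) because the array was overwritten

    // With a copy, the earlier result survives the later sync.
    val copied = coordinator.syncCopied(Seq("0", "1"))
    coordinator.syncShared(Seq("", ""))
    println(copied.toList) // prints List(0, 1)
  }
}
```

In the sketch, the only difference between the two results is whether the caller holds the coordinator's own array or a clone of it.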