[ 
https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856284#comment-17856284
 ] 

wuyi commented on SPARK-46957:
------------------------------

[~yujhe.li] Thanks for reporting this bug. I'll submit a fix.

> Migrated shuffle data files from the decommissioned node should be removed 
> when job completed
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-46957
>                 URL: https://issues.apache.org/jira/browse/SPARK-46957
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yu-Jhe Li
>            Priority: Major
>
> Hi, we have a long-lived Spark application running on a standalone cluster on
> GCP, and we are using spot instances. To reduce the impact of preempted
> instances, we have enabled node decommissioning so that a preempted node
> migrates its shuffle data to other instances before GCP deletes it.
> However, we found that the shuffle data migrated from the decommissioned node
> is never removed. (The behavior is the same on spark-3.5.)
> *Steps to reproduce:*
> 1. Start spark-shell with 3 executors and enable decommissioning on both the
> driver and the workers
> {code:java}
> start-worker.sh[3331]: Spark Command: 
> /usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp 
> /opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 
> -Dspark.decommission.enabled=true -Xmx1g 
> org.apache.spark.deploy.worker.Worker --webui-port 8081 
> spark://master-01.com:7077 {code}
> {code:java}
> /opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
>   --total-executor-cores 12 \
>   --conf spark.decommission.enabled=true \
>   --conf spark.storage.decommission.enabled=true \
>   --conf spark.storage.decommission.shuffleBlocks.enabled=true \
>   --conf spark.storage.decommission.rddBlocks.enabled=true{code}
>  
> 2. Run the following loop and manually stop one worker while it is executing
> {code:java}
> (1 to 10).foreach { i =>
>   println(s"start iter $i ...")
>   val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " +
>     "Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac " +
>     "risus vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, " +
>     "non laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat " +
>     "volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. " +
>     "Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim " +
>     "tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus " +
>     "nec sodales tempor, odio augue euismod ipsum, nec tristique e"
>   val df = (1 to 10000 * i).map(j => (j, s"${j}_${longString}")).toDF("id", "mystr")
>   df.repartition(6).count()
>   System.gc()
>   println(s"finished iter $i, wait 15s for next round")
>   Thread.sleep(15*1000)
> }
> System.gc()
> // Console output:
> // start iter 1 ...
> // finished iter 1, wait 15s for next round
> // ...
> {code}
>  
> 3. Check the migrated shuffle data files on the remaining workers
> {*}decommissioned node{*}: the shuffle files were migrated successfully
> {code:java}
> less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) {code}
> {*}remaining shuffle data files on the other workers{*}: the migrated shuffle 
> files are never removed
> {code:java}
> 10.67.5.134 | CHANGED | rc=0 >>
> -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
> -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
> -rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
> 10.67.5.139 | CHANGED | rc=0 >>
> -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
> -rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
> -rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
>  {code}
>  
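
The two listings in step 3 line up: each "Migrated migrate_shuffle_<shuffleId>_<mapId>" entry corresponds to a shuffle_<shuffleId>_<mapId>_0.data file on the destination host. A minimal sketch of that mapping follows, assuming the log format shown in this report. The function name expected_migrated_files is hypothetical, and the "_0.data" suffix is inferred from the file listings here rather than taken from Spark's code.

```shell
# Hypothetical helper (not part of Spark): read executor log text on stdin and
# print "<destination host> <expected data file name>" for every entry of the
# form "Migrated migrate_shuffle_<shuffleId>_<mapId> to BlockManagerId(...)".
expected_migrated_files() {
  # Join wrapped log entries onto one line so that an entry split after
  # "Migrated" is still recognized.
  tr '\n' ' ' |
    # Keep only the part of each entry up to and including the host field.
    grep -oE 'Migrated +migrate_shuffle_[0-9]+_[0-9]+ +to +BlockManagerId\([^,]+, *[^,]+' |
    # Rewrite to "<host> shuffle_<shuffleId>_<mapId>_0.data".
    # Assumption: the "_0.data" suffix matches the listings in this report.
    sed -E 's/^Migrated +migrate_(shuffle_[0-9]+_[0-9]+) +to +BlockManagerId\([^,]+, *([^,]+)$/\2 \1_0.data/'
}
```

For example, `expected_migrated_files < /mnt/spark_work/app-20240202084807-0003/1/stdout` on the decommissioned node would list the files to look for on each remaining worker.
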
> *Expected behavior:*
> The migrated shuffle data files should be removed after the job completes.
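
To watch for the symptom over several iterations of the loop, it may help to count the shuffle data files per shuffle ID on each remaining worker. The sketch below assumes the file naming seen in the listings of this report (shuffle_<shuffleId>_<mapId>_0.data under a blockmgr-* directory). The function name leftover_shuffle_counts is hypothetical.

```shell
# Hypothetical helper (not part of Spark): count shuffle data files per
# shuffle ID under a Spark local directory, e.g. /mnt/spark in this report.
leftover_shuffle_counts() {
  # Find data files that follow the naming seen in the report's listings.
  find "$1" -type f -name 'shuffle_*_*_*.data' |
    # Reduce each path to its shuffle ID.
    sed -E 's|.*/shuffle_([0-9]+)_[0-9]+_[0-9]+\.data$|\1|' |
    # Count files per shuffle ID, in ascending numeric order.
    sort -n | uniq -c |
    awk '{ print "shuffle " $2 ": " $1 " file(s)" }'
}
```

Running `leftover_shuffle_counts /mnt/spark` on a worker after each iteration would show whether the counts for earlier shuffle IDs ever drop to zero. In this report they do not for the migrated files.
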


