[ https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856284#comment-17856284 ]
wuyi commented on SPARK-46957:
------------------------------

[~yujhe.li] Thanks for reporting this bug. I'll submit a fix.

> Migrated shuffle data files from the decommissioned node should be removed when the job completes
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-46957
>                 URL: https://issues.apache.org/jira/browse/SPARK-46957
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yu-Jhe Li
>            Priority: Major
>
> Hi, we have a long-lived Spark application running on a standalone cluster on GCP, and we are using spot instances. To reduce the impact of preempted instances, we have enabled node decommissioning so that a preempted node migrates its shuffle data to other instances before GCP deletes it.
> However, we found that the migrated shuffle data from the decommissioned node are never removed (the same behavior occurs on Spark 3.5).
>
> *Reproduce steps:*
> 1. Start spark-shell with 3 executors and enable decommissioning on both the driver and the worker:
> {code:java}
> start-worker.sh[3331]: Spark Command: /usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 -Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master-01.com:7077 {code}
> {code:java}
> /opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
>   --total-executor-cores 12 \
>   --conf spark.decommission.enabled=true \
>   --conf spark.storage.decommission.enabled=true \
>   --conf spark.storage.decommission.shuffleBlocks.enabled=true \
>   --conf spark.storage.decommission.rddBlocks.enabled=true{code}
>
> 2. Manually stop one worker while the following loop is running:
> {code:java}
> (1 to 10).foreach { i =>
>   println(s"start iter $i ...")
>   val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec sodales tempor, odio augue euismod ipsum, nec tristique e"
>   val df = (1 to 10000 * i).map(j => (j, s"${j}_${longString}")).toDF("id", "mystr")
>   df.repartition(6).count()
>   System.gc()
>   println(s"finished iter $i, wait 15s for next round")
>   Thread.sleep(15*1000)
> }
> System.gc()
>
> start iter 1 ...
> finished iter 1, wait 15s for next round
> ... {code}
>
> 3. Check the migrated shuffle data files on the remaining workers.
> {*}decommissioned node{*}: the shuffle files were migrated successfully
> {code:java}
> less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
> 24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) {code}
> {*}remaining shuffle data files on the other workers{*}: the migrated shuffle files are never removed
> {code:java}
> 10.67.5.134 | CHANGED | rc=0 >>
> -rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
> -rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
> -rw-r--r-- 1 spark spark 32 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
> 10.67.5.139 | CHANGED | rc=0 >>
> -rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
> -rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
> -rw-r--r-- 1 spark spark 32 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
> {code}
>
> *Expected behavior:*
> The migrated shuffle data files should be removed after the job completes.
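For context on why the repro forces garbage collection: shuffle files of a finished job are normally reclaimed by the driver-side ContextCleaner. Once a job's ShuffleDependency becomes unreachable on the driver, a GC (forced in step 2 with System.gc()) lets the cleaner ask every executor to remove that shuffle's files. The listing in step 3 shows that an executor deletes the shuffle files it wrote itself, but not the ones it received through decommission migration. The sketch below is illustrative only and not part of the original report; it assumes a session equivalent to the spark-shell invocation in step 1, and spark.cleaner.periodicGC.interval is an existing Spark setting that merely controls how often the driver triggers such a GC in long-lived applications.

{code:java}
// Illustrative sketch (not from the original report) of the normal cleanup path
// that migrated shuffle files currently escape.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://master-01.spark.com:7077")              // master from the repro
  .config("spark.decommission.enabled", "true")
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
  // Optional for long-lived apps: GC the driver (and thus run ContextCleaner
  // cleanups) more often than the 30min default.
  .config("spark.cleaner.periodicGC.interval", "5min")
  .getOrCreate()
import spark.implicits._

def runOneShuffle(rows: Int): Long = {
  val df = (1 to rows).map(i => (i, s"row_$i")).toDF("id", "mystr")
  df.repartition(6).count()   // writes shuffle_<id>_* files on the executors
}

runOneShuffle(100000)
// Once the ShuffleDependency created above is unreachable on the driver, a GC
// lets the ContextCleaner tell the executors to drop that shuffle's files.
// Executors remove the files they wrote themselves; per this report, files they
// received via decommission migration are left behind on disk.
System.gc()
Thread.sleep(15 * 1000)
{code}

A fix presumably needs the executors holding migrated blocks (here shuffle_4_38, shuffle_4_41, shuffle_4_44, shuffle_4_47, shuffle_5_52, shuffle_5_55) to honor that same removal request, so the files disappear once their shuffle is cleaned up or the application ends.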