Yu-Jhe Li created SPARK-46957:
---------------------------------

             Summary: Migrated shuffle data files from the decommissioned node should be removed when job completed
                 Key: SPARK-46957
                 URL: https://issues.apache.org/jira/browse/SPARK-46957
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: Yu-Jhe Li


Hi, we have a long-lived Spark application running on a standalone cluster on
GCP, and we are using spot instances. To reduce the impact of preempted
instances, we have enabled node decommissioning so that a preempted node
migrates its shuffle data to other instances before GCP deletes it.

However, we found that shuffle data migrated from the decommissioned node is
never removed (the same behavior occurs on Spark 3.5).

*Steps to reproduce:*

1. Start spark-shell with 3 executors and enable decommissioning on both the
driver and the workers

 
{code:java}
start-worker.sh[3331]: Spark Command: /usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 -Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master-01.com:7077 {code}
 

 
{code:java}
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true{code}
 

2. Manually stop one worker while the job is running

 
{code:java}
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  // Long literal split with + so it parses (a plain string literal cannot span lines)
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " +
    "Integer eget tortor id libero ultricies faucibus nec ac neque. " +
    "Vivamus ac risus vitae mi efficitur lacinia. " +
    "Quisque dignissim quam vel tellus placerat, non laoreet elit rhoncus. " +
    "Nam et magna id dui tempor sagittis. Aliquam erat volutpat. " +
    "Integer tristique purus ac eros bibendum, at varius velit viverra. " +
    "Sed eleifend luctus massa, ac accumsan leo feugiat ac. " +
    "Sed id nisl et enim tristique auctor. Sed vel ante nec leo placerat tincidunt. " +
    "Ut varius, risus nec sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 10000 * i).map(j => (j, s"${j}_${longString}")).toDF("id", "mystr")

  df.repartition(6).count()
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

start iter 1 ...
finished iter 1, wait 15s for next round
... {code}
 

3. Check the migrated shuffle data files on the remaining workers

{*}decommissioned node{*}: the shuffle files were migrated successfully
{code:java}
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None) {code}
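To line up the migration log with the files found on the other workers, a small shell sketch like the following can turn each "Migrated" record into a destination host and an expected file name. It assumes each log record sits on a single line in the format shown above, and that a migrated block migrate_shuffle_<shuffleId>_<mapId> is stored as shuffle_<shuffleId>_<mapId>_0.data on the receiving executor; that mapping matches the listing below, but it is an observation rather than a documented contract. The function name migrated_shuffle_files is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print "<destination-host> <expected-file>" for every
# shuffle block that BlockManagerDecommissioner logged as migrated.
# Assumes one log record per line, e.g.
#   ... INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
# and (assumption) that migrate_shuffle_<shuffleId>_<mapId> is stored as
# shuffle_<shuffleId>_<mapId>_0.data on the receiving executor.
migrated_shuffle_files() {
  local log_file="$1"
  # -n together with the trailing p flag prints only lines where the substitution matched.
  sed -nE 's/.*Migrated migrate_shuffle_([0-9]+)_([0-9]+) to BlockManagerId\([^,]*, *([^,]+),.*/\3 shuffle_\1_\2_0.data/p' "$log_file"
}
```

For example, running migrated_shuffle_files /mnt/spark_work/app-20240202084807-0003/1/stdout against the log above should print pairs such as 10.67.5.139 shuffle_4_41_0.data, which can then be checked against each worker's local directory.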
{*}shuffle data files remaining on the other workers{*}: the migrated shuffle files are never removed

 
{code:java}
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
 {code}
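To repeat this check on each remaining worker after the application's jobs finish, a sketch along these lines lists the shuffle data files still present under the local directory. The default root /mnt/spark is inferred from the paths above (it appears to be spark.local.dir on these workers), and the helper name leftover_shuffle_files is hypothetical. Spark's ContextCleaner normally deletes a shuffle's files once the driver garbage-collects the corresponding shuffle dependency, which is presumably why the reproduction calls System.gc(); migrated files that are still listed after that point are the ones this issue is about.

```shell
#!/usr/bin/env bash
# Hypothetical helper: list shuffle data files still present on this worker.
# The default root is an assumption taken from the listing above; pass your
# own spark.local.dir if it differs.
leftover_shuffle_files() {
  local root="${1:-/mnt/spark}"
  # Layout seen above: .../blockmgr-*/<subdir>/shuffle_<shuffleId>_<mapId>_<reduceId>.data
  # Only .data files are matched; use -name 'shuffle_*' to include .index files as well.
  find "$root" -type f -path '*/blockmgr-*/*' -name 'shuffle_*_*_*.data' | sort
}
```

Running leftover_shuffle_files on a worker prints nothing once all shuffle files under that root have been cleaned up.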

--
This message was sent by Atlassian Jira
(v8.20.10#820010)