[ 
https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated SPARK-46957:
------------------------------
    Description: 
Hi, we have a long-lived Spark application running on a standalone cluster on 
GCP, and we are using spot instances. To reduce the impact of preempted 
instances, we have enabled node decommissioning so that a preempted node 
migrates its shuffle data to other instances before GCP deletes it.

However, we found that the shuffle data migrated from the decommissioned node 
is never removed. (The behavior is the same on spark-3.5.)

*Reproduce steps:*

1. Start spark-shell with 3 executors and enable decommissioning on both the 
driver and the workers
{code:java}
start-worker.sh[3331]: Spark Command: 
/usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp 
/opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 
-Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker 
--webui-port 8081 spark://master-01.com:7077 {code}
{code:java}
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true{code}
 

2. Run the following loop and manually stop 1 worker while it is executing
{code:java}
// Run in spark-shell, which pre-imports spark.implicits._ (required for toDF).
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  // Filler text that pads every row so each iteration writes more shuffle data.
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " +
    "Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus " +
    "vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non " +
    "laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat " +
    "volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. " +
    "Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim " +
    "tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec " +
    "sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 10000 * i).map(j => (j, s"${j}_${longString}")).toDF("id", "mystr")

  // repartition forces a shuffle; count() triggers the job.
  df.repartition(6).count()
  // Request a driver GC so shuffles that are no longer referenced become eligible for cleanup.
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

// Console output:
// start iter 1 ...
// finished iter 1, wait 15s for next round
// ...
{code}
 

3. Check the migrated shuffle data files on the remaining workers

{*}decommissioned node{*}: the shuffle files were migrated successfully
{code:java}
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None)
{code}
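To see at a glance which shuffle blocks went to which host, the 'Migrated' log entries can be condensed into pairs of block name and destination. The following is only a rough sketch: the function name summarise_migrations is made up for illustration, and it assumes each log entry sits on a single line in the executor's stdout file, in the format shown in this report.

```shell
#!/bin/sh
# Hypothetical helper (not part of Spark): reads executor log text on stdin
# and prints "<shuffle block> <destination host>" for every migrated block.
# Lines that do not match the 'Migrated migrate_shuffle_...' pattern are skipped.
summarise_migrations() {
  sed -n 's/.*Migrated migrate_\(shuffle_[0-9]*_[0-9]*\) to BlockManagerId([^,]*, \([^,]*\),.*/\1 \2/p'
}

# Example usage (path taken from this report):
#   summarise_migrations < /mnt/spark_work/app-20240202084807-0003/1/stdout | sort
```

Sorting the output by the second column groups the blocks per destination host, which makes it easier to know where to look for the migrated files.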
{*}remaining shuffle data files on the other workers{*}: the migrated shuffle 
files are never removed
{code:java}
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb  2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
 {code}
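A listing like the one above can be produced on each remaining worker with a small find wrapper. This is a minimal sketch rather than the exact command used for this report: the function name list_shuffle_files is hypothetical, /mnt/spark is the local directory seen in the paths above, and matching .index files alongside .data files is an assumption based on Spark's usual shuffle file layout.

```shell
#!/bin/sh
# Hypothetical helper (not part of Spark): prints every shuffle data or index
# file found under the given Spark local directory, sorted by path.
list_shuffle_files() {
  find "$1" -type f \( -name 'shuffle_*.data' -o -name 'shuffle_*.index' \) | sort
}

# Example usage on a worker (directory taken from this report):
#   list_shuffle_files /mnt/spark
```

Running it again after the job has finished and the driver has garbage-collected shows whether the files with the migrated shuffle ids (4 and 5 in this report) are still present.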
 

 


> Migrated shuffle data files from the decommissioned node should be removed 
> when job completed
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-46957
>                 URL: https://issues.apache.org/jira/browse/SPARK-46957
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yu-Jhe Li
>            Priority: Major
>


