[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed
[ https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-46957:
-----------------------------------
    Labels: pull-request-available  (was: )

                 Key: SPARK-46957
                 URL: https://issues.apache.org/jira/browse/SPARK-46957
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: Yu-Jhe Li
            Assignee: wuyi
            Priority: Major
              Labels: pull-request-available
             Fix For: 4.0.0, 3.5.2, 3.4.4

Hi, we have a long-lived Spark application running on a standalone cluster on GCP, and we are using spot instances. To reduce the impact of preempted instances, we have enabled node decommission so that a preempted node migrates its shuffle data to other instances before it is deleted by GCP.

However, we found that the shuffle data migrated from the decommissioned node is never removed (same behavior on Spark 3.5).

*Reproduce steps:*

1. Start spark-shell with 3 executors and enable decommission on both the driver and the workers

{code:java}
start-worker.sh[3331]: Spark Command: /usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Dspark.worker.cleanup.appDataTtl=1800 -Dspark.decommission.enabled=true -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://master-01.com:7077
{code}

{code:java}
/opt/spark/bin/spark-shell --master spark://master-01.spark.com:7077 \
  --total-executor-cores 12 \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true
{code}

2. Manually stop 1 worker during execution of the following workload

{code:java}
(1 to 10).foreach { i =>
  println(s"start iter $i ...")
  val longString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer eget tortor id libero ultricies faucibus nec ac neque. Vivamus ac risus vitae mi efficitur lacinia. Quisque dignissim quam vel tellus placerat, non laoreet elit rhoncus. Nam et magna id dui tempor sagittis. Aliquam erat volutpat. Integer tristique purus ac eros bibendum, at varius velit viverra. Sed eleifend luctus massa, ac accumsan leo feugiat ac. Sed id nisl et enim tristique auctor. Sed vel ante nec leo placerat tincidunt. Ut varius, risus nec sodales tempor, odio augue euismod ipsum, nec tristique e"
  val df = (1 to 1 * i).map(j => (j, s"${j}_${longString}")).toDF("id", "mystr")
  df.repartition(6).count()
  System.gc()
  println(s"finished iter $i, wait 15s for next round")
  Thread.sleep(15*1000)
}
System.gc()

start iter 1 ...
finished iter 1, wait 15s for next round
...
{code}
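Step 2 above does not show how the worker was stopped; the sketch below is one way to trigger a manual decommission of a standalone worker. It assumes the Worker honors SIGPWR as its decommission signal when spark.decommission.enabled=true (as set in the worker command in step 1) and that the sbin helper script is shipped by this Spark distribution; on GCP, preemption of the spot instance exercises the same path.

{code:java}
# Run on the worker that should be decommissioned (sketch; paths and the
# signal-based mechanism are assumptions, not taken from the report).

# Option A: send SIGPWR to the Worker JVM, which is treated as a
# decommission request when spark.decommission.enabled=true.
WORKER_PID=$(jps | awk '/ Worker$/ {print $1}')
kill -PWR "$WORKER_PID"

# Option B: use the bundled helper script, if this distribution ships it;
# it wraps the same signal-based mechanism.
/opt/spark/sbin/decommission-worker.sh
{code}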
3. Check the migrated shuffle data files on the remaining workers

*decommissioned node*: the shuffle files were migrated successfully

{code:java}
less /mnt/spark_work/app-20240202084807-0003/1/stdout | grep 'Migrated '
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_41 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_38 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_47 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_4_44 to BlockManagerId(2, 10.67.5.139, 35949, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_52 to BlockManagerId(0, 10.67.5.134, 36175, None)
24/02/02 08:48:53 INFO BlockManagerDecommissioner: Migrated migrate_shuffle_5_55 to BlockManagerId(2, 10.67.5.139, 35949, None)
{code}

*remaining shuffle data files on the other workers*: the migrated shuffle files are never removed

{code:java}
10.67.5.134 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/13/shuffle_4_47_0.data
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/31/shuffle_4_38_0.data
-rw-r--r-- 1 spark spark 32 Feb 2 08:48 /mnt/spark/spark-b25878b3-8b3c-4cff-ba4d-41f6d128da7c/executor-b8f83524-9270-4f35-83ca-ceb13af2b7d1/blockmgr-f05c4d8e-e1a5-4822-a6e9-49be760b67a2/3a/shuffle_5_52_0.data
10.67.5.139 | CHANGED | rc=0 >>
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/27/shuffle_4_41_0.data
-rw-r--r-- 1 spark spark 126 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/36/shuffle_4_44_0.data
-rw-r--r-- 1 spark spark 32 Feb 2 08:48 /mnt/spark/spark-ab501bec-ddd2-4b82-af3e-f2731066e580/executor-1ca5ad78-1d75-453d-88ab-487d7cdfacb7/blockmgr-f09eb18d-b0e4-48f9-a4ed-5587cef25a16/29/shuffle_5_55_0.data
{code}

*Expected behavior:*
The migrated shuffle data files should be removed after the job completes.
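The per-host listings above look like the output of an ad-hoc command run across the surviving workers. A minimal sketch of an equivalent check follows; the host list and the /mnt/spark local directory come from the listings above, while ssh access and the find pattern are assumptions for illustration.

{code:java}
# Sketch: list shuffle files that are still present on the remaining workers.
# /mnt/spark is the spark.local.dir root visible in the listings above.
for h in 10.67.5.134 10.67.5.139; do
  echo "== $h =="
  ssh "$h" "find /mnt/spark -type f \( -name 'shuffle_*.data' -o -name 'shuffle_*.index' \) -ls"
done
{code}

With the expected behavior, rerunning the same check after the job completes should no longer show the migrated shuffle_* files.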
[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed
[ https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kent Yao updated SPARK-46957:
-----------------------------
    Fix Version/s: 3.5.2
                   3.4.4
[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed
[ https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kent Yao updated SPARK-46957:
-----------------------------
    Fix Version/s: (was: 3.5.2)
                   (was: 3.4.4)
[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed
[ https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu-Jhe Li updated SPARK-46957:
------------------------------
    Description: (updated; see the issue description quoted in the first update above)
[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed
[ https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu-Jhe Li updated SPARK-46957:
------------------------------
    Description: (updated; see the issue description quoted in the first update above)
[jira] [Updated] (SPARK-46957) Migrated shuffle data files from the decommissioned node should be removed when job completed
[ https://issues.apache.org/jira/browse/SPARK-46957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu-Jhe Li updated SPARK-46957:
------------------------------
    Description: (updated; see the issue description quoted in the first update above)