This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ca25534  [SPARK-37509][CORE] Improve Fallback Storage upload speed by 
avoiding S3 rate limiter
ca25534 is described below

commit ca2553443977264e2e897006dc729ba61147829f
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Tue Nov 30 15:03:00 2021 -0800

    [SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3 
rate limiter
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve `Fallback Storage` upload speed by placing each 
uploaded file under a hash-derived path prefix in order to avoid the S3 rate limiter.
    
    ### Why are the changes needed?
    
    Currently, `Fallback Storage` uses a single prefix per shuffle. Because S3 
applies its request rate limits per prefix, this PR spreads the uploads of even a 
single shuffle across many prefixes to avoid the S3 rate limiter.
    - 
https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is used internally during the runtime.
    
    ### How was this patch tested?
    
    Pass the CIs to verify read and write operations. To check the layout, 
check the uploaded path manually with the following configs.
    
    ```
    spark.decommission.enabled                          true
    spark.storage.decommission.enabled                  true
    spark.storage.decommission.shuffleBlocks.enabled    true
    spark.storage.decommission.fallbackStorage.path     file:///tmp/fallback/
    ```
    
    Start one master and worker. Connect with `spark-shell` and generate 
shuffle data.
    ```
    scala> sc.parallelize(1 to 11, 10).map(x => (x % 3, 1)).reduceByKey(_ + 
_).count()
    res0: Long = 3
    ```
    
    Invoke decommission and check. Since we have only one worker, the shuffle 
data goes directly to the fallback storage.
    ```
    $ kill -PWR <CoarseGrainedExecutorBackend JVM PID>
    $ tree /tmp/fallback
    /tmp/fallback
    └── app-20211130135922-0001
        └── 0
            ├── 103417883
            │   └── shuffle_0_7_0.data
            ├── 1036881592
            │   └── shuffle_0_4_0.data
            ├── 1094002679
            │   └── shuffle_0_7_0.index
            ├── 1393510154
            │   └── shuffle_0_6_0.index
            ├── 1515275369
            │   └── shuffle_0_3_0.data
            ├── 1541340402
            │   └── shuffle_0_2_0.index
            ├── 1639392452
            │   └── shuffle_0_8_0.data
            ├── 1774061049
            │   └── shuffle_0_9_0.index
            ├── 1846228218
            │   └── shuffle_0_6_0.data
            ├── 1970345301
            │   └── shuffle_0_1_0.data
            ├── 2073568524
            │   └── shuffle_0_4_0.index
            ├── 227534966
            │   └── shuffle_0_2_0.data
            ├── 266114061
            │   └── shuffle_0_3_0.index
            ├── 413944309
            │   └── shuffle_0_5_0.index
            ├── 581811660
            │   └── shuffle_0_0_0.data
            ├── 705928743
            │   └── shuffle_0_5_0.data
            ├── 713451784
            │   └── shuffle_0_8_0.index
            ├── 861282032
            │   └── shuffle_0_0_0.index
            ├── 912764509
            │   └── shuffle_0_9_0.data
            └── 946172431
                └── shuffle_0_1_0.index
    ```
    
    Closes #34762 from dongjoon-hyun/SPARK-37509.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/storage/FallbackStorage.scala | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala 
b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 7613713..d137099 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import 
org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP,
 STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -60,15 +61,17 @@ private[storage] class FallbackStorage(conf: SparkConf) 
extends Logging {
         val indexFile = r.getIndexFile(shuffleId, mapId)
 
         if (indexFile.exists()) {
+          val hash = JavaUtils.nonNegativeHash(indexFile.getName)
           fallbackFileSystem.copyFromLocalFile(
             new Path(indexFile.getAbsolutePath),
-            new Path(fallbackPath, s"$appId/$shuffleId/${indexFile.getName}"))
+            new Path(fallbackPath, 
s"$appId/$shuffleId/$hash/${indexFile.getName}"))
 
           val dataFile = r.getDataFile(shuffleId, mapId)
           if (dataFile.exists()) {
+            val hash = JavaUtils.nonNegativeHash(dataFile.getName)
             fallbackFileSystem.copyFromLocalFile(
               new Path(dataFile.getAbsolutePath),
-              new Path(fallbackPath, s"$appId/$shuffleId/${dataFile.getName}"))
+              new Path(fallbackPath, 
s"$appId/$shuffleId/$hash/${dataFile.getName}"))
           }
 
           // Report block statuses
@@ -86,7 +89,8 @@ private[storage] class FallbackStorage(conf: SparkConf) 
extends Logging {
   }
 
   def exists(shuffleId: Int, filename: String): Boolean = {
-    fallbackFileSystem.exists(new Path(fallbackPath, 
s"$appId/$shuffleId/$filename"))
+    val hash = JavaUtils.nonNegativeHash(filename)
+    fallbackFileSystem.exists(new Path(fallbackPath, 
s"$appId/$shuffleId/$hash/$filename"))
   }
 }
 
@@ -168,7 +172,8 @@ private[spark] object FallbackStorage extends Logging {
     }
 
     val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
-    val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+    val hash = JavaUtils.nonNegativeHash(name)
+    val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
     val start = startReduceId * 8L
     val end = endReduceId * 8L
     Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
@@ -178,7 +183,8 @@ private[spark] object FallbackStorage extends Logging {
         index.skip(end - (start + 8L))
         val nextOffset = index.readLong()
         val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
-        val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+        val hash = JavaUtils.nonNegativeHash(name)
+        val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
         val f = fallbackFileSystem.open(dataFile)
         val size = nextOffset - offset
         logDebug(s"To byte array $size")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to