This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a13b7a3c8 [SYSTEMDS-3577] Fix default storage memory fractions in Spark
4a13b7a3c8 is described below

commit 4a13b7a3c8835eeeda32245b4067ef5a2ed8b363
Author: Arnab Phani <[email protected]>
AuthorDate: Sun May 28 13:25:15 2023 +0200

    [SYSTEMDS-3577] Fix default storage memory fractions in Spark
    
    This patch fixes a bug in analyzing Spark configurations for memory.
    The current code wrongly assumes that the storage fraction is relative
    to the heap, rather than to the unified memory (execution + storage).
    
    Closes #1835
---
 .../context/SparkExecutionContext.java              | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index ce7b43972d..371ec3db51 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1804,9 +1804,10 @@ public class SparkExecutionContext extends 
ExecutionContext
         */
        public static class SparkClusterConfig
        {
-               //broadcasts are stored in mem-and-disk in data space, this 
config
-               //defines the fraction of data space to be used as broadcast 
budget
-               private static final double BROADCAST_DATA_FRACTION = 0.35;
+               //broadcasts are stored in mem-and-disk in storage space, this 
config
+               //defines the fraction of min storage space to be used as 
broadcast budget
+               private static final double BROADCAST_DATA_FRACTION = 0.70;
+               private static final double BROADCAST_DATA_FRACTION_LEGACY = 
0.35;
 
                //forward private config from Spark's 
UnifiedMemoryManager.scala (>1.6)
                private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 
1024 * 1024;
@@ -1894,7 +1895,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                        double dataFrac = 
sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
                        _memDataMinFrac = dataFrac;
                        _memDataMaxFrac = dataFrac;
-                       _memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; 
//default 18%
+                       _memBroadcastFrac = dataFrac * 
BROADCAST_DATA_FRACTION_LEGACY; //default 18%
 
                        //analyze spark degree of parallelism
                        analyzeSparkParallelismConfiguation(sconf);
@@ -1910,10 +1911,14 @@ public class SparkExecutionContext extends 
ExecutionContext
                                        - RESERVED_SYSTEM_MEMORY_BYTES;
 
                        //get data and shuffle memory ratios (defaults not 
specified in job conf)
-                       _memDataMinFrac = 
sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
-                       _memDataMaxFrac = 
sconf.getDouble("spark.memory.fraction", 0.6); //default 60%
-                       _memBroadcastFrac = _memDataMaxFrac * 
BROADCAST_DATA_FRACTION; //default 21%
-                       
+                       //first get the unified memory fraction (60%) 
comprising execution and storage
+                       double unifiedMem = 
sconf.getDouble("spark.memory.fraction", 0.6);
+                       //minimum default storage expressed as 50% of unified 
memory (= 30% of heap)
+                       _memDataMinFrac = unifiedMem * 
sconf.getDouble("spark.memory.storageFraction", 0.5);
+                       //storage memory can expand and take up the full 
unified memory
+                       _memDataMaxFrac = unifiedMem;
+                       //Heuristic-based broadcast fraction (70% of min 
storage = 21% of heap)
+                       _memBroadcastFrac = _memDataMinFrac * 
BROADCAST_DATA_FRACTION;
                        //analyze spark degree of parallelism
                        analyzeSparkParallelismConfiguation(sconf);
                }

Reply via email to