This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 535263b28e4f36b7575f487bcf9967d9d42bffdd
Author: baunsgaard <[email protected]>
AuthorDate: Sat Sep 25 19:54:01 2021 +0200

    [SYSTEMDS-3144] Spark Local Context from command line
    
    This commit adds the ability to start a SystemDS instance with a
    local Spark context, which enables the use of our Spark instructions
    even without a Spark cluster.
    
    Also added in this commit is a fallback to a local Spark instance,
    in case creation of the Spark context is attempted but fails.
    
    Closes #1398
    Closes #1399
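    
    As a minimal sketch of the new capability (illustration only, not part
    of this patch; assumes the SystemDS and Spark jars on the classpath),
    forcing local mode programmatically amounts to setting the public
    DMLScript flag before invoking SystemDS; the same effect is available
    declaratively via the new sysds.local.spark configuration key:
    
        import org.apache.sysds.api.DMLScript;
        
        public class EnableLocalSparkSketch {
            public static void main(String[] args) throws Exception {
                // force all Spark instructions onto a local Spark context,
                // equivalent to setting sysds.local.spark=true in the config
                DMLScript.USE_LOCAL_SPARK_CONFIG = true;
                // then run SystemDS as usual through its main entry point
                DMLScript.main(args);
            }
        }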
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |  4 +-
 .../context/SparkExecutionContext.java             | 50 ++++++++++++++--------
 2 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a59101b..6194bc2 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -89,7 +89,8 @@ public class DMLConfig
 	public static final String SYNCHRONIZE_GPU      = "sysds.gpu.sync.postProcess"; // boolean: whether to synchronize GPUs after every instruction
 	public static final String EAGER_CUDA_FREE      = "sysds.gpu.eager.cudaFree"; // boolean: whether to perform eager CUDA free on rmvar
 	public static final String GPU_EVICTION_POLICY  = "sysds.gpu.eviction.policy"; // string: can be lru, lfu, min_evict
-	public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads";
+	public static final String USE_LOCAL_SPARK_CONFIG = "sysds.local.spark"; // boolean: if set to true, forces Spark execution onto a local Spark context
+	public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads"; // the number of threads used by the local Spark configuration; default is * to use all available threads
 	public static final String LINEAGECACHESPILL    = "sysds.lineage.cachespill"; // boolean: whether to spill cache entries to disk
 	public static final String COMPILERASSISTED_RW  = "sysds.lineage.compilerassisted"; // boolean: whether to apply compiler assisted rewrites
        
@@ -152,6 +153,7 @@ public class DMLConfig
                _defaultVals.put(GPU_MEMORY_ALLOCATOR,   "cuda");
                _defaultVals.put(AVAILABLE_GPUS,         "-1");
                _defaultVals.put(GPU_EVICTION_POLICY,    "min_evict");
+               _defaultVals.put(USE_LOCAL_SPARK_CONFIG, "false");
 		_defaultVals.put(LOCAL_SPARK_NUM_THREADS, "*"); // * means it allocates the number of available threads on the local host machine
                _defaultVals.put(SYNCHRONIZE_GPU,        "false" );
                _defaultVals.put(EAGER_CUDA_FREE,        "false" );
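
As a side note, a hedged sketch of how these keys are consumed (illustration
only, not part of the patch; ConfigurationManager.getDMLConfig() and the
accessors below are the same ones used in the SparkExecutionContext changes
that follow):

    import org.apache.sysds.conf.ConfigurationManager;
    import org.apache.sysds.conf.DMLConfig;

    public class LocalMasterSketch {
        // resolve the Spark master string from the new thread-count key;
        // with the default value "*" this yields "local[*]"
        public static String localMaster() {
            DMLConfig cfg = ConfigurationManager.getDMLConfig();
            String threads = cfg.getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
            return "local[" + threads + "]";
        }
    }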
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index ca3f69c..67efd5c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.storage.RDDInfo;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.LongAccumulator;
+import org.apache.sysds.api.DMLException;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.api.mlcontext.MLContext;
 import org.apache.sysds.api.mlcontext.MLContextUtil;
@@ -213,23 +214,15 @@ public class SparkExecutionContext extends ExecutionContext
                }
                else
                {
-			if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
-				// For now set 4 cores for integration testing :)
-				SparkConf conf = createSystemDSSparkConf()
-						.setMaster("local[" +
-							ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS)+
-							"]").setAppName("My local integration test app");
-				// This is discouraged in spark but have added only for those testcase that cannot stop the context properly
-				// conf.set("spark.driver.allowMultipleContexts", "true");
-				conf.set("spark.ui.enabled", "false");
-				_spctx = new JavaSparkContext(conf);
-			}
-			else //default cluster setup
-			{
-				//setup systemds-preferred spark configuration (w/o user choice)
-				SparkConf conf = createSystemDSSparkConf();
-				_spctx = new JavaSparkContext(conf);
-			}
+			final SparkConf conf = createSystemDSSparkConf();
+			final DMLConfig dmlConfig = ConfigurationManager.getDMLConfig();
+			// use the local Spark config if already forced to true; otherwise look up whether it should be local
+			DMLScript.USE_LOCAL_SPARK_CONFIG = DMLScript.USE_LOCAL_SPARK_CONFIG || dmlConfig.getBooleanValue(DMLConfig.USE_LOCAL_SPARK_CONFIG);
+			
+			if(DMLScript.USE_LOCAL_SPARK_CONFIG)
+				setLocalConfSettings(conf);
+			
+			_spctx = createContext(conf);
 
                        _parRDDs.clear();
                }
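
The flag resolution above is a plain logical OR: a caller that already forced
DMLScript.USE_LOCAL_SPARK_CONFIG to true (e.g., from the command line) wins,
and otherwise the sysds.local.spark key decides. A small sketch of the same
precedence (illustration only, not part of the patch):

    import org.apache.sysds.api.DMLScript;
    import org.apache.sysds.conf.ConfigurationManager;
    import org.apache.sysds.conf.DMLConfig;

    public class LocalFlagSketch {
        public static boolean useLocalSpark() {
            // sticky flag: once forced to true, the config lookup is skipped
            return DMLScript.USE_LOCAL_SPARK_CONFIG
                || ConfigurationManager.getDMLConfig()
                    .getBooleanValue(DMLConfig.USE_LOCAL_SPARK_CONFIG);
        }
    }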
@@ -253,6 +246,29 @@ public class SparkExecutionContext extends ExecutionContext
                }
        }
 
+
+	private static JavaSparkContext createContext(SparkConf conf){
+		try{
+			return new JavaSparkContext(conf);
+		}
+		catch(Exception e){
+			if(e.getMessage() != null && e.getMessage().contains("A master URL must be set in your configuration")){
+				LOG.error("Error constructing Spark context, falling back to local Spark context creation");
+				setLocalConfSettings(conf);
+				return createContext(conf);
+			}
+			else
+				throw new DMLException("Error while creating Spark context", e);
+		}
+	}
+
+	private static void setLocalConfSettings(SparkConf conf){
+		final String threads = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
+		conf.setMaster("local[" + threads + "]");
+		conf.setAppName("LocalSparkContextApp");
+		conf.set("spark.ui.enabled", "false");
+	}
+
        /**
 	 * Sets up a SystemDS-preferred Spark configuration based on the implicit
 	 * default configuration (as passed via configurations from outside).
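
For completeness, a hedged end-to-end illustration of the fallback path above
(test-style, not part of the patch; createContext and setLocalConfSettings are
private, so this reproduces their effect directly against the Spark API):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class FallbackDemo {
        public static void main(String[] args) {
            // no master set: constructing the context throws
            // "A master URL must be set in your configuration"
            SparkConf conf = new SparkConf().setAppName("FallbackDemo");
            JavaSparkContext sc;
            try {
                sc = new JavaSparkContext(conf);
            }
            catch(Exception e) {
                // same recovery as setLocalConfSettings: force a local master
                conf.setMaster("local[*]");
                conf.set("spark.ui.enabled", "false");
                sc = new JavaSparkContext(conf);
            }
            System.out.println("spark master: " + sc.sc().master());
            sc.stop();
        }
    }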
