This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 535263b28e4f36b7575f487bcf9967d9d42bffdd
Author: baunsgaard <[email protected]>
AuthorDate: Sat Sep 25 19:54:01 2021 +0200

    [SYSTEMDS-3144] Spark Local Context from command line
    
    This commit adds the ability to start a SystemDS instance with a
    local Spark context, which enables the use of our Spark instructions
    even without a Spark cluster.
    
    This commit also adds a fallback to a local Spark context in case
    creating the Spark context fails.
    
    Closes #1398
    Closes #1399
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |  4 +-
 .../context/SparkExecutionContext.java             | 50 ++++++++++++++--------
 2 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a59101b..6194bc2 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -89,7 +89,8 @@ public class DMLConfig
 	public static final String SYNCHRONIZE_GPU = "sysds.gpu.sync.postProcess"; // boolean: whether to synchronize GPUs after every instruction
 	public static final String EAGER_CUDA_FREE = "sysds.gpu.eager.cudaFree"; // boolean: whether to perform eager CUDA free on rmvar
 	public static final String GPU_EVICTION_POLICY = "sysds.gpu.eviction.policy"; // string: can be lru, lfu, min_evict
-	public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads";
+	public static final String USE_LOCAL_SPARK_CONFIG = "sysds.local.spark"; // If set to true, it forces spark execution to a local spark context.
+	public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads"; // the number of threads allowed to be used in the local spark configuration, default is * to enable use of all threads.
 	public static final String LINEAGECACHESPILL = "sysds.lineage.cachespill"; // boolean: whether to spill cache entries to disk
 	public static final String COMPILERASSISTED_RW = "sysds.lineage.compilerassisted"; // boolean: whether to apply compiler assisted rewrites
@@ -152,6 +153,7 @@ public class DMLConfig
 		_defaultVals.put(GPU_MEMORY_ALLOCATOR, "cuda");
 		_defaultVals.put(AVAILABLE_GPUS, "-1");
 		_defaultVals.put(GPU_EVICTION_POLICY, "min_evict");
+		_defaultVals.put(USE_LOCAL_SPARK_CONFIG, "false");
 		_defaultVals.put(LOCAL_SPARK_NUM_THREADS, "*"); // * Means it allocates the number of available threads on the local host machine.
 		_defaultVals.put(SYNCHRONIZE_GPU, "false");
 		_defaultVals.put(EAGER_CUDA_FREE, "false");

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index ca3f69c..67efd5c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.storage.RDDInfo;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.LongAccumulator;
+import org.apache.sysds.api.DMLException;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.api.mlcontext.MLContext;
 import org.apache.sysds.api.mlcontext.MLContextUtil;
@@ -213,23 +214,15 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 		else
 		{
-			if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
-				// For now set 4 cores for integration testing :)
-				SparkConf conf = createSystemDSSparkConf()
-					.setMaster("local[" +
-					ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS) +
-					"]").setAppName("My local integration test app");
-				// This is discouraged in spark but have added only for those testcase that cannot stop the context properly
-				// conf.set("spark.driver.allowMultipleContexts", "true");
-				conf.set("spark.ui.enabled", "false");
-				_spctx = new JavaSparkContext(conf);
-			}
-			else //default cluster setup
-			{
-				//setup systemds-preferred spark configuration (w/o user choice)
-				SparkConf conf = createSystemDSSparkConf();
-				_spctx = new JavaSparkContext(conf);
-			}
+			final SparkConf conf = createSystemDSSparkConf();
+			final DMLConfig dmlConfig = ConfigurationManager.getDMLConfig();
+			// Use Spark local config: if already set to true, keep it; otherwise look up if it should be local.
+			DMLScript.USE_LOCAL_SPARK_CONFIG = DMLScript.USE_LOCAL_SPARK_CONFIG ? true : dmlConfig.getBooleanValue(DMLConfig.USE_LOCAL_SPARK_CONFIG);
+
+			if(DMLScript.USE_LOCAL_SPARK_CONFIG)
+				setLocalConfSettings(conf);
+
+			_spctx = createContext(conf);

 			_parRDDs.clear();
 		}
@@ -253,6 +246,29 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 	}
+
+	private static JavaSparkContext createContext(SparkConf conf){
+		try{
+			return new JavaSparkContext(conf);
+		}
+		catch(Exception e){
+			if(e.getMessage().contains("A master URL must be set in your configuration")){
+				LOG.error("Error constructing Spark Context, falling back to local Spark context creation");
+				setLocalConfSettings(conf);
+				return createContext(conf);
+			}
+			else
+				throw new DMLException("Error while creating Spark context", e);
+		}
+	}
+
+	private static void setLocalConfSettings(SparkConf conf){
+		final String threads = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
+		conf.setMaster("local[" + threads + "]");
+		conf.setAppName("LocalSparkContextApp");
+		conf.set("spark.ui.enabled", "false");
+	}
+
 	/**
 	 * Sets up a SystemDS-preferred Spark configuration based on the implicit
 	 * default configuration (as passed via configurations from outside).
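For reference, a minimal sketch of how the new option can be enabled
through the XML configuration file. The property keys and defaults are
the ones introduced in DMLConfig above; the file name, the <root>
element, and the script name are illustrative assumptions, not part of
the commit:

  <!-- SystemDS-config.xml (file name is an assumption) -->
  <root>
    <!-- force Spark execution onto a local Spark context -->
    <sysds.local.spark>true</sysds.local.spark>
    <!-- threads for the local context; default "*" uses all available cores -->
    <sysds.local.spark.number.threads>*</sysds.local.spark.number.threads>
  </root>

passed from the command line, e.g. (invocation sketch, assuming the
standard -config and -exec options):

  systemds script.dml -config SystemDS-config.xml -exec spark

With sysds.local.spark left at its default of false, the same local
settings are still applied as a fallback when createContext fails with
"A master URL must be set in your configuration".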
