This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ac9af09bc [SYSTEMDS-3088] Configuration flags for Prefetch, Broadcast
8ac9af09bc is described below

commit 8ac9af09bcc6a9dd6de818544fde32e4ec0add90
Author: Arnab Phani <[email protected]>
AuthorDate: Mon Oct 24 11:43:39 2022 +0200

    [SYSTEMDS-3088] Configuration flags for Prefetch, Broadcast
    
    This patch adds configuration flags for Prefetch, Broadcast and
    other instructions that asynchronously trigger Spark executions.
---
 conf/SystemDS-config.xml.template                             |  7 +++++++
 src/main/java/org/apache/sysds/conf/ConfigurationManager.java | 11 +++++++++++
 src/main/java/org/apache/sysds/conf/DMLConfig.java            |  8 ++++++--
 src/main/java/org/apache/sysds/hops/OptimizerUtils.java       |  4 +++-
 src/main/java/org/apache/sysds/lops/compile/Dag.java          |  5 +++--
 .../java/org/apache/sysds/utils/stats/SparkStatistics.java    |  5 ++---
 .../apache/sysds/test/functions/async/AsyncBroadcastTest.java |  4 ++--
 .../apache/sysds/test/functions/async/PrefetchRDDTest.java    |  4 ++--
 8 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/conf/SystemDS-config.xml.template 
b/conf/SystemDS-config.xml.template
index 2dc98d1e4f..d35f54dbaa 100644
--- a/conf/SystemDS-config.xml.template
+++ b/conf/SystemDS-config.xml.template
@@ -141,4 +141,11 @@
 
     <!-- set memory manager (static, unified) -->
     <sysds.caching.memorymanager>static</sysds.caching.memorymanager>
+       
+       <!-- Asynchronously trigger prefetch (Spark intermediate) -->
+    <sysds.async.prefetch>false</sysds.async.prefetch>
+
+    <!-- Asynchronously trigger broadcast (CP intermediate) -->
+    <sysds.async.broadcast>false</sysds.async.broadcast>
+
 </root>
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java 
b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 9e379bc09a..930d26a6d0 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -21,6 +21,7 @@ package org.apache.sysds.conf;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.lops.Compression.CompressConfig;
 
 /**
@@ -231,6 +232,16 @@ public class ConfigurationManager
                return 
getDMLConfig().getBooleanValue(DMLConfig.FEDERATED_READCACHE);
        }
 
+       public static boolean isPrefetchEnabled() {
+               return 
(getDMLConfig().getBooleanValue(DMLConfig.ASYNC_SPARK_PREFETCH)
+                       || OptimizerUtils.ASYNC_PREFETCH_SPARK);
+       }
+
+       public static boolean isBroadcastEnabled() {
+               return 
(getDMLConfig().getBooleanValue(DMLConfig.ASYNC_SPARK_BROADCAST)
+                       || OptimizerUtils.ASYNC_BROADCAST_SPARK);
+       }
+
        ///////////////////////////////////////
        // Thread-local classes
        
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java 
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 58a78f25e9..937910d720 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -127,7 +127,9 @@ public class DMLConfig
        public static final String FEDERATED_MONITOR_FREQUENCY = 
"sysds.federated.monitorFreq";
        public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed 
default Spark Port
        public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 8;
-       
+       /** Asynchronous triggering of Spark OPs and operator placement **/
+       public static final String ASYNC_SPARK_PREFETCH = 
"sysds.async.prefetch";  // boolean: enable asynchronous prefetching spark 
intermediates
+       public static final String ASYNC_SPARK_BROADCAST = 
"sysds.async.broadcast";  // boolean: enable asynchronous broadcasting CP 
intermediates
        //internal config
        public static final String DEFAULT_SHARED_DIR_PERMISSION = "777"; //for 
local fs and DFS
        
@@ -198,6 +200,8 @@ public class DMLConfig
                _defaultVals.put(FEDERATED_READCACHE,    "true"); // vcores
                _defaultVals.put(FEDERATED_MONITOR_FREQUENCY, "3");
                _defaultVals.put(PRIVACY_CONSTRAINT_MOCK, null);
+               _defaultVals.put(ASYNC_SPARK_PREFETCH,   "false" );
+               _defaultVals.put(ASYNC_SPARK_BROADCAST,  "false" );
        }
        
        public DMLConfig() {
@@ -450,7 +454,7 @@ public class DMLConfig
                        PRINT_GPU_MEMORY_INFO, AVAILABLE_GPUS, SYNCHRONIZE_GPU, 
EAGER_CUDA_FREE, FLOATING_POINT_PRECISION,
                        GPU_EVICTION_POLICY, LOCAL_SPARK_NUM_THREADS, 
EVICTION_SHADOW_BUFFERSIZE, GPU_MEMORY_ALLOCATOR,
                        GPU_MEMORY_UTILIZATION_FACTOR, 
USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT,
-                       FEDERATED_TIMEOUT, FEDERATED_MONITOR_FREQUENCY
+                       FEDERATED_TIMEOUT, FEDERATED_MONITOR_FREQUENCY, 
ASYNC_SPARK_PREFETCH, ASYNC_SPARK_BROADCAST
                }; 
                
                StringBuilder sb = new StringBuilder();
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index fc5db7a737..ccee9c96df 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -264,6 +264,7 @@ public class OptimizerUtils
         */
        public static boolean ALLOW_SCRIPT_LEVEL_COMPRESS_COMMAND = false;
 
+
        /**
         * Boolean specifying if compression rewrites is allowed. This is 
disabled at run time if the IPA for Workload aware compression
         * is activated.
@@ -281,7 +282,8 @@ public class OptimizerUtils
         * transformations, which would would otherwise make the next 
instruction wait till completion. Broadcast allows
         * asynchronously transferring the data to all the nodes.
         */
-       public static boolean ASYNC_TRIGGER_RDD_OPERATIONS = false; 
+       public static boolean ASYNC_PREFETCH_SPARK = false;
+       public static boolean ASYNC_BROADCAST_SPARK = false;
 
        //////////////////////
        // Optimizer levels //
diff --git a/src/main/java/org/apache/sysds/lops/compile/Dag.java 
b/src/main/java/org/apache/sysds/lops/compile/Dag.java
index b089b1068f..36cacde4d0 100644
--- a/src/main/java/org/apache/sysds/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysds/lops/compile/Dag.java
@@ -35,6 +35,7 @@ import org.apache.sysds.common.Types.ExecType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.OpOp1;
 import org.apache.sysds.common.Types.OpOpData;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
 import org.apache.sysds.hops.OptimizerUtils;
@@ -195,8 +196,8 @@ public class Dag<N extends Lop>
                List<Lop> node_v = ILinearize.linearize(nodes);
                
                // add Prefetch and broadcast lops, if necessary
-               List<Lop> node_pf = OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS 
? addPrefetchLop(node_v) : node_v;
-               List<Lop> node_bc = OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS 
? addBroadcastLop(node_pf) : node_pf;
+               List<Lop> node_pf = ConfigurationManager.isPrefetchEnabled() ? 
addPrefetchLop(node_v) : node_v;
+               List<Lop> node_bc = ConfigurationManager.isBroadcastEnabled() ? 
addBroadcastLop(node_pf) : node_pf;
                // TODO: Merge via a single traversal of the nodes
 
                prefetchFederated(node_bc);
diff --git a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java 
b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
index 3965feafdc..e374512267 100644
--- a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
@@ -123,9 +123,8 @@ public class SparkStatistics {
                                                
parallelizeTime.longValue()*1e-9,
                                                broadcastTime.longValue()*1e-9,
                                                collectTime.longValue()*1e-9));
-               if (OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS)
-                       sb.append("Spark async. count (pf,bc,tr): \t" +
-                                       String.format("%d/%d/%d.\n", 
getAsyncPrefetchCount(), getAsyncBroadcastCount(), 
getAsyncTriggerRemoteCount()));
+               sb.append("Spark async. count (pf,bc,tr): \t" +
+                               String.format("%d/%d/%d.\n", 
getAsyncPrefetchCount(), getAsyncBroadcastCount(), 
getAsyncTriggerRemoteCount()));
                return sb.toString();
        }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java 
b/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java
index 0884753e38..edfef8998d 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java
@@ -88,9 +88,9 @@ public class AsyncBroadcastTest extends AutomatedTestBase {
                        runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
                        HashMap<MatrixValue.CellIndex, Double> R = 
readDMLScalarFromOutputDir("R");
 
-                       OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = true;
+                       OptimizerUtils.ASYNC_BROADCAST_SPARK = true;
                        runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
-                       OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = false;
+                       OptimizerUtils.ASYNC_BROADCAST_SPARK = false;
                        HashMap<MatrixValue.CellIndex, Double> R_bc = 
readDMLScalarFromOutputDir("R");
 
                        //compare matrices
diff --git 
a/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java 
b/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java
index 61279bd036..61ae7059b1 100644
--- a/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java
@@ -96,9 +96,9 @@ public class PrefetchRDDTest extends AutomatedTestBase {
                        runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
                        HashMap<MatrixValue.CellIndex, Double> R = 
readDMLScalarFromOutputDir("R");
 
-                       OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = true;
+                       OptimizerUtils.ASYNC_PREFETCH_SPARK = true;
                        runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
-                       OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = false;
+                       OptimizerUtils.ASYNC_PREFETCH_SPARK = false;
                        HashMap<MatrixValue.CellIndex, Double> R_pf = 
readDMLScalarFromOutputDir("R");
 
                        //compare matrices

Reply via email to