This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 8ac9af09bc [SYSTEMDS-3088] Configuration flags for Prefetch, Broadcast
8ac9af09bc is described below
commit 8ac9af09bcc6a9dd6de818544fde32e4ec0add90
Author: Arnab Phani <[email protected]>
AuthorDate: Mon Oct 24 11:43:39 2022 +0200
[SYSTEMDS-3088] Configuration flags for Prefetch, Broadcast
This patch adds configuration flags for Prefetch, Broadcast and
other instructions that asynchronously trigger Spark executions.
---
conf/SystemDS-config.xml.template | 7 +++++++
src/main/java/org/apache/sysds/conf/ConfigurationManager.java | 11 +++++++++++
src/main/java/org/apache/sysds/conf/DMLConfig.java | 8 ++++++--
src/main/java/org/apache/sysds/hops/OptimizerUtils.java | 4 +++-
src/main/java/org/apache/sysds/lops/compile/Dag.java | 5 +++--
.../java/org/apache/sysds/utils/stats/SparkStatistics.java | 5 ++---
.../apache/sysds/test/functions/async/AsyncBroadcastTest.java | 4 ++--
.../apache/sysds/test/functions/async/PrefetchRDDTest.java | 4 ++--
8 files changed, 36 insertions(+), 12 deletions(-)
diff --git a/conf/SystemDS-config.xml.template b/conf/SystemDS-config.xml.template
index 2dc98d1e4f..d35f54dbaa 100644
--- a/conf/SystemDS-config.xml.template
+++ b/conf/SystemDS-config.xml.template
@@ -141,4 +141,11 @@
<!-- set memory manager (static, unified) -->
<sysds.caching.memorymanager>static</sysds.caching.memorymanager>
+
+ <!-- Asynchronously trigger prefetch (Spark intermediate) -->
+ <sysds.async.prefetch>false</sysds.async.prefetch>
+
+ <!-- Asynchronously trigger broadcast (CP intermediate) -->
+ <sysds.async.broadcast>false</sysds.async.broadcast>
+
</root>
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 9e379bc09a..930d26a6d0 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -21,6 +21,7 @@ package org.apache.sysds.conf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.CompilerConfig.ConfigType;
+import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.Compression.CompressConfig;
/**
@@ -231,6 +232,16 @@ public class ConfigurationManager
return
getDMLConfig().getBooleanValue(DMLConfig.FEDERATED_READCACHE);
}
+ public static boolean isPrefetchEnabled() {
+ return (getDMLConfig().getBooleanValue(DMLConfig.ASYNC_SPARK_PREFETCH)
+ || OptimizerUtils.ASYNC_PREFETCH_SPARK);
+ }
+
+ public static boolean isBroadcastEnabled() {
+ return (getDMLConfig().getBooleanValue(DMLConfig.ASYNC_SPARK_BROADCAST)
+ || OptimizerUtils.ASYNC_BROADCAST_SPARK);
+ }
+
///////////////////////////////////////
// Thread-local classes
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 58a78f25e9..937910d720 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -127,7 +127,9 @@ public class DMLConfig
public static final String FEDERATED_MONITOR_FREQUENCY =
"sysds.federated.monitorFreq";
public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed
default Spark Port
public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 8;
-
+ /** Asynchronous triggering of Spark OPs and operator placement **/
+ public static final String ASYNC_SPARK_PREFETCH = "sysds.async.prefetch"; // boolean: enable asynchronous prefetching spark intermediates
+ public static final String ASYNC_SPARK_BROADCAST = "sysds.async.broadcast"; // boolean: enable asynchronous broadcasting CP intermediates
//internal config
public static final String DEFAULT_SHARED_DIR_PERMISSION = "777"; //for
local fs and DFS
@@ -198,6 +200,8 @@ public class DMLConfig
_defaultVals.put(FEDERATED_READCACHE, "true"); // vcores
_defaultVals.put(FEDERATED_MONITOR_FREQUENCY, "3");
_defaultVals.put(PRIVACY_CONSTRAINT_MOCK, null);
+ _defaultVals.put(ASYNC_SPARK_PREFETCH, "false" );
+ _defaultVals.put(ASYNC_SPARK_BROADCAST, "false" );
}
public DMLConfig() {
@@ -450,7 +454,7 @@ public class DMLConfig
PRINT_GPU_MEMORY_INFO, AVAILABLE_GPUS, SYNCHRONIZE_GPU,
EAGER_CUDA_FREE, FLOATING_POINT_PRECISION,
GPU_EVICTION_POLICY, LOCAL_SPARK_NUM_THREADS,
EVICTION_SHADOW_BUFFERSIZE, GPU_MEMORY_ALLOCATOR,
GPU_MEMORY_UTILIZATION_FACTOR,
USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT,
- FEDERATED_TIMEOUT, FEDERATED_MONITOR_FREQUENCY
+ FEDERATED_TIMEOUT, FEDERATED_MONITOR_FREQUENCY, ASYNC_SPARK_PREFETCH, ASYNC_SPARK_BROADCAST
};
StringBuilder sb = new StringBuilder();
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index fc5db7a737..ccee9c96df 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -264,6 +264,7 @@ public class OptimizerUtils
*/
public static boolean ALLOW_SCRIPT_LEVEL_COMPRESS_COMMAND = false;
+
/**
* Boolean specifying if compression rewrites is allowed. This is
disabled at run time if the IPA for Workload aware compression
* is activated.
@@ -281,7 +282,8 @@ public class OptimizerUtils
* transformations, which would would otherwise make the next
instruction wait till completion. Broadcast allows
* asynchronously transferring the data to all the nodes.
*/
- public static boolean ASYNC_TRIGGER_RDD_OPERATIONS = false;
+ public static boolean ASYNC_PREFETCH_SPARK = false;
+ public static boolean ASYNC_BROADCAST_SPARK = false;
//////////////////////
// Optimizer levels //
diff --git a/src/main/java/org/apache/sysds/lops/compile/Dag.java b/src/main/java/org/apache/sysds/lops/compile/Dag.java
index b089b1068f..36cacde4d0 100644
--- a/src/main/java/org/apache/sysds/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysds/lops/compile/Dag.java
@@ -35,6 +35,7 @@ import org.apache.sysds.common.Types.ExecType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.OpOp1;
import org.apache.sysds.common.Types.OpOpData;
+import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
import org.apache.sysds.hops.OptimizerUtils;
@@ -195,8 +196,8 @@ public class Dag<N extends Lop>
List<Lop> node_v = ILinearize.linearize(nodes);
// add Prefetch and broadcast lops, if necessary
- List<Lop> node_pf = OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS
? addPrefetchLop(node_v) : node_v;
- List<Lop> node_bc = OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS
? addBroadcastLop(node_pf) : node_pf;
+ List<Lop> node_pf = ConfigurationManager.isPrefetchEnabled() ? addPrefetchLop(node_v) : node_v;
+ List<Lop> node_bc = ConfigurationManager.isBroadcastEnabled() ? addBroadcastLop(node_pf) : node_pf;
// TODO: Merge via a single traversal of the nodes
prefetchFederated(node_bc);
diff --git a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
index 3965feafdc..e374512267 100644
--- a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
@@ -123,9 +123,8 @@ public class SparkStatistics {
parallelizeTime.longValue()*1e-9,
broadcastTime.longValue()*1e-9,
collectTime.longValue()*1e-9));
- if (OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS)
- sb.append("Spark async. count (pf,bc,tr): \t" +
- String.format("%d/%d/%d.\n",
getAsyncPrefetchCount(), getAsyncBroadcastCount(),
getAsyncTriggerRemoteCount()));
+ sb.append("Spark async. count (pf,bc,tr): \t" +
+ String.format("%d/%d/%d.\n",
getAsyncPrefetchCount(), getAsyncBroadcastCount(),
getAsyncTriggerRemoteCount()));
return sb.toString();
}
}
diff --git a/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java b/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java
index 0884753e38..edfef8998d 100644
--- a/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/async/AsyncBroadcastTest.java
@@ -88,9 +88,9 @@ public class AsyncBroadcastTest extends AutomatedTestBase {
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
HashMap<MatrixValue.CellIndex, Double> R =
readDMLScalarFromOutputDir("R");
- OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = true;
+ OptimizerUtils.ASYNC_BROADCAST_SPARK = true;
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
- OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = false;
+ OptimizerUtils.ASYNC_BROADCAST_SPARK = false;
HashMap<MatrixValue.CellIndex, Double> R_bc =
readDMLScalarFromOutputDir("R");
//compare matrices
diff --git a/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java b/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java
index 61279bd036..61ae7059b1 100644
--- a/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/async/PrefetchRDDTest.java
@@ -96,9 +96,9 @@ public class PrefetchRDDTest extends AutomatedTestBase {
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
HashMap<MatrixValue.CellIndex, Double> R =
readDMLScalarFromOutputDir("R");
- OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = true;
+ OptimizerUtils.ASYNC_PREFETCH_SPARK = true;
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
- OptimizerUtils.ASYNC_TRIGGER_RDD_OPERATIONS = false;
+ OptimizerUtils.ASYNC_PREFETCH_SPARK = false;
HashMap<MatrixValue.CellIndex, Double> R_pf =
readDMLScalarFromOutputDir("R");
//compare matrices