This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 75fe638 [SYSTEMDS-3307,3308] Federated planner configurations
(none/runtime)
75fe638 is described below
commit 75fe63893f3e057c8b5a73bbfc86135e75f92fe2
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Mar 12 17:10:40 2022 +0100
[SYSTEMDS-3307,3308] Federated planner configurations (none/runtime)
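This patch is a prerequisite for integrating multiple federated planners.
It adds the sysds.federated.planner configuration with the following
options: 'none' consolidates the federated data, 'runtime' (the default)
converts CP instructions to FED instructions at runtime, and various
compile_* planners (compile_allfed, compile_heuristic, compile_costbased).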
---
conf/SystemDS-config.xml.template | 3 +++
src/main/java/org/apache/sysds/conf/CompilerConfig.java | 5 ++++-
src/main/java/org/apache/sysds/conf/ConfigurationManager.java | 6 ++++--
src/main/java/org/apache/sysds/conf/DMLConfig.java | 7 +++++--
src/main/java/org/apache/sysds/hops/OptimizerUtils.java | 7 +++++++
.../apache/sysds/hops/rewrite/RewriteFederatedExecution.java | 8 ++++++++
.../apache/sysds/runtime/instructions/cp/CPInstruction.java | 11 +++++++----
7 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/conf/SystemDS-config.xml.template
b/conf/SystemDS-config.xml.template
index 141f044..b4ba733 100644
--- a/conf/SystemDS-config.xml.template
+++ b/conf/SystemDS-config.xml.template
@@ -99,4 +99,7 @@
<!-- enables compiler assisted partial rewrites (e.g. Append-TSMM) -->
<sysds.lineage.compilerassisted>true</sysds.lineage.compilerassisted>
+
+ <!-- set the federated plan generator (none, [runtime], compile_allfed,
compile_heuristic, compile_costbased) -->
+ <sysds.federated.planner>runtime</sysds.federated.planner>
</root>
diff --git a/src/main/java/org/apache/sysds/conf/CompilerConfig.java
b/src/main/java/org/apache/sysds/conf/CompilerConfig.java
index 28dbc7e..9728bd2 100644
--- a/src/main/java/org/apache/sysds/conf/CompilerConfig.java
+++ b/src/main/java/org/apache/sysds/conf/CompilerConfig.java
@@ -75,7 +75,10 @@ public class CompilerConfig
MLCONTEXT, // execution via new MLContext
//code generation enabled
- CODEGEN_ENABLED;
+ CODEGEN_ENABLED,
+
+ //federated runtime conversion
+ FEDERATED_RUNTIME;
}
//default flags (exposed for testing purposes only)
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index c51d4e1..0aa74d2 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.CompilerConfig.ConfigType;
import org.apache.sysds.lops.Compression.CompressConfig;
-
-
/**
* Singleton for accessing the parsed and merged system configuration.
*
@@ -199,6 +197,10 @@ public class ConfigurationManager
return (getDMLConfig().getBooleanValue(DMLConfig.CODEGEN)
|| getCompilerConfigFlag(ConfigType.CODEGEN_ENABLED));
}
+
+ public static boolean isFederatedRuntimePlanner() {
+ return getCompilerConfigFlag(ConfigType.FEDERATED_RUNTIME);
+ }
public static boolean isCompressionEnabled(){
CompressConfig compress =
CompressConfig.valueOf(getDMLConfig().getTextValue(DMLConfig.COMPRESSED_LINALG).toUpperCase());
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index f46be3b..ea6bef0 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -43,6 +43,7 @@ import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.hops.codegen.SpoofCompiler.CompilerType;
import org.apache.sysds.hops.codegen.SpoofCompiler.GeneratorAPI;
import org.apache.sysds.hops.codegen.SpoofCompiler.PlanSelector;
+import
org.apache.sysds.hops.rewrite.RewriteFederatedExecution.FederatedPlanner;
import org.apache.sysds.lops.Compression;
import org.apache.sysds.lops.compile.linearization.ILinearize.DagLinearization;
import org.apache.sysds.parser.ParseException;
@@ -112,6 +113,7 @@ public class DMLConfig
public static final String USE_SSL_FEDERATED_COMMUNICATION =
"sysds.federated.ssl"; // boolean
public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT =
"sysds.federated.initialization.timeout"; // int seconds
public static final String FEDERATED_TIMEOUT =
"sysds.federated.timeout"; // single request timeout default -1 to indicate
infinite.
+ public static final String FEDERATED_PLANNER =
"sysds.federated.planner";
public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed
default Spark Port
public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2;
@@ -152,7 +154,7 @@ public class DMLConfig
_defaultVals.put(CODEGEN, "false" );
_defaultVals.put(CODEGEN_API,
GeneratorAPI.JAVA.name() );
_defaultVals.put(CODEGEN_COMPILER,
CompilerType.AUTO.name() );
- _defaultVals.put(CODEGEN_OPTIMIZER,
PlanSelector.FUSE_COST_BASED_V2.name() );
+ _defaultVals.put(CODEGEN_OPTIMIZER,
PlanSelector.FUSE_COST_BASED_V2.name());
_defaultVals.put(CODEGEN_PLANCACHE, "true" );
_defaultVals.put(CODEGEN_LITERALS, "1" );
_defaultVals.put(NATIVE_BLAS, "none" );
@@ -173,7 +175,8 @@ public class DMLConfig
_defaultVals.put(FLOATING_POINT_PRECISION, "double" );
_defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
_defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT,
"10");
- _defaultVals.put(FEDERATED_TIMEOUT, "-1");
+ _defaultVals.put(FEDERATED_TIMEOUT, "-1");
+ _defaultVals.put(FEDERATED_PLANNER,
FederatedPlanner.RUNTIME.name());
}
public DMLConfig() {
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 4d48df6..19d0dec 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -35,6 +35,7 @@ import org.apache.sysds.conf.CompilerConfig.ConfigType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.rewrite.HopRewriteUtils;
+import
org.apache.sysds.hops.rewrite.RewriteFederatedExecution.FederatedPlanner;
import org.apache.sysds.lops.Checkpoint;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.common.Types.ExecType;
@@ -414,6 +415,12 @@ public class OptimizerUtils
cconf.set(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS,
false);
}
+ //handle federated runtime conversion to avoid string
comparisons
+ String planner =
dmlconf.getTextValue(DMLConfig.FEDERATED_PLANNER);
+ if( FederatedPlanner.RUNTIME.name().equalsIgnoreCase(planner) )
{
+ cconf.set(ConfigType.FEDERATED_RUNTIME, true);
+ }
+
return cconf;
}
diff --git
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
index 6288130..c70df67 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
@@ -56,6 +56,14 @@ import java.util.concurrent.Future;
public class RewriteFederatedExecution extends HopRewriteRule {
private static final Logger LOG =
Logger.getLogger(RewriteFederatedExecution.class);
+ public enum FederatedPlanner {
+ NONE,
+ RUNTIME,
+ COMPILE_ALLFED,
+ COMPILE_HEURISTIC,
+ COMPILE_COSTBASED,
+ }
+
@Override
public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots,
ProgramRewriteStatus state) {
if ( roots != null )
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
index 95d224e..acb1fd1 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
@@ -100,10 +101,12 @@ public abstract class CPInstruction extends Instruction
}
//robustness federated instructions (runtime assignment)
- tmp = FEDInstructionUtils.checkAndReplaceCP(tmp, ec);
- //NOTE: Retracing of lineage is not needed as the lineage trace
- //is same for an instruction and its FED version.
-
+ if( ConfigurationManager.isFederatedRuntimePlanner() ) {
+ tmp = FEDInstructionUtils.checkAndReplaceCP(tmp, ec);
+ //NOTE: Retracing of lineage is not needed as the
lineage trace
+ //is same for an instruction and its FED version.
+ }
+
tmp = PrivacyPropagator.preprocessInstruction(tmp, ec);
//Submit a task for the eviction thread. The stopping criteria
are a passed