This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 75fe638  [SYSTEMDS-3307,3308] Federated planner configurations (none/runtime)
75fe638 is described below

commit 75fe63893f3e057c8b5a73bbfc86135e75f92fe2
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Mar 12 17:10:40 2022 +0100

    [SYSTEMDS-3307,3308] Federated planner configurations (none/runtime)
    
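    This patch is a prerequisite for integrating multiple federated planners:
    'none' consolidates the federated data, 'runtime' converts CP instructions
    to federated (FED) instructions at runtime, and there are various compile_*
    planners (compile_allfed, compile_heuristic, compile_costbased).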
---
 conf/SystemDS-config.xml.template                             |  3 +++
 src/main/java/org/apache/sysds/conf/CompilerConfig.java       |  5 ++++-
 src/main/java/org/apache/sysds/conf/ConfigurationManager.java |  6 ++++--
 src/main/java/org/apache/sysds/conf/DMLConfig.java            |  7 +++++--
 src/main/java/org/apache/sysds/hops/OptimizerUtils.java       |  7 +++++++
 .../apache/sysds/hops/rewrite/RewriteFederatedExecution.java  |  8 ++++++++
 .../apache/sysds/runtime/instructions/cp/CPInstruction.java   | 11 +++++++----
 7 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/conf/SystemDS-config.xml.template b/conf/SystemDS-config.xml.template
index 141f044..b4ba733 100644
--- a/conf/SystemDS-config.xml.template
+++ b/conf/SystemDS-config.xml.template
@@ -99,4 +99,7 @@
 
     <!-- enables compiler assisted partial rewrites (e.g. Append-TSMM) -->
     <sysds.lineage.compilerassisted>true</sysds.lineage.compilerassisted>
+    
+    <!-- set the federated plan generator (none, [runtime], compile_allfed, compile_heuristic, compile_costbased) -->
+    <sysds.federated.planner>runtime</sysds.federated.planner>
 </root>
diff --git a/src/main/java/org/apache/sysds/conf/CompilerConfig.java b/src/main/java/org/apache/sysds/conf/CompilerConfig.java
index 28dbc7e..9728bd2 100644
--- a/src/main/java/org/apache/sysds/conf/CompilerConfig.java
+++ b/src/main/java/org/apache/sysds/conf/CompilerConfig.java
@@ -75,7 +75,10 @@ public class CompilerConfig
                MLCONTEXT, // execution via new MLContext
                
                //code generation enabled 
-               CODEGEN_ENABLED;
+               CODEGEN_ENABLED,
+               
+               //federated runtime conversion
+               FEDERATED_RUNTIME;
        }
        
        //default flags (exposed for testing purposes only)
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index c51d4e1..0aa74d2 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
 import org.apache.sysds.lops.Compression.CompressConfig;
 
-
-
 /**
  * Singleton for accessing the parsed and merged system configuration.
  * 
@@ -199,6 +197,10 @@ public class ConfigurationManager
                return (getDMLConfig().getBooleanValue(DMLConfig.CODEGEN)
                        || getCompilerConfigFlag(ConfigType.CODEGEN_ENABLED));
        }
+       
+       public static boolean isFederatedRuntimePlanner() {
+               return getCompilerConfigFlag(ConfigType.FEDERATED_RUNTIME);
+       }
 
        public static boolean isCompressionEnabled(){
                CompressConfig compress = 
CompressConfig.valueOf(getDMLConfig().getTextValue(DMLConfig.COMPRESSED_LINALG).toUpperCase());
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index f46be3b..ea6bef0 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -43,6 +43,7 @@ import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.hops.codegen.SpoofCompiler.CompilerType;
 import org.apache.sysds.hops.codegen.SpoofCompiler.GeneratorAPI;
 import org.apache.sysds.hops.codegen.SpoofCompiler.PlanSelector;
+import 
org.apache.sysds.hops.rewrite.RewriteFederatedExecution.FederatedPlanner;
 import org.apache.sysds.lops.Compression;
 import org.apache.sysds.lops.compile.linearization.ILinearize.DagLinearization;
 import org.apache.sysds.parser.ParseException;
@@ -112,6 +113,7 @@ public class DMLConfig
        public static final String USE_SSL_FEDERATED_COMMUNICATION = 
"sysds.federated.ssl"; // boolean
        public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT = 
"sysds.federated.initialization.timeout"; // int seconds
        public static final String FEDERATED_TIMEOUT = 
"sysds.federated.timeout"; // single request timeout default -1 to indicate 
infinite.
+       public static final String FEDERATED_PLANNER = 
"sysds.federated.planner";
        public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed 
default Spark Port
        public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2;
        
@@ -152,7 +154,7 @@ public class DMLConfig
                _defaultVals.put(CODEGEN,                "false" );
                _defaultVals.put(CODEGEN_API,            
GeneratorAPI.JAVA.name() );
                _defaultVals.put(CODEGEN_COMPILER,       
CompilerType.AUTO.name() );
-               _defaultVals.put(CODEGEN_OPTIMIZER,      
PlanSelector.FUSE_COST_BASED_V2.name() );
+               _defaultVals.put(CODEGEN_OPTIMIZER,      
PlanSelector.FUSE_COST_BASED_V2.name());
                _defaultVals.put(CODEGEN_PLANCACHE,      "true" );
                _defaultVals.put(CODEGEN_LITERALS,       "1" );
                _defaultVals.put(NATIVE_BLAS,            "none" );
@@ -173,7 +175,8 @@ public class DMLConfig
                _defaultVals.put(FLOATING_POINT_PRECISION, "double" );
                _defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
                _defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, 
"10");
-               _defaultVals.put(FEDERATED_TIMEOUT, "-1");
+               _defaultVals.put(FEDERATED_TIMEOUT,      "-1");
+               _defaultVals.put(FEDERATED_PLANNER,      
FederatedPlanner.RUNTIME.name());
        }
        
        public DMLConfig() {
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 4d48df6..19d0dec 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -35,6 +35,7 @@ import org.apache.sysds.conf.CompilerConfig.ConfigType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.rewrite.HopRewriteUtils;
+import 
org.apache.sysds.hops.rewrite.RewriteFederatedExecution.FederatedPlanner;
 import org.apache.sysds.lops.Checkpoint;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.common.Types.ExecType;
@@ -414,6 +415,12 @@ public class OptimizerUtils
                        cconf.set(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS, 
false);
                }
                
+               //handle federated runtime conversion to avoid string comparisons
+               String planner = 
dmlconf.getTextValue(DMLConfig.FEDERATED_PLANNER);
+               if( FederatedPlanner.RUNTIME.name().equalsIgnoreCase(planner) ) 
{
+                       cconf.set(ConfigType.FEDERATED_RUNTIME, true);
+               }
+               
                return cconf;
        }
        
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
index 6288130..c70df67 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
@@ -56,6 +56,14 @@ import java.util.concurrent.Future;
 public class RewriteFederatedExecution extends HopRewriteRule {
        private static final Logger LOG = 
Logger.getLogger(RewriteFederatedExecution.class);
 
+       public enum FederatedPlanner {
+               NONE,
+               RUNTIME,
+               COMPILE_ALLFED,
+               COMPILE_HEURISTIC,
+               COMPILE_COSTBASED,
+       }
+       
        @Override
        public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, 
ProgramRewriteStatus state) {
                if ( roots != null )
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
index 95d224e..acb1fd1 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
 
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
@@ -100,10 +101,12 @@ public abstract class CPInstruction extends Instruction
                }
                
                //robustness federated instructions (runtime assignment)
-               tmp = FEDInstructionUtils.checkAndReplaceCP(tmp, ec);
-               //NOTE: Retracing of lineage is not needed as the lineage trace
-               //is same for an instruction and its FED version.
-
+               if( ConfigurationManager.isFederatedRuntimePlanner() ) {
+                       tmp = FEDInstructionUtils.checkAndReplaceCP(tmp, ec);
+                       //NOTE: Retracing of lineage is not needed as the lineage trace
+                       //is same for an instruction and its FED version.
+               }
+               
                tmp = PrivacyPropagator.preprocessInstruction(tmp, ec);
                
                //Submit a task for the eviction thread. The stopping criteria 
are a passed

Reply via email to