This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e4f3e1  [SYSTEMDS-3215] Fix federated execution contexts for spark instructions
1e4f3e1 is described below

commit 1e4f3e1a983c666da187296e8f0953857c827350
Author: ywcb00 <[email protected]>
AuthorDate: Thu Dec 16 22:09:43 2021 +0100

    [SYSTEMDS-3215] Fix federated execution contexts for spark instructions
    
    Closes #1453.
---
 .../context/ExecutionContextFactory.java              |  4 ++--
 .../controlprogram/federated/ExecutionContextMap.java | 19 +++++++++++++++----
 .../federated/FederatedWorkerHandler.java             | 12 +++++++++++-
 3 files changed, 28 insertions(+), 7 deletions(-)
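
For readers skimming the diff below: the change lets a federated worker lazily upgrade its execution contexts to Spark contexts the first time a Spark instruction arrives, instead of assuming the context type is fixed when the worker starts. The following is a minimal, self-contained sketch of that pattern; the class and method names (Ctx, CtxMap, exec, ...) are illustrative stand-ins and not SystemDS API, while the real types (ExecutionContextMap, SparkExecutionContext, FederatedWorkerHandler) appear in the diff itself.

// Illustrative sketch only (hypothetical names); mirrors the lazy-upgrade pattern
// of ExecutionContextMap.convertToSparkCtx() and FederatedWorkerHandler.execInstruction().
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LazyContextUpgradeSketch {
    interface Ctx {}
    static class CpCtx implements Ctx {}
    static class SparkCtx implements Ctx {}

    static class CtxMap {
        private Ctx _main = new CpCtx();  // main context, no longer final
        private final Map<Long, Ctx> _parEc = new ConcurrentHashMap<>();

        public synchronized Ctx get(long tid) {
            if( tid <= 0 )
                return _main;  // main execution context
            return _parEc.computeIfAbsent(tid, k -> derive(_main));
        }

        // analogous to convertToSparkCtx(): rebuild all contexts as Spark-capable ones
        public synchronized void convertToSparkCtx() {
            _main = new SparkCtx();
            _parEc.replaceAll((k, v) -> new SparkCtx());
        }

        private static Ctx derive(Ctx base) {
            return (base instanceof SparkCtx) ? new SparkCtx() : new CpCtx();
        }
    }

    // analogous to execInstruction(): upgrade lazily when a Spark instruction arrives
    static void exec(CtxMap ecm, long tid, boolean isSparkInstruction) {
        Ctx ec = ecm.get(tid);
        if( isSparkInstruction && !(ec instanceof SparkCtx) ) {
            ecm.convertToSparkCtx();
            ec = ecm.get(tid);  // re-fetch: now a Spark-capable context
        }
        System.out.println("tid=" + tid + " executes with " + ec.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        CtxMap ecm = new CtxMap();
        exec(ecm, 1, false); // plain CP context
        exec(ecm, 1, true);  // triggers the one-time upgrade, then Spark context
    }
}

The conversion is global (main and all parfor contexts), which is why the real methods are synchronized and the global exec mode is switched to HYBRID for consistency.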

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
index bc96c97..3252883 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
@@ -48,7 +48,7 @@ public class ExecutionContextFactory
        public static ExecutionContext createContext(boolean allocateVars, boolean allocateLineage, Program prog)
        {
                ExecutionContext ec = null;
-               
+
                switch( DMLScript.getGlobalExecMode() )
                {
                        case SINGLE_NODE:
@@ -60,7 +60,7 @@ public class ExecutionContextFactory
                                else
                                        ec = new SparkExecutionContext(allocateVars, allocateLineage, prog);
                                break;
-                               
+
                        case SPARK:
                        case HYBRID:
                                ec = new SparkExecutionContext(allocateVars, allocateLineage, prog);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
index 352728d..ef4f6d6 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 
 public class ExecutionContextMap {
-       private final ExecutionContext _main;
+       private ExecutionContext _main;
        private final Map<Long, ExecutionContext> _parEc;
        
        public ExecutionContextMap() {
@@ -35,7 +37,7 @@ public class ExecutionContextMap {
                _parEc = new ConcurrentHashMap<>();
        }
        
-       public ExecutionContext get(long tid) {
+       public synchronized ExecutionContext get(long tid) {
                //return main execution context
                if( tid <= 0 )
                        return _main;
@@ -45,7 +47,7 @@ public class ExecutionContextMap {
                        k -> deriveExecutionContext(_main));
        }
        
-       public void clear() {
+       public synchronized void clear() {
                //handle main symbol table (w/ tmp list for concurrent modification)
                for( String varName : new ArrayList<>(_main.getVariables().keySet()) )
                        _main.cleanupDataObject(_main.removeVariable(varName));
@@ -56,7 +58,16 @@ public class ExecutionContextMap {
                                _main.cleanupDataObject(ec.removeVariable(varName));
                _parEc.clear();
        }
-       
+
+       public synchronized void convertToSparkCtx() {
+               // set hybrid mode for global consistency
+               DMLScript.setGlobalExecMode(ExecMode.HYBRID);
+               
+               //convert existing execution contexts
+               _main = deriveExecutionContext(_main);
+               _parEc.replaceAll((k,v) -> deriveExecutionContext(v));
+               
+       }
        private static ExecutionContext createExecutionContext() {
                ExecutionContext ec = ExecutionContextFactory.createContext();
                ec.setAutoCreateVars(true); //w/o createvar inst
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 0e98fb6..1d0eb79 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -43,9 +43,11 @@ import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
 import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.Instruction.IType;
 import org.apache.sysds.runtime.instructions.InstructionParser;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
@@ -336,9 +338,17 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 
        private FederatedResponse execInstruction(FederatedRequest request) throws Exception {
                ExecutionContext ec = _ecm.get(request.getTID());
+               
+               //handle missing spark execution context
+               //TODO handling of spark instructions should be under control of federated site not coordinator
+               Instruction receivedInstruction = InstructionParser.parseSingleInstruction((String) request.getParam(0));
+               if(receivedInstruction.getType() == IType.SPARK
+                       && !(ec instanceof SparkExecutionContext) ) {
+                       _ecm.convertToSparkCtx();
+                       ec = _ecm.get(request.getTID());
+               }
                BasicProgramBlock pb = new BasicProgramBlock(null);
                pb.getInstructions().clear();
-               Instruction receivedInstruction = InstructionParser.parseSingleInstruction((String) request.getParam(0));
                pb.getInstructions().add(receivedInstruction);
 
                if(DMLScript.LINEAGE)
