This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 1e4f3e1 [SYSTEMDS-3215] Fix federated execution contexts for spark instructions
1e4f3e1 is described below
commit 1e4f3e1a983c666da187296e8f0953857c827350
Author: ywcb00 <[email protected]>
AuthorDate: Thu Dec 16 22:09:43 2021 +0100
[SYSTEMDS-3215] Fix federated execution contexts for spark instructions
Closes #1453.
---
.../context/ExecutionContextFactory.java | 4 ++--
.../controlprogram/federated/ExecutionContextMap.java | 19 +++++++++++++++----
.../federated/FederatedWorkerHandler.java | 12 +++++++++++-
3 files changed, 28 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
index bc96c97..3252883 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
@@ -48,7 +48,7 @@ public class ExecutionContextFactory
public static ExecutionContext createContext(boolean allocateVars,
boolean allocateLineage, Program prog)
{
ExecutionContext ec = null;
-
+
switch( DMLScript.getGlobalExecMode() )
{
case SINGLE_NODE:
@@ -60,7 +60,7 @@ public class ExecutionContextFactory
else
ec = new
SparkExecutionContext(allocateVars, allocateLineage, prog);
break;
-
+
case SPARK:
case HYBRID:
ec = new SparkExecutionContext(allocateVars,
allocateLineage, prog);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
index 352728d..ef4f6d6 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/ExecutionContextMap.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
public class ExecutionContextMap {
- private final ExecutionContext _main;
+ private ExecutionContext _main;
private final Map<Long, ExecutionContext> _parEc;
public ExecutionContextMap() {
@@ -35,7 +37,7 @@ public class ExecutionContextMap {
_parEc = new ConcurrentHashMap<>();
}
- public ExecutionContext get(long tid) {
+ public synchronized ExecutionContext get(long tid) {
//return main execution context
if( tid <= 0 )
return _main;
@@ -45,7 +47,7 @@ public class ExecutionContextMap {
k -> deriveExecutionContext(_main));
}
- public void clear() {
+ public synchronized void clear() {
//handle main symbol table (w/ tmp list for concurrent
modification)
for( String varName : new
ArrayList<>(_main.getVariables().keySet()) )
_main.cleanupDataObject(_main.removeVariable(varName));
@@ -56,7 +58,16 @@ public class ExecutionContextMap {
_main.cleanupDataObject(ec.removeVariable(varName));
_parEc.clear();
}
-
+
+ public synchronized void convertToSparkCtx() {
+ // set hybrid mode for global consistency
+ DMLScript.setGlobalExecMode(ExecMode.HYBRID);
+
+ //convert existing execution contexts
+ _main = deriveExecutionContext(_main);
+ _parEc.replaceAll((k,v) -> deriveExecutionContext(v));
+
+ }
private static ExecutionContext createExecutionContext() {
ExecutionContext ec = ExecutionContextFactory.createContext();
ec.setAutoCreateVars(true); //w/o createvar inst
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 0e98fb6..1d0eb79 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -43,9 +43,11 @@ import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
import
org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.Instruction.IType;
import org.apache.sysds.runtime.instructions.InstructionParser;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ListObject;
@@ -336,9 +338,17 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
private FederatedResponse execInstruction(FederatedRequest request)
throws Exception {
ExecutionContext ec = _ecm.get(request.getTID());
+
+ //handle missing spark execution context
+ //TODO handling of spark instructions should be under control
of federated site not coordinator
+ Instruction receivedInstruction =
InstructionParser.parseSingleInstruction((String) request.getParam(0));
+ if(receivedInstruction.getType() == IType.SPARK
+ && !(ec instanceof SparkExecutionContext) ) {
+ _ecm.convertToSparkCtx();
+ ec = _ecm.get(request.getTID());
+ }
BasicProgramBlock pb = new BasicProgramBlock(null);
pb.getInstructions().clear();
- Instruction receivedInstruction =
InstructionParser.parseSingleInstruction((String) request.getParam(0));
pb.getInstructions().add(receivedInstruction);
if(DMLScript.LINEAGE)