This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 3027192d24a8b68276d17935ae366b4fb61fa5f9
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Jun 29 21:34:11 2023 +0200

    [SYSTEMDS-3586] Fix variable release on errors in federated workers
    
    This patch improves the robustness and error handling of federated
    workers for batches of federated requests. Previously, if a federated
    EXEC_INST request caused an exception, the respective instruction did
    not release its pinned inputs, which in turn caused incomprehensible
    exceptions on subsequent requests of the same batch. Even worse, the
    state of the variables was left corrupted, which is problematic because
    federated workers are stateful servers. We now explicitly release all
    pinned variables of the execution context, but only when instruction
    execution fails, to avoid overhead on the common (successful) path.
    Additionally, compiler-assisted lineage rewrites are now disabled once
    in the FederatedWorkerHandler constructor instead of on every executed
    instruction.
---
 .../runtime/controlprogram/LocalVariableMap.java   |  9 +++++++
 .../federated/FederatedWorkerHandler.java          | 31 +++++++++++++++-------
 2 files changed, 30 insertions(+), 10 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java
index 5031f3f785..c5e3a58969 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.StringTokenizer;
 
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import 
org.apache.sysds.runtime.controlprogram.caching.CacheableData.CacheStatus;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
@@ -158,6 +159,14 @@ public class LocalVariableMap implements Cloneable
                        .filter(d -> (d instanceof CacheableData)).count();
        }
        
+       public void releasePinnedData() {
+               localMap.values().stream()
+                       .filter(d -> (d instanceof CacheableData))
+                       .map(d -> (CacheableData<?>) d)
+                       .filter(d -> d.getStatus() == CacheStatus.READ)
+                       .forEach(d -> d.release());
+       }
+       
        public String serialize() {
                StringBuilder sb = new StringBuilder();
                int count = 0;
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 4c2f437a81..9bfb32dbee 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -120,6 +120,14 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                _flt = flt;
                _frc = frc;
                _fan = fan;
+               
+               if(DMLScript.LINEAGE) {
+                       // Compiler assisted optimizations are not applicable 
for Fed workers.
+                       // e.g. isMarkedForCaching fails as output operands are 
saved in the
+                       // symbol table only after the instruction execution 
finishes.
+                       // NOTE: In shared JVM, this will disable compiler 
assistance even for the coordinator
+                       LineageCacheConfig.setCompAssRW(false);
+               }
        }
        
        public FederatedWorkerHandler(FederatedLookupTable flt, 
FederatedReadCache frc, FederatedWorkloadAnalyzer fan, Timing timing) {
@@ -300,7 +308,8 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
        }
 
        private FederatedResponse executeCommand(FederatedRequest request, 
ExecutionContextMap ecm, EventStageModel eventStage)
-               throws DMLPrivacyException, FederatedWorkerHandlerException, 
Exception {
+               throws DMLPrivacyException, FederatedWorkerHandlerException, 
Exception
+       {
                final RequestType method = request.getType();
                FederatedResponse result = null;
 
@@ -616,15 +625,17 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                final BasicProgramBlock pb = new BasicProgramBlock(null);
                pb.getInstructions().clear();
                pb.getInstructions().add(ins);
-
-               if(DMLScript.LINEAGE)
-                       // Compiler assisted optimizations are not applicable 
for Fed workers.
-                       // e.g. isMarkedForCaching fails as output operands are 
saved in the
-                       // symbol table only after the instruction execution 
finishes.
-                       // NOTE: In shared JVM, this will disable compiler 
assistance even for the coordinator
-                       LineageCacheConfig.setCompAssRW(false);
-
-               pb.execute(ec); // execute single instruction
+               
+               try {
+                       // execute single instruction
+                       pb.execute(ec);
+               }
+               catch(Exception ex) {
+                       // ensure all variables are properly unpinned, even in 
case
+                       // of failures because federated workers are stateful 
servers
+                       ec.getVariables().releasePinnedData();
+                       throw ex;
+               }
        }
 
        private static void adaptToWorkload(ExecutionContext ec, 
FederatedWorkloadAnalyzer fan,  long tid, Instruction ins){

Reply via email to