This is an automated email from the ASF dual-hosted git repository.

sebwrede pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 16a506a  [MINOR] Add Federated Compilation Options
16a506a is described below

commit 16a506a6dd7eedf5b949c0aefc9ee3c9ceb78b5b
Author: sebwrede <[email protected]>
AuthorDate: Thu Jan 20 16:54:57 2022 +0100

    [MINOR] Add Federated Compilation Options
    
    This commit adds options for compiling federated execution plans, which are
    needed for testing and experimentation purposes.
    Additionally, various log messages are added and exception handling is
    changed to make federated executions easier to debug.
    
    Closes #1528.
---
 src/main/java/org/apache/sysds/api/DMLOptions.java |  29 ++
 src/main/java/org/apache/sysds/api/DMLScript.java  |   2 +-
 src/main/java/org/apache/sysds/hops/Hop.java       |   4 +
 .../java/org/apache/sysds/hops/OptimizerUtils.java |   3 +
 .../hops/ipa/IPAPassRewriteFederatedPlan.java      |  42 ++-
 .../java/org/apache/sysds/hops/ipa/MemoTable.java  |  15 +
 .../hops/rewrite/RewriteFederatedExecution.java    |   8 +-
 .../controlprogram/federated/FederatedWorker.java  |  14 +-
 .../federated/FederatedWorkerHandler.java          |  20 +-
 .../instructions/fed/FEDInstructionUtils.java      | 310 +++++++++++----------
 10 files changed, 277 insertions(+), 170 deletions(-)

diff --git a/src/main/java/org/apache/sysds/api/DMLOptions.java 
b/src/main/java/org/apache/sysds/api/DMLOptions.java
index 1b911bc..55773ed 100644
--- a/src/main/java/org/apache/sysds/api/DMLOptions.java
+++ b/src/main/java/org/apache/sysds/api/DMLOptions.java
@@ -31,6 +31,8 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
+import org.apache.sysds.runtime.instructions.fed.FEDInstructionUtils;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCachePolicy;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.utils.Explain;
@@ -74,6 +76,7 @@ public class DMLOptions {
        public int                  pythonPort    = -1; 
        public boolean              checkPrivacy  = false;            // Check 
which privacy constraints are loaded and checked during federated execution 
        public boolean                          federatedCompilation = false;   
  // Compile federated instructions based on input federation state and privacy 
constraints.
+       public boolean                          noFedRuntimeConversion = false; 
  // If activated, no runtime conversion of CP instructions to FED instructions 
will be performed.
 
        public final static DMLOptions defaultOptions = new DMLOptions(null);
 
@@ -103,6 +106,7 @@ public class DMLOptions {
                        ", lineage=" + lineage +
                        ", w=" + fedWorker +
                        ", federatedCompilation=" + federatedCompilation +
+                       ", noFedRuntimeConversion=" + noFedRuntimeConversion +
                        '}';
        }
        
@@ -266,11 +270,30 @@ public class DMLOptions {
                }
 
                dmlOptions.checkPrivacy = line.hasOption("checkPrivacy");
+
                if (line.hasOption("federatedCompilation")){
                        OptimizerUtils.FEDERATED_COMPILATION = true;
                        dmlOptions.federatedCompilation = true;
+                       String[] fedCompSpecs = 
line.getOptionValues("federatedCompilation");
+                       if ( fedCompSpecs != null && fedCompSpecs.length > 0 ){
+                               for ( String spec : fedCompSpecs ){
+                                       String[] specPair = spec.split("=");
+                                       if (specPair.length != 2){
+                                               throw new 
org.apache.commons.cli.ParseException("Invalid argument specified for 
-federatedCompilation option, must be a list of space separated K=V pairs, 
where K is a line number of the DML script and V is a federated output value");
+                                       }
+                                       int dmlLineNum = 
Integer.parseInt(specPair[0]);
+                                       FEDInstruction.FederatedOutput 
fedOutSpec = FEDInstruction.FederatedOutput.valueOf(specPair[1]);
+                                       
OptimizerUtils.FEDERATED_SPECS.put(dmlLineNum,fedOutSpec);
+                               }
+                       }
                }
 
+               if ( line.hasOption("noFedRuntimeConversion") ){
+                       FEDInstructionUtils.noFedRuntimeConversion = true;
+                       dmlOptions.noFedRuntimeConversion = true;
+               }
+
+
                return dmlOptions;
        }
        
@@ -325,8 +348,13 @@ public class DMLOptions {
                        .withDescription("Check which privacy constraints are 
loaded and checked during federated execution")
                        .create("checkPrivacy");
                Option federatedCompilation = OptionBuilder
+                       .withArgName("key=value")
                        .withDescription("Compile federated instructions based 
on input federation state and privacy constraints.")
+                       .hasOptionalArgs()
                        .create("federatedCompilation");
+               Option noFedRuntimeConversion = OptionBuilder
+                       .withDescription("If activated, no runtime conversion 
of CP instructions to FED instructions will be performed.")
+                       .create("noFedRuntimeConversion");
                
                options.addOption(configOpt);
                options.addOption(cleanOpt);
@@ -341,6 +369,7 @@ public class DMLOptions {
                options.addOption(fedOpt);
                options.addOption(checkPrivacy);
                options.addOption(federatedCompilation);
+               options.addOption(noFedRuntimeConversion);
 
                // Either a clean(-clean), a file(-f), a script(-s) or 
help(-help) needs to be specified
                OptionGroup fileOrScriptOpt = new OptionGroup()
diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java 
b/src/main/java/org/apache/sysds/api/DMLScript.java
index 4184db1..c2ef42c 100644
--- a/src/main/java/org/apache/sysds/api/DMLScript.java
+++ b/src/main/java/org/apache/sysds/api/DMLScript.java
@@ -278,7 +278,7 @@ public class DMLScript
                        if(dmlOptions.fedWorker) {
                                loadConfiguration(fnameOptConfig);
                                try {
-                                       new 
FederatedWorker(dmlOptions.fedWorkerPort).run();
+                                       new 
FederatedWorker(dmlOptions.fedWorkerPort, dmlOptions.debug).run();
                                }
                                catch(CertificateException e) {
                                        e.printStackTrace();
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java 
b/src/main/java/org/apache/sysds/hops/Hop.java
index f47fcff..037bfa5 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -203,6 +203,10 @@ public abstract class Hop implements ParseInfo {
                activatePrefetch = true;
        }
 
+       public void deactivatePrefetch(){
+               activatePrefetch = false;
+       }
+
        /**
         * Checks if prefetch is activated for this hop.
         * @return true if prefetch is activated
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 703b9f2..47b5822 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -52,6 +52,7 @@ import org.apache.sysds.runtime.functionobjects.IntegerDivide;
 import org.apache.sysds.runtime.functionobjects.Modulus;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -60,6 +61,7 @@ import org.apache.sysds.runtime.util.UtilFunctions;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 
 public class OptimizerUtils 
 {
@@ -215,6 +217,7 @@ public class OptimizerUtils
         * Compile federated instructions based on input federation state and 
privacy constraints.
         */
        public static boolean FEDERATED_COMPILATION = false;
+       public static Map<Integer, FEDInstruction.FederatedOutput> 
FEDERATED_SPECS = new HashMap<>();
        
        /**
         * Specifies a multiplier computing the degree of parallelism of 
parallel
diff --git 
a/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java 
b/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
index 59333ab..0939b25 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysds.hops.ipa;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.hops.AggBinaryOp;
 import org.apache.sysds.hops.AggUnaryOp;
 import org.apache.sysds.hops.BinaryOp;
@@ -54,6 +56,7 @@ import java.util.Set;
  * The rewrite is only applied if federated compilation is activated in 
OptimizerUtils.
  */
 public class IPAPassRewriteFederatedPlan extends IPAPass {
+       private static final Log LOG = 
LogFactory.getLog(IPAPassRewriteFederatedPlan.class.getName());
 
        private final static MemoTable hopRelMemo = new MemoTable();
        private final static Set<Long> hopRelUpdatedFinal = new HashSet<>();
@@ -238,10 +241,24 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
        private void updateFederatedOutput(Hop root, HopRel updateHopRel) {
                root.setFederatedOutput(updateHopRel.getFederatedOutput());
                root.setFederatedCost(updateHopRel.getCostObject());
+               forceFixedFedOut(root);
                hopRelUpdatedFinal.add(root.getHopID());
        }
 
        /**
+        * Set federated output to fixed value if FEDERATED_SPECS is activated 
for root hop.
+        * @param root hop set to fixed fedout value as loaded from 
FEDERATED_SPECS
+        */
+       private void forceFixedFedOut(Hop root){
+               if ( 
OptimizerUtils.FEDERATED_SPECS.containsKey(root.getBeginLine()) ){
+                       FEDInstruction.FederatedOutput fedOutSpec = 
OptimizerUtils.FEDERATED_SPECS.get(root.getBeginLine());
+                       root.setFederatedOutput(fedOutSpec);
+                       if ( fedOutSpec.isForcedFederated() )
+                               root.deactivatePrefetch();
+               }
+       }
+
+       /**
         * Select federated execution plan for every Hop in the DAG starting 
from given roots.
         * The cost estimates of the hops are also updated when FederatedOutput 
is updated in the hops.
         *
@@ -259,8 +276,10 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
         * @param root starting point for going through the Hop DAG to update 
the federatedOutput fields
         */
        private void selectFederatedExecutionPlan(Hop root) {
-               visitFedPlanHop(root);
-               setFinalFedout(root);
+               if ( root != null ){
+                       visitFedPlanHop(root);
+                       setFinalFedout(root);
+               }
        }
 
        /**
@@ -274,6 +293,7 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
                        return;
                // If the currentHop has input, then the input should be 
visited depth-first
                if(currentHop.getInput() != null && 
currentHop.getInput().size() > 0) {
+                       debugLog(currentHop);
                        for(Hop input : currentHop.getInput())
                                visitFedPlanHop(input);
                }
@@ -290,6 +310,24 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
        }
 
        /**
+        * Write HOP visit to debug log if debug is activated.
+        * @param currentHop hop written to log
+        */
+       private void debugLog(Hop currentHop){
+               if ( LOG.isDebugEnabled() ){
+                       LOG.debug("Visiting HOP: " + currentHop + " Input size: 
" + currentHop.getInput().size());
+                       int index = 0;
+                       for ( Hop hop : currentHop.getInput()){
+                               if ( hop == null )
+                                       LOG.debug("Input at index is null: " + 
index);
+                               else
+                                       LOG.debug("HOP input: " + hop + " at 
index " + index + " of " + currentHop);
+                               index++;
+                       }
+               }
+       }
+
+       /**
         * Checks if the instructions related to the given hop supports 
FOUT/LOUT processing.
         *
         * @param hop to check for federated support
diff --git a/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java 
b/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
index c1aeff6..fc95c29 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
@@ -115,4 +115,19 @@ public class MemoTable {
                        && hopRelMemo.get(root.getHopRef().getHopID()).stream()
                        .anyMatch(h -> h.getFederatedOutput() == 
root.getFederatedOutput());
        }
+
+       @Override
+       public String toString(){
+               StringBuilder sb = new StringBuilder();
+               sb.append("Federated MemoTable has 
").append(hopRelMemo.size()).append(" entries with the following values:");
+               sb.append("\n").append("{").append("\n");
+               for (Map.Entry<Long,List<HopRel>> hopEntry : 
hopRelMemo.entrySet()){
+                       sb.append("  
").append(hopEntry.getKey()).append(":").append("\n");
+                       for ( HopRel hopRel : hopEntry.getValue() ){
+                               sb.append("    
").append(hopRel.getFederatedOutput()).append(" 
").append(hopRel.getCost()).append("\n");
+                       }
+               }
+               sb.append("\n");
+               return sb.toString();
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
index 4de2557..6288130 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
@@ -22,6 +22,7 @@ package org.apache.sysds.hops.rewrite;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 import org.apache.sysds.api.DMLException;
 import org.apache.sysds.hops.Hop;
 import org.apache.sysds.hops.LiteralOp;
@@ -53,6 +54,7 @@ import java.util.ArrayList;
 import java.util.concurrent.Future;
 
 public class RewriteFederatedExecution extends HopRewriteRule {
+       private static final Logger LOG = 
Logger.getLogger(RewriteFederatedExecution.class);
 
        @Override
        public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, 
ProgramRewriteStatus state) {
@@ -72,6 +74,8 @@ public class RewriteFederatedExecution extends HopRewriteRule 
{
                if (hop.isVisited())
                        return;
 
+               LOG.debug("RewriteFederatedExecution visitHop + " + hop);
+
                // Depth first to get to the input
                for ( Hop input : hop.getInput() )
                        visitHop(input);
@@ -98,11 +102,13 @@ public class RewriteFederatedExecution extends 
HopRewriteRule {
        private static void loadFederatedPrivacyConstraints(Hop hop){
                if ( hop.isFederatedDataOp() && hop.getPrivacy() == null){
                        try {
+                               LOG.debug("Load privacy constraints of " + hop);
                                PrivacyConstraint privConstraint = 
unwrapPrivConstraint(sendPrivConstraintRequest(hop));
+                               LOG.debug("PrivacyConstraint retrieved: " + 
privConstraint);
                                hop.setPrivacy(privConstraint);
                        }
                        catch(Exception e) {
-                               throw new DMLException(e.getMessage());
+                               throw new DMLException(e);
                        }
                }
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index de0e1d8..4090684 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -47,24 +47,25 @@ import org.apache.sysds.conf.DMLConfig;
 public class FederatedWorker {
        protected static Logger log = Logger.getLogger(FederatedWorker.class);
 
-       private int _port;
+       private final int _port;
        private final FederatedLookupTable _flt;
        private final FederatedReadCache _frc;
+       private final boolean _debug;
 
-       public FederatedWorker(int port) {
+       public FederatedWorker(int port, boolean debug) {
                _flt = new FederatedLookupTable();
                _frc = new FederatedReadCache();
                _port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
+               _debug = debug;
        }
 
        public void run() throws CertificateException, SSLException {
-               log.info("Setting up Federated Worker");
+               log.info("Setting up Federated Worker on port " + _port);
                final int EVENT_LOOP_THREADS = Math.max(4, 
Runtime.getRuntime().availableProcessors() * 4);
                NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
                ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1, 
Integer.MAX_VALUE,
                        10, TimeUnit.SECONDS, new 
SynchronousQueue<Runnable>(true));
                NioEventLoopGroup workerGroup = new 
NioEventLoopGroup(EVENT_LOOP_THREADS, workerTPE);
-
                ServerBootstrap b = new ServerBootstrap();
                // TODO add ability to use real ssl files, not self signed 
certificates.
                SelfSignedCertificate cert = new SelfSignedCertificate();
@@ -94,7 +95,10 @@ public class FederatedWorker {
                        f.channel().closeFuture().sync();
                }
                catch(Exception e) {
-                       log.info("Federated worker interrupted");
+                       log.error("Federated worker interrupted");
+                       log.error(e.getMessage());
+                       if ( _debug )
+                               e.printStackTrace();
                }
                finally {
                        log.info("Federated Worker Shutting down.");
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 9f7de1f..ca2b055 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -129,6 +129,8 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                }
                catch(DMLPrivacyException | FederatedWorkerHandlerException ex) 
{
                        // Here we control the error message, therefore it is 
allowed to send the stack trace with the response
+                       LOG.error("Exception in FederatedWorkerHandler while 
processing requests:\n"
+                               + Arrays.toString(requests), ex);
                        return new FederatedResponse(ResponseType.ERROR, ex);
                }
                catch(Exception ex) {
@@ -417,16 +419,16 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                ExecutionContext ec = ecm.get(request.getTID());
 
                // get function and input parameters
-               FederatedUDF udf = (FederatedUDF) request.getParam(0);
-               Data[] inputs = Arrays.stream(udf.getInputIDs()).mapToObj(id -> 
ec.getVariable(String.valueOf(id)))
-                       
.map(PrivacyMonitor::handlePrivacy).toArray(Data[]::new);
+               try {
+                       FederatedUDF udf = (FederatedUDF) request.getParam(0);
+                       Data[] inputs = 
Arrays.stream(udf.getInputIDs()).mapToObj(id -> 
ec.getVariable(String.valueOf(id)))
+                               
.map(PrivacyMonitor::handlePrivacy).toArray(Data[]::new);
 
-               // trace lineage
-               if(DMLScript.LINEAGE)
-                       LineageItemUtils.traceFedUDF(ec, udf);
+                       // trace lineage
+                       if(DMLScript.LINEAGE)
+                               LineageItemUtils.traceFedUDF(ec, udf);
 
-               // reuse or execute user-defined function
-               try {
+                       // reuse or execute user-defined function
                        // reuse UDF outputs if available in lineage cache
                        FederatedResponse reuse = LineageCache.reuse(udf, ec);
                        if(reuse.isSuccessful())
@@ -441,6 +443,8 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                        return res;
                }
                catch(DMLPrivacyException | FederatedWorkerHandlerException ex) 
{
+                       LOG.debug("FederatedWorkerHandler Privacy Constraint " +
+                               "exception thrown when processing EXEC_UDF 
request ", ex);
                        throw ex;
                }
                catch(Exception ex) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 64f1f9a..5915033 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -94,6 +94,8 @@ public class FEDInstructionUtils {
        
        private static String[] PARAM_BUILTINS = new String[]{
                "replace", "rmempty", "lowertri", "uppertri", 
"transformdecode", "transformapply", "tokenize"};
+
+       public static boolean noFedRuntimeConversion = false;
        
        // private static final Log LOG = 
LogFactory.getLog(FEDInstructionUtils.class.getName());
 
@@ -109,178 +111,180 @@ public class FEDInstructionUtils {
         * @return The potentially modified instruction
         */
        public static Instruction checkAndReplaceCP(Instruction inst, 
ExecutionContext ec) {
-               FEDInstruction fedinst = null;
-               if (inst instanceof AggregateBinaryCPInstruction) {
-                       AggregateBinaryCPInstruction instruction = 
(AggregateBinaryCPInstruction) inst;
-                       if( instruction.input1.isMatrix() && 
instruction.input2.isMatrix()) {
-                               MatrixObject mo1 = 
ec.getMatrixObject(instruction.input1);
-                               MatrixObject mo2 = 
ec.getMatrixObject(instruction.input2);
-                               if ( (mo1.isFederated(FType.ROW) && 
mo1.isFederatedExcept(FType.BROADCAST))
-                                       || (mo2.isFederated(FType.ROW) && 
mo2.isFederatedExcept(FType.BROADCAST))
-                                       || (mo1.isFederated(FType.COL) && 
mo1.isFederatedExcept(FType.BROADCAST))) {
-                                       fedinst = 
AggregateBinaryFEDInstruction.parseInstruction(
-                                               
InstructionUtils.concatOperands(inst.getInstructionString(), 
FederatedOutput.NONE.name()));
+               if ( !noFedRuntimeConversion ){
+                       FEDInstruction fedinst = null;
+                       if (inst instanceof AggregateBinaryCPInstruction) {
+                               AggregateBinaryCPInstruction instruction = 
(AggregateBinaryCPInstruction) inst;
+                               if( instruction.input1.isMatrix() && 
instruction.input2.isMatrix()) {
+                                       MatrixObject mo1 = 
ec.getMatrixObject(instruction.input1);
+                                       MatrixObject mo2 = 
ec.getMatrixObject(instruction.input2);
+                                       if ( (mo1.isFederated(FType.ROW) && 
mo1.isFederatedExcept(FType.BROADCAST))
+                                               || (mo2.isFederated(FType.ROW) 
&& mo2.isFederatedExcept(FType.BROADCAST))
+                                               || (mo1.isFederated(FType.COL) 
&& mo1.isFederatedExcept(FType.BROADCAST))) {
+                                               fedinst = 
AggregateBinaryFEDInstruction.parseInstruction(
+                                                       
InstructionUtils.concatOperands(inst.getInstructionString(), 
FederatedOutput.NONE.name()));
+                                       }
                                }
                        }
-               }
-               else if( inst instanceof MMChainCPInstruction) {
-                       MMChainCPInstruction linst = (MMChainCPInstruction) 
inst;
-                       MatrixObject mo = ec.getMatrixObject(linst.input1);
-                       if( mo.isFederated(FType.ROW) )
-                               fedinst = 
MMChainFEDInstruction.parseInstruction(linst.getInstructionString());
-               }
-               else if( inst instanceof MMTSJCPInstruction ) {
-                       MMTSJCPInstruction linst = (MMTSJCPInstruction) inst;
-                       MatrixObject mo = ec.getMatrixObject(linst.input1);
-                       if( (mo.isFederated(FType.ROW) && 
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isLeft()) ||
-                               (mo.isFederated(FType.COL) && 
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isRight()))
-                               fedinst = 
TsmmFEDInstruction.parseInstruction(linst.getInstructionString());
-               }
-               else if (inst instanceof UnaryCPInstruction && ! (inst 
instanceof IndexingCPInstruction)) {
-                       UnaryCPInstruction instruction = (UnaryCPInstruction) 
inst;
-                       if(inst instanceof ReorgCPInstruction && 
(inst.getOpcode().equals("r'") || inst.getOpcode().equals("rdiag")
-                               || inst.getOpcode().equals("rev"))) {
-                               ReorgCPInstruction rinst = (ReorgCPInstruction) 
inst;
-                               CacheableData<?> mo = 
ec.getCacheableData(rinst.input1);
-
-                               if((mo instanceof MatrixObject || mo instanceof 
FrameObject)
-                                       && 
mo.isFederatedExcept(FType.BROADCAST) )
-                                       fedinst = 
ReorgFEDInstruction.parseInstruction(
-                                               
InstructionUtils.concatOperands(rinst.getInstructionString(),FederatedOutput.NONE.name()));
+                       else if( inst instanceof MMChainCPInstruction) {
+                               MMChainCPInstruction linst = 
(MMChainCPInstruction) inst;
+                               MatrixObject mo = 
ec.getMatrixObject(linst.input1);
+                               if( mo.isFederated(FType.ROW) )
+                                       fedinst = 
MMChainFEDInstruction.parseInstruction(linst.getInstructionString());
                        }
-                       else if(instruction.input1 != null && 
instruction.input1.isMatrix()
-                               && ec.containsVariable(instruction.input1)) {
+                       else if( inst instanceof MMTSJCPInstruction ) {
+                               MMTSJCPInstruction linst = (MMTSJCPInstruction) 
inst;
+                               MatrixObject mo = 
ec.getMatrixObject(linst.input1);
+                               if( (mo.isFederated(FType.ROW) && 
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isLeft()) ||
+                                       (mo.isFederated(FType.COL) && 
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isRight()))
+                                       fedinst = 
TsmmFEDInstruction.parseInstruction(linst.getInstructionString());
+                       }
+                       else if (inst instanceof UnaryCPInstruction && ! (inst 
instanceof IndexingCPInstruction)) {
+                               UnaryCPInstruction instruction = 
(UnaryCPInstruction) inst;
+                               if(inst instanceof ReorgCPInstruction && 
(inst.getOpcode().equals("r'") || inst.getOpcode().equals("rdiag")
+                                       || inst.getOpcode().equals("rev"))) {
+                                       ReorgCPInstruction rinst = 
(ReorgCPInstruction) inst;
+                                       CacheableData<?> mo = 
ec.getCacheableData(rinst.input1);
 
-                               MatrixObject mo1 = 
ec.getMatrixObject(instruction.input1);
-                               if( mo1.isFederatedExcept(FType.BROADCAST) ) {
-                                       
if(instruction.getOpcode().equalsIgnoreCase("cm"))
-                                               fedinst = 
CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
-                                       else 
if(inst.getOpcode().equalsIgnoreCase("qsort")) {
-                                               
if(mo1.getFedMapping().getFederatedRanges().length == 1)
-                                                       fedinst = 
QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+                                       if((mo instanceof MatrixObject || mo 
instanceof FrameObject)
+                                               && 
mo.isFederatedExcept(FType.BROADCAST) )
+                                               fedinst = 
ReorgFEDInstruction.parseInstruction(
+                                                       
InstructionUtils.concatOperands(rinst.getInstructionString(),FederatedOutput.NONE.name()));
+                               }
+                               else if(instruction.input1 != null && 
instruction.input1.isMatrix()
+                                       && 
ec.containsVariable(instruction.input1)) {
+
+                                       MatrixObject mo1 = 
ec.getMatrixObject(instruction.input1);
+                                       if( 
mo1.isFederatedExcept(FType.BROADCAST) ) {
+                                               
if(instruction.getOpcode().equalsIgnoreCase("cm"))
+                                                       fedinst = 
CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
+                                               else 
if(inst.getOpcode().equalsIgnoreCase("qsort")) {
+                                                       
if(mo1.getFedMapping().getFederatedRanges().length == 1)
+                                                               fedinst = 
QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+                                               }
+                                               else 
if(inst.getOpcode().equalsIgnoreCase("rshape"))
+                                                       fedinst = 
ReshapeFEDInstruction.parseInstruction(inst.getInstructionString());
+                                               else if(inst instanceof 
AggregateUnaryCPInstruction &&
+                                                       
((AggregateUnaryCPInstruction) instruction).getAUType() == 
AggregateUnaryCPInstruction.AUType.DEFAULT)
+                                                       fedinst = 
AggregateUnaryFEDInstruction.parseInstruction(
+                                                               
InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+                                               else if(inst instanceof 
UnaryMatrixCPInstruction) {
+                                                       
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
+                                                               
!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
+                                                               fedinst = 
UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
+                                               }
                                        }
-                                       else 
if(inst.getOpcode().equalsIgnoreCase("rshape"))
-                                               fedinst = 
ReshapeFEDInstruction.parseInstruction(inst.getInstructionString());
-                                       else if(inst instanceof 
AggregateUnaryCPInstruction &&
-                                               ((AggregateUnaryCPInstruction) 
instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
-                                               fedinst = 
AggregateUnaryFEDInstruction.parseInstruction(
+                               }
+                       }
+                       else if (inst instanceof BinaryCPInstruction) {
+                               BinaryCPInstruction instruction = 
(BinaryCPInstruction) inst;
+                               if( (instruction.input1.isMatrix() && 
ec.getMatrixObject(instruction.input1).isFederatedExcept(FType.BROADCAST))
+                                       || (instruction.input2.isMatrix() && 
ec.getMatrixObject(instruction.input2).isFederatedExcept(FType.BROADCAST))) {
+                                       
if(instruction.getOpcode().equals("append") )
+                                               fedinst = 
AppendFEDInstruction.parseInstruction(inst.getInstructionString());
+                                       else 
if(instruction.getOpcode().equals("qpick"))
+                                               fedinst = 
QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
+                                       else 
if("cov".equals(instruction.getOpcode()) && 
(ec.getMatrixObject(instruction.input1).isFederated(FType.ROW) ||
+                                               
ec.getMatrixObject(instruction.input2).isFederated(FType.ROW)))
+                                               fedinst = 
CovarianceFEDInstruction.parseInstruction((CovarianceCPInstruction)inst);
+                                       else
+                                               fedinst = 
BinaryFEDInstruction.parseInstruction(
                                                        
InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
-                                       else if(inst instanceof 
UnaryMatrixCPInstruction) {
-                                               
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
-                                                       
!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
-                                                       fedinst = 
UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
-                                       }
                                }
                        }
-               }
-               else if (inst instanceof BinaryCPInstruction) {
-                       BinaryCPInstruction instruction = (BinaryCPInstruction) 
inst;
-                       if( (instruction.input1.isMatrix() && 
ec.getMatrixObject(instruction.input1).isFederatedExcept(FType.BROADCAST))
-                               || (instruction.input2.isMatrix() && 
ec.getMatrixObject(instruction.input2).isFederatedExcept(FType.BROADCAST))) {
-                               if(instruction.getOpcode().equals("append") )
-                                       fedinst = 
AppendFEDInstruction.parseInstruction(inst.getInstructionString());
-                               else if(instruction.getOpcode().equals("qpick"))
-                                       fedinst = 
QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
-                               else if("cov".equals(instruction.getOpcode()) 
&& (ec.getMatrixObject(instruction.input1).isFederated(FType.ROW) ||
-                                       
ec.getMatrixObject(instruction.input2).isFederated(FType.ROW)))
-                                       fedinst = 
CovarianceFEDInstruction.parseInstruction((CovarianceCPInstruction)inst);
-                               else
-                                       fedinst = 
BinaryFEDInstruction.parseInstruction(
-                                               
InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+                       else if( inst instanceof 
ParameterizedBuiltinCPInstruction ) {
+                               ParameterizedBuiltinCPInstruction pinst = 
(ParameterizedBuiltinCPInstruction) inst;
+                               if( ArrayUtils.contains(PARAM_BUILTINS, 
pinst.getOpcode()) && pinst.getTarget(ec).isFederatedExcept(FType.BROADCAST) )
+                                       fedinst = 
ParameterizedBuiltinFEDInstruction.parseInstruction(pinst.getInstructionString());
                        }
-               }
-               else if( inst instanceof ParameterizedBuiltinCPInstruction ) {
-                       ParameterizedBuiltinCPInstruction pinst = 
(ParameterizedBuiltinCPInstruction) inst;
-                       if( ArrayUtils.contains(PARAM_BUILTINS, 
pinst.getOpcode()) && pinst.getTarget(ec).isFederatedExcept(FType.BROADCAST) )
-                               fedinst = 
ParameterizedBuiltinFEDInstruction.parseInstruction(pinst.getInstructionString());
-               }
-               else if (inst instanceof 
MultiReturnParameterizedBuiltinCPInstruction) {
-                       MultiReturnParameterizedBuiltinCPInstruction minst = 
(MultiReturnParameterizedBuiltinCPInstruction) inst;
-                       if(minst.getOpcode().equals("transformencode") && 
minst.input1.isFrame()) {
-                               CacheableData<?> fo = 
ec.getCacheableData(minst.input1);
-                               if(fo.isFederatedExcept(FType.BROADCAST)) {
-                                       fedinst = 
MultiReturnParameterizedBuiltinFEDInstruction
-                                               
.parseInstruction(minst.getInstructionString());
+                       else if (inst instanceof 
MultiReturnParameterizedBuiltinCPInstruction) {
+                               MultiReturnParameterizedBuiltinCPInstruction 
minst = (MultiReturnParameterizedBuiltinCPInstruction) inst;
+                               if(minst.getOpcode().equals("transformencode") 
&& minst.input1.isFrame()) {
+                                       CacheableData<?> fo = 
ec.getCacheableData(minst.input1);
+                                       
if(fo.isFederatedExcept(FType.BROADCAST)) {
+                                               fedinst = 
MultiReturnParameterizedBuiltinFEDInstruction
+                                                       
.parseInstruction(minst.getInstructionString());
+                                       }
                                }
                        }
-               }
-               else if(inst instanceof IndexingCPInstruction) {
-                       // matrix and frame indexing
-                       IndexingCPInstruction minst = (IndexingCPInstruction) 
inst;
-                       if((minst.input1.isMatrix() || minst.input1.isFrame())
-                               && 
ec.getCacheableData(minst.input1).isFederatedExcept(FType.BROADCAST)) {
-                               fedinst = 
IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+                       else if(inst instanceof IndexingCPInstruction) {
+                               // matrix and frame indexing
+                               IndexingCPInstruction minst = 
(IndexingCPInstruction) inst;
+                               if((minst.input1.isMatrix() || 
minst.input1.isFrame())
+                                       && 
ec.getCacheableData(minst.input1).isFederatedExcept(FType.BROADCAST)) {
+                                       fedinst = 
IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+                               }
                        }
-               }
-               else if(inst instanceof TernaryCPInstruction) {
-                       TernaryCPInstruction tinst = (TernaryCPInstruction) 
inst;
-                       if(inst.getOpcode().equals("_map") && inst instanceof 
TernaryFrameScalarCPInstruction && 
!inst.getInstructionString().contains("UtilFunctions")
-                               && tinst.input1.isFrame() && 
ec.getFrameObject(tinst.input1).isFederated()) {
-                               long margin = 
ec.getScalarInput(tinst.input3).getLongValue();
-                               FrameObject fo = 
ec.getFrameObject(tinst.input1);
-                               if(margin == 0 || (fo.isFederated(FType.ROW) && 
margin == 1) || (fo.isFederated(FType.COL) && margin == 2))
-                                       fedinst = 
TernaryFrameScalarFEDInstruction
-                                               
.parseInstruction(InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+                       else if(inst instanceof TernaryCPInstruction) {
+                               TernaryCPInstruction tinst = 
(TernaryCPInstruction) inst;
+                               if(inst.getOpcode().equals("_map") && inst 
instanceof TernaryFrameScalarCPInstruction && 
!inst.getInstructionString().contains("UtilFunctions")
+                                       && tinst.input1.isFrame() && 
ec.getFrameObject(tinst.input1).isFederated()) {
+                                       long margin = 
ec.getScalarInput(tinst.input3).getLongValue();
+                                       FrameObject fo = 
ec.getFrameObject(tinst.input1);
+                                       if(margin == 0 || 
(fo.isFederated(FType.ROW) && margin == 1) || (fo.isFederated(FType.COL) && 
margin == 2))
+                                               fedinst = 
TernaryFrameScalarFEDInstruction
+                                                       
.parseInstruction(InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+                               }
+                               else if((tinst.input1.isMatrix() && 
ec.getCacheableData(tinst.input1).isFederatedExcept(FType.BROADCAST))
+                                       || (tinst.input2.isMatrix() && 
ec.getCacheableData(tinst.input2).isFederatedExcept(FType.BROADCAST))
+                                       || (tinst.input3.isMatrix() && 
ec.getCacheableData(tinst.input3).isFederatedExcept(FType.BROADCAST))) {
+                                       fedinst = 
TernaryFEDInstruction.parseInstruction(tinst.getInstructionString());
+                               }
                        }
-                       else if((tinst.input1.isMatrix() && 
ec.getCacheableData(tinst.input1).isFederatedExcept(FType.BROADCAST))
-                               || (tinst.input2.isMatrix() && 
ec.getCacheableData(tinst.input2).isFederatedExcept(FType.BROADCAST))
-                               || (tinst.input3.isMatrix() && 
ec.getCacheableData(tinst.input3).isFederatedExcept(FType.BROADCAST))) {
-                               fedinst = 
TernaryFEDInstruction.parseInstruction(tinst.getInstructionString());
+                       else if(inst instanceof VariableCPInstruction ){
+                               VariableCPInstruction ins = 
(VariableCPInstruction) inst;
+                               if(ins.getVariableOpcode() == 
VariableOperationCode.Write
+                                       && ins.getInput1().isMatrix()
+                                       && 
ins.getInput3().getName().contains("federated")){
+                                       fedinst = 
VariableFEDInstruction.parseInstruction(ins);
+                               }
+                               else if(ins.getVariableOpcode() == 
VariableOperationCode.CastAsFrameVariable
+                                       && ins.getInput1().isMatrix()
+                                       && 
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
+                                       fedinst = 
VariableFEDInstruction.parseInstruction(ins);
+                               }
+                               else if(ins.getVariableOpcode() == 
VariableOperationCode.CastAsMatrixVariable
+                                       && ins.getInput1().isFrame()
+                                       && 
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
+                                       fedinst = 
VariableFEDInstruction.parseInstruction(ins);
+                               }
                        }
-               }
-               else if(inst instanceof VariableCPInstruction ){
-                       VariableCPInstruction ins = (VariableCPInstruction) 
inst;
-                       if(ins.getVariableOpcode() == 
VariableOperationCode.Write
-                               && ins.getInput1().isMatrix()
-                               && 
ins.getInput3().getName().contains("federated")){
-                               fedinst = 
VariableFEDInstruction.parseInstruction(ins);
-                       }
-                       else if(ins.getVariableOpcode() == 
VariableOperationCode.CastAsFrameVariable
-                               && ins.getInput1().isMatrix()
-                               && 
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
-                               fedinst = 
VariableFEDInstruction.parseInstruction(ins);
-                       }
-                       else if(ins.getVariableOpcode() == 
VariableOperationCode.CastAsMatrixVariable
-                               && ins.getInput1().isFrame()
-                               && 
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
-                               fedinst = 
VariableFEDInstruction.parseInstruction(ins);
+                       else if(inst instanceof AggregateTernaryCPInstruction){
+                               AggregateTernaryCPInstruction ins = 
(AggregateTernaryCPInstruction) inst;
+                               if(ins.input1.isMatrix() && 
ec.getCacheableData(ins.input1).isFederatedExcept(FType.BROADCAST)
+                                       && ins.input2.isMatrix() && 
ec.getCacheableData(ins.input2).isFederatedExcept(FType.BROADCAST)) {
+                                       fedinst = 
AggregateTernaryFEDInstruction.parseInstruction(ins.getInstructionString());
+                               }
                        }
-               }
-               else if(inst instanceof AggregateTernaryCPInstruction){
-                       AggregateTernaryCPInstruction ins = 
(AggregateTernaryCPInstruction) inst;
-                       if(ins.input1.isMatrix() && 
ec.getCacheableData(ins.input1).isFederatedExcept(FType.BROADCAST)
-                               && ins.input2.isMatrix() && 
ec.getCacheableData(ins.input2).isFederatedExcept(FType.BROADCAST)) {
-                               fedinst = 
AggregateTernaryFEDInstruction.parseInstruction(ins.getInstructionString());
+                       else if(inst instanceof QuaternaryCPInstruction) {
+                               QuaternaryCPInstruction instruction = 
(QuaternaryCPInstruction) inst;
+                               Data data = ec.getVariable(instruction.input1);
+                               if(data instanceof MatrixObject && 
((MatrixObject) data).isFederatedExcept(FType.BROADCAST))
+                                       fedinst = 
QuaternaryFEDInstruction.parseInstruction(instruction.getInstructionString());
                        }
-               }
-               else if(inst instanceof QuaternaryCPInstruction) {
-                       QuaternaryCPInstruction instruction = 
(QuaternaryCPInstruction) inst;
-                       Data data = ec.getVariable(instruction.input1);
-                       if(data instanceof MatrixObject && ((MatrixObject) 
data).isFederatedExcept(FType.BROADCAST))
-                               fedinst = 
QuaternaryFEDInstruction.parseInstruction(instruction.getInstructionString());
-               }
-               else if(inst instanceof SpoofCPInstruction) {
-                       SpoofCPInstruction ins = (SpoofCPInstruction) inst;
-                       Class<?> scla = ins.getOperatorClass().getSuperclass();
-                       if(((scla == SpoofCellwise.class || scla == 
SpoofMultiAggregate.class || scla == SpoofOuterProduct.class)
+                       else if(inst instanceof SpoofCPInstruction) {
+                               SpoofCPInstruction ins = (SpoofCPInstruction) 
inst;
+                               Class<?> scla = 
ins.getOperatorClass().getSuperclass();
+                               if(((scla == SpoofCellwise.class || scla == 
SpoofMultiAggregate.class || scla == SpoofOuterProduct.class)
                                        && SpoofFEDInstruction.isFederated(ec, 
ins.getInputs(), scla))
-                               || (scla == SpoofRowwise.class && 
SpoofFEDInstruction.isFederated(ec, FType.ROW, ins.getInputs(), scla))) {
-                               fedinst = 
SpoofFEDInstruction.parseInstruction(ins.getInstructionString());
+                                       || (scla == SpoofRowwise.class && 
SpoofFEDInstruction.isFederated(ec, FType.ROW, ins.getInputs(), scla))) {
+                                       fedinst = 
SpoofFEDInstruction.parseInstruction(ins.getInstructionString());
+                               }
+                       }
+                       else if(inst instanceof CtableCPInstruction) {
+                               CtableCPInstruction cinst = 
(CtableCPInstruction) inst;
+                               if(inst.getOpcode().equalsIgnoreCase("ctable")
+                                       && ( 
ec.getCacheableData(cinst.input1).isFederated(FType.ROW)
+                                       || (cinst.input2.isMatrix() && 
ec.getCacheableData(cinst.input2).isFederated(FType.ROW))
+                                       || (cinst.input3.isMatrix() && 
ec.getCacheableData(cinst.input3).isFederated(FType.ROW))))
+                                       fedinst = 
CtableFEDInstruction.parseInstruction(cinst.getInstructionString());
                        }
-               }
-               else if(inst instanceof CtableCPInstruction) {
-                       CtableCPInstruction cinst = (CtableCPInstruction) inst;
-                       if(inst.getOpcode().equalsIgnoreCase("ctable")
-                               && ( 
ec.getCacheableData(cinst.input1).isFederated(FType.ROW)
-                               || (cinst.input2.isMatrix() && 
ec.getCacheableData(cinst.input2).isFederated(FType.ROW))
-                               || (cinst.input3.isMatrix() && 
ec.getCacheableData(cinst.input3).isFederated(FType.ROW))))
-                               fedinst = 
CtableFEDInstruction.parseInstruction(cinst.getInstructionString());
-               }
 
-               //set thread id for federated context management
-               if( fedinst != null ) {
-                       fedinst.setTID(ec.getTID());
-                       return fedinst;
+                       //set thread id for federated context management
+                       if( fedinst != null ) {
+                               fedinst.setTID(ec.getTID());
+                               return fedinst;
+                       }
                }
 
                return inst;

Reply via email to