This is an automated email from the ASF dual-hosted git repository.
sebwrede pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 16a506a [MINOR] Add Federated Compilation Options
16a506a is described below
commit 16a506a6dd7eedf5b949c0aefc9ee3c9ceb78b5b
Author: sebwrede <[email protected]>
AuthorDate: Thu Jan 20 16:54:57 2022 +0100
[MINOR] Add Federated Compilation Options
This commit adds options for compiling federated execution plans which are
needed for testing and experiment purposes.
Additionally, various log messages are added and exception handling changed
to make federated executions easier to debug.
Closes #1528.
---
src/main/java/org/apache/sysds/api/DMLOptions.java | 29 ++
src/main/java/org/apache/sysds/api/DMLScript.java | 2 +-
src/main/java/org/apache/sysds/hops/Hop.java | 4 +
.../java/org/apache/sysds/hops/OptimizerUtils.java | 3 +
.../hops/ipa/IPAPassRewriteFederatedPlan.java | 42 ++-
.../java/org/apache/sysds/hops/ipa/MemoTable.java | 15 +
.../hops/rewrite/RewriteFederatedExecution.java | 8 +-
.../controlprogram/federated/FederatedWorker.java | 14 +-
.../federated/FederatedWorkerHandler.java | 20 +-
.../instructions/fed/FEDInstructionUtils.java | 310 +++++++++++----------
10 files changed, 277 insertions(+), 170 deletions(-)
diff --git a/src/main/java/org/apache/sysds/api/DMLOptions.java
b/src/main/java/org/apache/sysds/api/DMLOptions.java
index 1b911bc..55773ed 100644
--- a/src/main/java/org/apache/sysds/api/DMLOptions.java
+++ b/src/main/java/org/apache/sysds/api/DMLOptions.java
@@ -31,6 +31,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
+import org.apache.sysds.runtime.instructions.fed.FEDInstructionUtils;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCachePolicy;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.utils.Explain;
@@ -74,6 +76,7 @@ public class DMLOptions {
public int pythonPort = -1;
public boolean checkPrivacy = false; // Check
which privacy constraints are loaded and checked during federated execution
public boolean federatedCompilation = false;
// Compile federated instructions based on input federation state and privacy
constraints.
+ public boolean noFedRuntimeConversion = false;
// If activated, no runtime conversion of CP instructions to FED instructions
will be performed.
public final static DMLOptions defaultOptions = new DMLOptions(null);
@@ -103,6 +106,7 @@ public class DMLOptions {
", lineage=" + lineage +
", w=" + fedWorker +
", federatedCompilation=" + federatedCompilation +
+ ", noFedRuntimeConversion=" + noFedRuntimeConversion +
'}';
}
@@ -266,11 +270,30 @@ public class DMLOptions {
}
dmlOptions.checkPrivacy = line.hasOption("checkPrivacy");
+
if (line.hasOption("federatedCompilation")){
OptimizerUtils.FEDERATED_COMPILATION = true;
dmlOptions.federatedCompilation = true;
+ String[] fedCompSpecs =
line.getOptionValues("federatedCompilation");
+ if ( fedCompSpecs != null && fedCompSpecs.length > 0 ){
+ for ( String spec : fedCompSpecs ){
+ String[] specPair = spec.split("=");
+ if (specPair.length != 2){
+ throw new
org.apache.commons.cli.ParseException("Invalid argument specified for
-federatedCompilation option, must be a list of space separated K=V pairs,
where K is a line number of the DML script and V is a federated output value");
+ }
+ int dmlLineNum =
Integer.parseInt(specPair[0]);
+ FEDInstruction.FederatedOutput
fedOutSpec = FEDInstruction.FederatedOutput.valueOf(specPair[1]);
+
OptimizerUtils.FEDERATED_SPECS.put(dmlLineNum,fedOutSpec);
+ }
+ }
}
+ if ( line.hasOption("noFedRuntimeConversion") ){
+ FEDInstructionUtils.noFedRuntimeConversion = true;
+ dmlOptions.noFedRuntimeConversion = true;
+ }
+
+
return dmlOptions;
}
@@ -325,8 +348,13 @@ public class DMLOptions {
.withDescription("Check which privacy constraints are
loaded and checked during federated execution")
.create("checkPrivacy");
Option federatedCompilation = OptionBuilder
+ .withArgName("key=value")
.withDescription("Compile federated instructions based
on input federation state and privacy constraints.")
+ .hasOptionalArgs()
.create("federatedCompilation");
+ Option noFedRuntimeConversion = OptionBuilder
+ .withDescription("If activated, no runtime conversion
of CP instructions to FED instructions will be performed.")
+ .create("noFedRuntimeConversion");
options.addOption(configOpt);
options.addOption(cleanOpt);
@@ -341,6 +369,7 @@ public class DMLOptions {
options.addOption(fedOpt);
options.addOption(checkPrivacy);
options.addOption(federatedCompilation);
+ options.addOption(noFedRuntimeConversion);
// Either a clean(-clean), a file(-f), a script(-s) or
help(-help) needs to be specified
OptionGroup fileOrScriptOpt = new OptionGroup()
diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java
b/src/main/java/org/apache/sysds/api/DMLScript.java
index 4184db1..c2ef42c 100644
--- a/src/main/java/org/apache/sysds/api/DMLScript.java
+++ b/src/main/java/org/apache/sysds/api/DMLScript.java
@@ -278,7 +278,7 @@ public class DMLScript
if(dmlOptions.fedWorker) {
loadConfiguration(fnameOptConfig);
try {
- new
FederatedWorker(dmlOptions.fedWorkerPort).run();
+ new
FederatedWorker(dmlOptions.fedWorkerPort, dmlOptions.debug).run();
}
catch(CertificateException e) {
e.printStackTrace();
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java
b/src/main/java/org/apache/sysds/hops/Hop.java
index f47fcff..037bfa5 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -203,6 +203,10 @@ public abstract class Hop implements ParseInfo {
activatePrefetch = true;
}
+ public void deactivatePrefetch(){
+ activatePrefetch = false;
+ }
+
/**
* Checks if prefetch is activated for this hop.
* @return true if prefetch is activated
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 703b9f2..47b5822 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -52,6 +52,7 @@ import org.apache.sysds.runtime.functionobjects.IntegerDivide;
import org.apache.sysds.runtime.functionobjects.Modulus;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -60,6 +61,7 @@ import org.apache.sysds.runtime.util.UtilFunctions;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Map;
public class OptimizerUtils
{
@@ -215,6 +217,7 @@ public class OptimizerUtils
* Compile federated instructions based on input federation state and
privacy constraints.
*/
public static boolean FEDERATED_COMPILATION = false;
+ public static Map<Integer, FEDInstruction.FederatedOutput>
FEDERATED_SPECS = new HashMap<>();
/**
* Specifies a multiplier computing the degree of parallelism of
parallel
diff --git
a/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
b/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
index 59333ab..0939b25 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/IPAPassRewriteFederatedPlan.java
@@ -19,6 +19,8 @@
package org.apache.sysds.hops.ipa;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.hops.AggBinaryOp;
import org.apache.sysds.hops.AggUnaryOp;
import org.apache.sysds.hops.BinaryOp;
@@ -54,6 +56,7 @@ import java.util.Set;
* The rewrite is only applied if federated compilation is activated in
OptimizerUtils.
*/
public class IPAPassRewriteFederatedPlan extends IPAPass {
+ private static final Log LOG =
LogFactory.getLog(IPAPassRewriteFederatedPlan.class.getName());
private final static MemoTable hopRelMemo = new MemoTable();
private final static Set<Long> hopRelUpdatedFinal = new HashSet<>();
@@ -238,10 +241,24 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
private void updateFederatedOutput(Hop root, HopRel updateHopRel) {
root.setFederatedOutput(updateHopRel.getFederatedOutput());
root.setFederatedCost(updateHopRel.getCostObject());
+ forceFixedFedOut(root);
hopRelUpdatedFinal.add(root.getHopID());
}
/**
+ * Set federated output to fixed value if FEDERATED_SPECS is activated
for root hop.
+ * @param root hop set to fixed fedout value as loaded from
FEDERATED_SPECS
+ */
+ private void forceFixedFedOut(Hop root){
+ if (
OptimizerUtils.FEDERATED_SPECS.containsKey(root.getBeginLine()) ){
+ FEDInstruction.FederatedOutput fedOutSpec =
OptimizerUtils.FEDERATED_SPECS.get(root.getBeginLine());
+ root.setFederatedOutput(fedOutSpec);
+ if ( fedOutSpec.isForcedFederated() )
+ root.deactivatePrefetch();
+ }
+ }
+
+ /**
* Select federated execution plan for every Hop in the DAG starting
from given roots.
* The cost estimates of the hops are also updated when FederatedOutput
is updated in the hops.
*
@@ -259,8 +276,10 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
* @param root starting point for going through the Hop DAG to update
the federatedOutput fields
*/
private void selectFederatedExecutionPlan(Hop root) {
- visitFedPlanHop(root);
- setFinalFedout(root);
+ if ( root != null ){
+ visitFedPlanHop(root);
+ setFinalFedout(root);
+ }
}
/**
@@ -274,6 +293,7 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
return;
// If the currentHop has input, then the input should be
visited depth-first
if(currentHop.getInput() != null &&
currentHop.getInput().size() > 0) {
+ debugLog(currentHop);
for(Hop input : currentHop.getInput())
visitFedPlanHop(input);
}
@@ -290,6 +310,24 @@ public class IPAPassRewriteFederatedPlan extends IPAPass {
}
/**
+ * Write HOP visit to debug log if debug is activated.
+ * @param currentHop hop written to log
+ */
+ private void debugLog(Hop currentHop){
+ if ( LOG.isDebugEnabled() ){
+ LOG.debug("Visiting HOP: " + currentHop + " Input size:
" + currentHop.getInput().size());
+ int index = 0;
+ for ( Hop hop : currentHop.getInput()){
+ if ( hop == null )
+ LOG.debug("Input at index is null: " +
index);
+ else
+ LOG.debug("HOP input: " + hop + " at
index " + index + " of " + currentHop);
+ index++;
+ }
+ }
+ }
+
+ /**
* Checks if the instructions related to the given hop supports
FOUT/LOUT processing.
*
* @param hop to check for federated support
diff --git a/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
b/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
index c1aeff6..fc95c29 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/MemoTable.java
@@ -115,4 +115,19 @@ public class MemoTable {
&& hopRelMemo.get(root.getHopRef().getHopID()).stream()
.anyMatch(h -> h.getFederatedOutput() ==
root.getFederatedOutput());
}
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("Federated MemoTable has
").append(hopRelMemo.size()).append(" entries with the following values:");
+ sb.append("\n").append("{").append("\n");
+ for (Map.Entry<Long,List<HopRel>> hopEntry :
hopRelMemo.entrySet()){
+ sb.append("
").append(hopEntry.getKey()).append(":").append("\n");
+ for ( HopRel hopRel : hopEntry.getValue() ){
+ sb.append("
").append(hopRel.getFederatedOutput()).append("
").append(hopRel.getCost()).append("\n");
+ }
+ }
+ sb.append("\n");
+ return sb.toString();
+ }
}
diff --git
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
index 4de2557..6288130 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteFederatedExecution.java
@@ -22,6 +22,7 @@ package org.apache.sysds.hops.rewrite;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
import org.apache.sysds.api.DMLException;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.LiteralOp;
@@ -53,6 +54,7 @@ import java.util.ArrayList;
import java.util.concurrent.Future;
public class RewriteFederatedExecution extends HopRewriteRule {
+ private static final Logger LOG =
Logger.getLogger(RewriteFederatedExecution.class);
@Override
public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots,
ProgramRewriteStatus state) {
@@ -72,6 +74,8 @@ public class RewriteFederatedExecution extends HopRewriteRule
{
if (hop.isVisited())
return;
+ LOG.debug("RewriteFederatedExecution visitHop + " + hop);
+
// Depth first to get to the input
for ( Hop input : hop.getInput() )
visitHop(input);
@@ -98,11 +102,13 @@ public class RewriteFederatedExecution extends
HopRewriteRule {
private static void loadFederatedPrivacyConstraints(Hop hop){
if ( hop.isFederatedDataOp() && hop.getPrivacy() == null){
try {
+ LOG.debug("Load privacy constraints of " + hop);
PrivacyConstraint privConstraint =
unwrapPrivConstraint(sendPrivConstraintRequest(hop));
+ LOG.debug("PrivacyConstraint retrieved: " +
privConstraint);
hop.setPrivacy(privConstraint);
}
catch(Exception e) {
- throw new DMLException(e.getMessage());
+ throw new DMLException(e);
}
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index de0e1d8..4090684 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -47,24 +47,25 @@ import org.apache.sysds.conf.DMLConfig;
public class FederatedWorker {
protected static Logger log = Logger.getLogger(FederatedWorker.class);
- private int _port;
+ private final int _port;
private final FederatedLookupTable _flt;
private final FederatedReadCache _frc;
+ private final boolean _debug;
- public FederatedWorker(int port) {
+ public FederatedWorker(int port, boolean debug) {
_flt = new FederatedLookupTable();
_frc = new FederatedReadCache();
_port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
+ _debug = debug;
}
public void run() throws CertificateException, SSLException {
- log.info("Setting up Federated Worker");
+ log.info("Setting up Federated Worker on port " + _port);
final int EVENT_LOOP_THREADS = Math.max(4,
Runtime.getRuntime().availableProcessors() * 4);
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1,
Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>(true));
NioEventLoopGroup workerGroup = new
NioEventLoopGroup(EVENT_LOOP_THREADS, workerTPE);
-
ServerBootstrap b = new ServerBootstrap();
// TODO add ability to use real ssl files, not self signed
certificates.
SelfSignedCertificate cert = new SelfSignedCertificate();
@@ -94,7 +95,10 @@ public class FederatedWorker {
f.channel().closeFuture().sync();
}
catch(Exception e) {
- log.info("Federated worker interrupted");
+ log.error("Federated worker interrupted");
+ log.error(e.getMessage());
+ if ( _debug )
+ e.printStackTrace();
}
finally {
log.info("Federated Worker Shutting down.");
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 9f7de1f..ca2b055 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -129,6 +129,8 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
}
catch(DMLPrivacyException | FederatedWorkerHandlerException ex)
{
// Here we control the error message, therefore it is
allowed to send the stack trace with the response
+ LOG.error("Exception in FederatedWorkerHandler while
processing requests:\n"
+ + Arrays.toString(requests), ex);
return new FederatedResponse(ResponseType.ERROR, ex);
}
catch(Exception ex) {
@@ -417,16 +419,16 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
ExecutionContext ec = ecm.get(request.getTID());
// get function and input parameters
- FederatedUDF udf = (FederatedUDF) request.getParam(0);
- Data[] inputs = Arrays.stream(udf.getInputIDs()).mapToObj(id ->
ec.getVariable(String.valueOf(id)))
-
.map(PrivacyMonitor::handlePrivacy).toArray(Data[]::new);
+ try {
+ FederatedUDF udf = (FederatedUDF) request.getParam(0);
+ Data[] inputs =
Arrays.stream(udf.getInputIDs()).mapToObj(id ->
ec.getVariable(String.valueOf(id)))
+
.map(PrivacyMonitor::handlePrivacy).toArray(Data[]::new);
- // trace lineage
- if(DMLScript.LINEAGE)
- LineageItemUtils.traceFedUDF(ec, udf);
+ // trace lineage
+ if(DMLScript.LINEAGE)
+ LineageItemUtils.traceFedUDF(ec, udf);
- // reuse or execute user-defined function
- try {
+ // reuse or execute user-defined function
// reuse UDF outputs if available in lineage cache
FederatedResponse reuse = LineageCache.reuse(udf, ec);
if(reuse.isSuccessful())
@@ -441,6 +443,8 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
return res;
}
catch(DMLPrivacyException | FederatedWorkerHandlerException ex)
{
+ LOG.debug("FederatedWorkerHandler Privacy Constraint " +
+ "exception thrown when processing EXEC_UDF
request ", ex);
throw ex;
}
catch(Exception ex) {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 64f1f9a..5915033 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -94,6 +94,8 @@ public class FEDInstructionUtils {
private static String[] PARAM_BUILTINS = new String[]{
"replace", "rmempty", "lowertri", "uppertri",
"transformdecode", "transformapply", "tokenize"};
+
+ public static boolean noFedRuntimeConversion = false;
// private static final Log LOG =
LogFactory.getLog(FEDInstructionUtils.class.getName());
@@ -109,178 +111,180 @@ public class FEDInstructionUtils {
* @return The potentially modified instruction
*/
public static Instruction checkAndReplaceCP(Instruction inst,
ExecutionContext ec) {
- FEDInstruction fedinst = null;
- if (inst instanceof AggregateBinaryCPInstruction) {
- AggregateBinaryCPInstruction instruction =
(AggregateBinaryCPInstruction) inst;
- if( instruction.input1.isMatrix() &&
instruction.input2.isMatrix()) {
- MatrixObject mo1 =
ec.getMatrixObject(instruction.input1);
- MatrixObject mo2 =
ec.getMatrixObject(instruction.input2);
- if ( (mo1.isFederated(FType.ROW) &&
mo1.isFederatedExcept(FType.BROADCAST))
- || (mo2.isFederated(FType.ROW) &&
mo2.isFederatedExcept(FType.BROADCAST))
- || (mo1.isFederated(FType.COL) &&
mo1.isFederatedExcept(FType.BROADCAST))) {
- fedinst =
AggregateBinaryFEDInstruction.parseInstruction(
-
InstructionUtils.concatOperands(inst.getInstructionString(),
FederatedOutput.NONE.name()));
+ if ( !noFedRuntimeConversion ){
+ FEDInstruction fedinst = null;
+ if (inst instanceof AggregateBinaryCPInstruction) {
+ AggregateBinaryCPInstruction instruction =
(AggregateBinaryCPInstruction) inst;
+ if( instruction.input1.isMatrix() &&
instruction.input2.isMatrix()) {
+ MatrixObject mo1 =
ec.getMatrixObject(instruction.input1);
+ MatrixObject mo2 =
ec.getMatrixObject(instruction.input2);
+ if ( (mo1.isFederated(FType.ROW) &&
mo1.isFederatedExcept(FType.BROADCAST))
+ || (mo2.isFederated(FType.ROW)
&& mo2.isFederatedExcept(FType.BROADCAST))
+ || (mo1.isFederated(FType.COL)
&& mo1.isFederatedExcept(FType.BROADCAST))) {
+ fedinst =
AggregateBinaryFEDInstruction.parseInstruction(
+
InstructionUtils.concatOperands(inst.getInstructionString(),
FederatedOutput.NONE.name()));
+ }
}
}
- }
- else if( inst instanceof MMChainCPInstruction) {
- MMChainCPInstruction linst = (MMChainCPInstruction)
inst;
- MatrixObject mo = ec.getMatrixObject(linst.input1);
- if( mo.isFederated(FType.ROW) )
- fedinst =
MMChainFEDInstruction.parseInstruction(linst.getInstructionString());
- }
- else if( inst instanceof MMTSJCPInstruction ) {
- MMTSJCPInstruction linst = (MMTSJCPInstruction) inst;
- MatrixObject mo = ec.getMatrixObject(linst.input1);
- if( (mo.isFederated(FType.ROW) &&
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isLeft()) ||
- (mo.isFederated(FType.COL) &&
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isRight()))
- fedinst =
TsmmFEDInstruction.parseInstruction(linst.getInstructionString());
- }
- else if (inst instanceof UnaryCPInstruction && ! (inst
instanceof IndexingCPInstruction)) {
- UnaryCPInstruction instruction = (UnaryCPInstruction)
inst;
- if(inst instanceof ReorgCPInstruction &&
(inst.getOpcode().equals("r'") || inst.getOpcode().equals("rdiag")
- || inst.getOpcode().equals("rev"))) {
- ReorgCPInstruction rinst = (ReorgCPInstruction)
inst;
- CacheableData<?> mo =
ec.getCacheableData(rinst.input1);
-
- if((mo instanceof MatrixObject || mo instanceof
FrameObject)
- &&
mo.isFederatedExcept(FType.BROADCAST) )
- fedinst =
ReorgFEDInstruction.parseInstruction(
-
InstructionUtils.concatOperands(rinst.getInstructionString(),FederatedOutput.NONE.name()));
+ else if( inst instanceof MMChainCPInstruction) {
+ MMChainCPInstruction linst =
(MMChainCPInstruction) inst;
+ MatrixObject mo =
ec.getMatrixObject(linst.input1);
+ if( mo.isFederated(FType.ROW) )
+ fedinst =
MMChainFEDInstruction.parseInstruction(linst.getInstructionString());
}
- else if(instruction.input1 != null &&
instruction.input1.isMatrix()
- && ec.containsVariable(instruction.input1)) {
+ else if( inst instanceof MMTSJCPInstruction ) {
+ MMTSJCPInstruction linst = (MMTSJCPInstruction)
inst;
+ MatrixObject mo =
ec.getMatrixObject(linst.input1);
+ if( (mo.isFederated(FType.ROW) &&
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isLeft()) ||
+ (mo.isFederated(FType.COL) &&
mo.isFederatedExcept(FType.BROADCAST) && linst.getMMTSJType().isRight()))
+ fedinst =
TsmmFEDInstruction.parseInstruction(linst.getInstructionString());
+ }
+ else if (inst instanceof UnaryCPInstruction && ! (inst
instanceof IndexingCPInstruction)) {
+ UnaryCPInstruction instruction =
(UnaryCPInstruction) inst;
+ if(inst instanceof ReorgCPInstruction &&
(inst.getOpcode().equals("r'") || inst.getOpcode().equals("rdiag")
+ || inst.getOpcode().equals("rev"))) {
+ ReorgCPInstruction rinst =
(ReorgCPInstruction) inst;
+ CacheableData<?> mo =
ec.getCacheableData(rinst.input1);
- MatrixObject mo1 =
ec.getMatrixObject(instruction.input1);
- if( mo1.isFederatedExcept(FType.BROADCAST) ) {
-
if(instruction.getOpcode().equalsIgnoreCase("cm"))
- fedinst =
CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
- else
if(inst.getOpcode().equalsIgnoreCase("qsort")) {
-
if(mo1.getFedMapping().getFederatedRanges().length == 1)
- fedinst =
QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+ if((mo instanceof MatrixObject || mo
instanceof FrameObject)
+ &&
mo.isFederatedExcept(FType.BROADCAST) )
+ fedinst =
ReorgFEDInstruction.parseInstruction(
+
InstructionUtils.concatOperands(rinst.getInstructionString(),FederatedOutput.NONE.name()));
+ }
+ else if(instruction.input1 != null &&
instruction.input1.isMatrix()
+ &&
ec.containsVariable(instruction.input1)) {
+
+ MatrixObject mo1 =
ec.getMatrixObject(instruction.input1);
+ if(
mo1.isFederatedExcept(FType.BROADCAST) ) {
+
if(instruction.getOpcode().equalsIgnoreCase("cm"))
+ fedinst =
CentralMomentFEDInstruction.parseInstruction(inst.getInstructionString());
+ else
if(inst.getOpcode().equalsIgnoreCase("qsort")) {
+
if(mo1.getFedMapping().getFederatedRanges().length == 1)
+ fedinst =
QuantileSortFEDInstruction.parseInstruction(inst.getInstructionString());
+ }
+ else
if(inst.getOpcode().equalsIgnoreCase("rshape"))
+ fedinst =
ReshapeFEDInstruction.parseInstruction(inst.getInstructionString());
+ else if(inst instanceof
AggregateUnaryCPInstruction &&
+
((AggregateUnaryCPInstruction) instruction).getAUType() ==
AggregateUnaryCPInstruction.AUType.DEFAULT)
+ fedinst =
AggregateUnaryFEDInstruction.parseInstruction(
+
InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+ else if(inst instanceof
UnaryMatrixCPInstruction) {
+
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
+
!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
+ fedinst =
UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
+ }
}
- else
if(inst.getOpcode().equalsIgnoreCase("rshape"))
- fedinst =
ReshapeFEDInstruction.parseInstruction(inst.getInstructionString());
- else if(inst instanceof
AggregateUnaryCPInstruction &&
- ((AggregateUnaryCPInstruction)
instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
- fedinst =
AggregateUnaryFEDInstruction.parseInstruction(
+ }
+ }
+ else if (inst instanceof BinaryCPInstruction) {
+ BinaryCPInstruction instruction =
(BinaryCPInstruction) inst;
+ if( (instruction.input1.isMatrix() &&
ec.getMatrixObject(instruction.input1).isFederatedExcept(FType.BROADCAST))
+ || (instruction.input2.isMatrix() &&
ec.getMatrixObject(instruction.input2).isFederatedExcept(FType.BROADCAST))) {
+
if(instruction.getOpcode().equals("append") )
+ fedinst =
AppendFEDInstruction.parseInstruction(inst.getInstructionString());
+ else
if(instruction.getOpcode().equals("qpick"))
+ fedinst =
QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
+ else
if("cov".equals(instruction.getOpcode()) &&
(ec.getMatrixObject(instruction.input1).isFederated(FType.ROW) ||
+
ec.getMatrixObject(instruction.input2).isFederated(FType.ROW)))
+ fedinst =
CovarianceFEDInstruction.parseInstruction((CovarianceCPInstruction)inst);
+ else
+ fedinst =
BinaryFEDInstruction.parseInstruction(
InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
- else if(inst instanceof
UnaryMatrixCPInstruction) {
-
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
-
!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
- fedinst =
UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
- }
}
}
- }
- else if (inst instanceof BinaryCPInstruction) {
- BinaryCPInstruction instruction = (BinaryCPInstruction)
inst;
- if( (instruction.input1.isMatrix() &&
ec.getMatrixObject(instruction.input1).isFederatedExcept(FType.BROADCAST))
- || (instruction.input2.isMatrix() &&
ec.getMatrixObject(instruction.input2).isFederatedExcept(FType.BROADCAST))) {
- if(instruction.getOpcode().equals("append") )
- fedinst =
AppendFEDInstruction.parseInstruction(inst.getInstructionString());
- else if(instruction.getOpcode().equals("qpick"))
- fedinst =
QuantilePickFEDInstruction.parseInstruction(inst.getInstructionString());
- else if("cov".equals(instruction.getOpcode())
&& (ec.getMatrixObject(instruction.input1).isFederated(FType.ROW) ||
-
ec.getMatrixObject(instruction.input2).isFederated(FType.ROW)))
- fedinst =
CovarianceFEDInstruction.parseInstruction((CovarianceCPInstruction)inst);
- else
- fedinst =
BinaryFEDInstruction.parseInstruction(
-
InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+ else if( inst instanceof
ParameterizedBuiltinCPInstruction ) {
+ ParameterizedBuiltinCPInstruction pinst =
(ParameterizedBuiltinCPInstruction) inst;
+ if( ArrayUtils.contains(PARAM_BUILTINS,
pinst.getOpcode()) && pinst.getTarget(ec).isFederatedExcept(FType.BROADCAST) )
+ fedinst =
ParameterizedBuiltinFEDInstruction.parseInstruction(pinst.getInstructionString());
}
- }
- else if( inst instanceof ParameterizedBuiltinCPInstruction ) {
- ParameterizedBuiltinCPInstruction pinst =
(ParameterizedBuiltinCPInstruction) inst;
- if( ArrayUtils.contains(PARAM_BUILTINS,
pinst.getOpcode()) && pinst.getTarget(ec).isFederatedExcept(FType.BROADCAST) )
- fedinst =
ParameterizedBuiltinFEDInstruction.parseInstruction(pinst.getInstructionString());
- }
- else if (inst instanceof
MultiReturnParameterizedBuiltinCPInstruction) {
- MultiReturnParameterizedBuiltinCPInstruction minst =
(MultiReturnParameterizedBuiltinCPInstruction) inst;
- if(minst.getOpcode().equals("transformencode") &&
minst.input1.isFrame()) {
- CacheableData<?> fo =
ec.getCacheableData(minst.input1);
- if(fo.isFederatedExcept(FType.BROADCAST)) {
- fedinst =
MultiReturnParameterizedBuiltinFEDInstruction
-
.parseInstruction(minst.getInstructionString());
+ else if (inst instanceof
MultiReturnParameterizedBuiltinCPInstruction) {
+ MultiReturnParameterizedBuiltinCPInstruction
minst = (MultiReturnParameterizedBuiltinCPInstruction) inst;
+ if(minst.getOpcode().equals("transformencode")
&& minst.input1.isFrame()) {
+ CacheableData<?> fo =
ec.getCacheableData(minst.input1);
+
if(fo.isFederatedExcept(FType.BROADCAST)) {
+ fedinst =
MultiReturnParameterizedBuiltinFEDInstruction
+
.parseInstruction(minst.getInstructionString());
+ }
}
}
- }
- else if(inst instanceof IndexingCPInstruction) {
- // matrix and frame indexing
- IndexingCPInstruction minst = (IndexingCPInstruction)
inst;
- if((minst.input1.isMatrix() || minst.input1.isFrame())
- &&
ec.getCacheableData(minst.input1).isFederatedExcept(FType.BROADCAST)) {
- fedinst =
IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+ else if(inst instanceof IndexingCPInstruction) {
+ // matrix and frame indexing
+ IndexingCPInstruction minst =
(IndexingCPInstruction) inst;
+ if((minst.input1.isMatrix() ||
minst.input1.isFrame())
+ &&
ec.getCacheableData(minst.input1).isFederatedExcept(FType.BROADCAST)) {
+ fedinst =
IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+ }
}
- }
- else if(inst instanceof TernaryCPInstruction) {
- TernaryCPInstruction tinst = (TernaryCPInstruction)
inst;
- if(inst.getOpcode().equals("_map") && inst instanceof
TernaryFrameScalarCPInstruction &&
!inst.getInstructionString().contains("UtilFunctions")
- && tinst.input1.isFrame() &&
ec.getFrameObject(tinst.input1).isFederated()) {
- long margin =
ec.getScalarInput(tinst.input3).getLongValue();
- FrameObject fo =
ec.getFrameObject(tinst.input1);
- if(margin == 0 || (fo.isFederated(FType.ROW) &&
margin == 1) || (fo.isFederated(FType.COL) && margin == 2))
- fedinst =
TernaryFrameScalarFEDInstruction
-
.parseInstruction(InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+ else if(inst instanceof TernaryCPInstruction) {
+ TernaryCPInstruction tinst =
(TernaryCPInstruction) inst;
+ if(inst.getOpcode().equals("_map") && inst
instanceof TernaryFrameScalarCPInstruction &&
!inst.getInstructionString().contains("UtilFunctions")
+ && tinst.input1.isFrame() &&
ec.getFrameObject(tinst.input1).isFederated()) {
+ long margin =
ec.getScalarInput(tinst.input3).getLongValue();
+ FrameObject fo =
ec.getFrameObject(tinst.input1);
+ if(margin == 0 ||
(fo.isFederated(FType.ROW) && margin == 1) || (fo.isFederated(FType.COL) &&
margin == 2))
+ fedinst =
TernaryFrameScalarFEDInstruction
+
.parseInstruction(InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+ }
+ else if((tinst.input1.isMatrix() &&
ec.getCacheableData(tinst.input1).isFederatedExcept(FType.BROADCAST))
+ || (tinst.input2.isMatrix() &&
ec.getCacheableData(tinst.input2).isFederatedExcept(FType.BROADCAST))
+ || (tinst.input3.isMatrix() &&
ec.getCacheableData(tinst.input3).isFederatedExcept(FType.BROADCAST))) {
+ fedinst =
TernaryFEDInstruction.parseInstruction(tinst.getInstructionString());
+ }
}
- else if((tinst.input1.isMatrix() &&
ec.getCacheableData(tinst.input1).isFederatedExcept(FType.BROADCAST))
- || (tinst.input2.isMatrix() &&
ec.getCacheableData(tinst.input2).isFederatedExcept(FType.BROADCAST))
- || (tinst.input3.isMatrix() &&
ec.getCacheableData(tinst.input3).isFederatedExcept(FType.BROADCAST))) {
- fedinst =
TernaryFEDInstruction.parseInstruction(tinst.getInstructionString());
+ else if(inst instanceof VariableCPInstruction ){
+ VariableCPInstruction ins =
(VariableCPInstruction) inst;
+ if(ins.getVariableOpcode() ==
VariableOperationCode.Write
+ && ins.getInput1().isMatrix()
+ &&
ins.getInput3().getName().contains("federated")){
+ fedinst =
VariableFEDInstruction.parseInstruction(ins);
+ }
+ else if(ins.getVariableOpcode() ==
VariableOperationCode.CastAsFrameVariable
+ && ins.getInput1().isMatrix()
+ &&
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
+ fedinst =
VariableFEDInstruction.parseInstruction(ins);
+ }
+ else if(ins.getVariableOpcode() ==
VariableOperationCode.CastAsMatrixVariable
+ && ins.getInput1().isFrame()
+ &&
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
+ fedinst =
VariableFEDInstruction.parseInstruction(ins);
+ }
}
- }
- else if(inst instanceof VariableCPInstruction ){
- VariableCPInstruction ins = (VariableCPInstruction)
inst;
- if(ins.getVariableOpcode() ==
VariableOperationCode.Write
- && ins.getInput1().isMatrix()
- &&
ins.getInput3().getName().contains("federated")){
- fedinst =
VariableFEDInstruction.parseInstruction(ins);
- }
- else if(ins.getVariableOpcode() ==
VariableOperationCode.CastAsFrameVariable
- && ins.getInput1().isMatrix()
- &&
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
- fedinst =
VariableFEDInstruction.parseInstruction(ins);
- }
- else if(ins.getVariableOpcode() ==
VariableOperationCode.CastAsMatrixVariable
- && ins.getInput1().isFrame()
- &&
ec.getCacheableData(ins.getInput1()).isFederatedExcept(FType.BROADCAST)){
- fedinst =
VariableFEDInstruction.parseInstruction(ins);
+ else if(inst instanceof AggregateTernaryCPInstruction){
+ AggregateTernaryCPInstruction ins =
(AggregateTernaryCPInstruction) inst;
+ if(ins.input1.isMatrix() &&
ec.getCacheableData(ins.input1).isFederatedExcept(FType.BROADCAST)
+ && ins.input2.isMatrix() &&
ec.getCacheableData(ins.input2).isFederatedExcept(FType.BROADCAST)) {
+ fedinst =
AggregateTernaryFEDInstruction.parseInstruction(ins.getInstructionString());
+ }
}
- }
- else if(inst instanceof AggregateTernaryCPInstruction){
- AggregateTernaryCPInstruction ins =
(AggregateTernaryCPInstruction) inst;
- if(ins.input1.isMatrix() &&
ec.getCacheableData(ins.input1).isFederatedExcept(FType.BROADCAST)
- && ins.input2.isMatrix() &&
ec.getCacheableData(ins.input2).isFederatedExcept(FType.BROADCAST)) {
- fedinst =
AggregateTernaryFEDInstruction.parseInstruction(ins.getInstructionString());
+ else if(inst instanceof QuaternaryCPInstruction) {
+ QuaternaryCPInstruction instruction =
(QuaternaryCPInstruction) inst;
+ Data data = ec.getVariable(instruction.input1);
+ if(data instanceof MatrixObject &&
((MatrixObject) data).isFederatedExcept(FType.BROADCAST))
+ fedinst =
QuaternaryFEDInstruction.parseInstruction(instruction.getInstructionString());
}
- }
- else if(inst instanceof QuaternaryCPInstruction) {
- QuaternaryCPInstruction instruction =
(QuaternaryCPInstruction) inst;
- Data data = ec.getVariable(instruction.input1);
- if(data instanceof MatrixObject && ((MatrixObject)
data).isFederatedExcept(FType.BROADCAST))
- fedinst =
QuaternaryFEDInstruction.parseInstruction(instruction.getInstructionString());
- }
- else if(inst instanceof SpoofCPInstruction) {
- SpoofCPInstruction ins = (SpoofCPInstruction) inst;
- Class<?> scla = ins.getOperatorClass().getSuperclass();
- if(((scla == SpoofCellwise.class || scla ==
SpoofMultiAggregate.class || scla == SpoofOuterProduct.class)
+ else if(inst instanceof SpoofCPInstruction) {
+ SpoofCPInstruction ins = (SpoofCPInstruction)
inst;
+ Class<?> scla =
ins.getOperatorClass().getSuperclass();
+ if(((scla == SpoofCellwise.class || scla ==
SpoofMultiAggregate.class || scla == SpoofOuterProduct.class)
&& SpoofFEDInstruction.isFederated(ec,
ins.getInputs(), scla))
- || (scla == SpoofRowwise.class &&
SpoofFEDInstruction.isFederated(ec, FType.ROW, ins.getInputs(), scla))) {
- fedinst =
SpoofFEDInstruction.parseInstruction(ins.getInstructionString());
+ || (scla == SpoofRowwise.class &&
SpoofFEDInstruction.isFederated(ec, FType.ROW, ins.getInputs(), scla))) {
+ fedinst =
SpoofFEDInstruction.parseInstruction(ins.getInstructionString());
+ }
+ }
+ else if(inst instanceof CtableCPInstruction) {
+ CtableCPInstruction cinst =
(CtableCPInstruction) inst;
+ if(inst.getOpcode().equalsIgnoreCase("ctable")
+ && (
ec.getCacheableData(cinst.input1).isFederated(FType.ROW)
+ || (cinst.input2.isMatrix() &&
ec.getCacheableData(cinst.input2).isFederated(FType.ROW))
+ || (cinst.input3.isMatrix() &&
ec.getCacheableData(cinst.input3).isFederated(FType.ROW))))
+ fedinst =
CtableFEDInstruction.parseInstruction(cinst.getInstructionString());
}
- }
- else if(inst instanceof CtableCPInstruction) {
- CtableCPInstruction cinst = (CtableCPInstruction) inst;
- if(inst.getOpcode().equalsIgnoreCase("ctable")
- && (
ec.getCacheableData(cinst.input1).isFederated(FType.ROW)
- || (cinst.input2.isMatrix() &&
ec.getCacheableData(cinst.input2).isFederated(FType.ROW))
- || (cinst.input3.isMatrix() &&
ec.getCacheableData(cinst.input3).isFederated(FType.ROW))))
- fedinst =
CtableFEDInstruction.parseInstruction(cinst.getInstructionString());
- }
- //set thread id for federated context management
- if( fedinst != null ) {
- fedinst.setTID(ec.getTID());
- return fedinst;
+ //set thread id for federated context management
+ if( fedinst != null ) {
+ fedinst.setTID(ec.getTID());
+ return fedinst;
+ }
}
return inst;