This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 42143ec [SYSTEMDS-2921] Federated cumulative aggregates (e.g., cumsum)
42143ec is described below
commit 42143ec14618332cce9ae44129829d3bfff86762
Author: Olga <[email protected]>
AuthorDate: Mon Apr 5 16:24:17 2021 +0200
[SYSTEMDS-2921] Federated cumulative aggregates (e.g., cumsum)
Closes #1216.
---
.../controlprogram/federated/FederationUtils.java | 36 ++--
.../runtime/instructions/InstructionUtils.java | 16 +-
.../runtime/instructions/fed/FEDInstruction.java | 1 +
.../instructions/fed/FEDInstructionUtils.java | 3 +-
.../fed/UnaryMatrixFEDInstruction.java | 218 +++++++++++++++++++--
.../sysds/runtime/matrix/data/MatrixBlock.java | 4 +-
.../primitives/FederatedFullCumulativeTest.java | 211 ++++++++++++++++++++
.../federated/cumulative/FederatedCummaxTest.dml | 33 ++++
.../cumulative/FederatedCummaxTestReference.dml | 26 +++
.../federated/cumulative/FederatedCumminTest.dml | 33 ++++
.../cumulative/FederatedCumminTestReference.dml | 26 +++
.../federated/cumulative/FederatedCumprodTest.dml | 33 ++++
.../cumulative/FederatedCumprodTestReference.dml | 26 +++
.../federated/cumulative/FederatedCumsumTest.dml | 33 ++++
.../cumulative/FederatedCumsumTestReference.dml | 26 +++
.../cumulative/FederatedCumsumprodTest.dml | 32 +++
.../FederatedCumsumprodTestReference.dml | 25 +++
17 files changed, 751 insertions(+), 31 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 31a7136..dc2fae5 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -66,20 +66,7 @@ public class FederationUtils {
}
public static FederatedRequest callInstruction(String inst, CPOperand
varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
- //TODO better and safe replacement of operand names -->
instruction utils
- long id = getNextFedDataID();
- String linst = inst.replace(ExecType.SPARK.name(),
ExecType.CP.name());
- linst = linst.replace(
-
Lop.OPERAND_DELIMITOR+varOldOut.getName()+Lop.DATATYPE_PREFIX,
-
Lop.OPERAND_DELIMITOR+String.valueOf(id)+Lop.DATATYPE_PREFIX);
- for(int i=0; i<varOldIn.length; i++)
- if( varOldIn[i] != null ) {
- linst = linst.replace(
-
Lop.OPERAND_DELIMITOR+varOldIn[i].getName()+Lop.DATATYPE_PREFIX,
-
Lop.OPERAND_DELIMITOR+String.valueOf(varNewIn[i])+Lop.DATATYPE_PREFIX);
- linst =
linst.replace("="+varOldIn[i].getName(), "="+String.valueOf(varNewIn[i]));
//parameterized
- }
- return new FederatedRequest(RequestType.EXEC_INST, id, linst);
+ return callInstruction(inst, varOldOut, getNextFedDataID(),
varOldIn, varNewIn);
}
public static FederatedRequest[] callInstruction(String[] inst,
CPOperand varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
@@ -89,10 +76,14 @@ public class FederationUtils {
for(int j=0; j<inst.length; j++) {
for(int i = 0; i < varOldIn.length; i++) {
linst[j] =
linst[j].replace(ExecType.SPARK.name(), ExecType.CP.name());
- linst[j] =
linst[j].replace(Lop.OPERAND_DELIMITOR + varOldOut.getName() +
Lop.DATATYPE_PREFIX, Lop.OPERAND_DELIMITOR + String.valueOf(id) +
Lop.DATATYPE_PREFIX);
+ linst[j] = linst[j].replace(
+ Lop.OPERAND_DELIMITOR +
varOldOut.getName() + Lop.DATATYPE_PREFIX,
+ Lop.OPERAND_DELIMITOR +
String.valueOf(id) + Lop.DATATYPE_PREFIX);
if(varOldIn[i] != null) {
- linst[j] =
linst[j].replace(Lop.OPERAND_DELIMITOR + varOldIn[i].getName() +
Lop.DATATYPE_PREFIX, Lop.OPERAND_DELIMITOR + String.valueOf(varNewIn[i]) +
Lop.DATATYPE_PREFIX);
+ linst[j] = linst[j].replace(
+ Lop.OPERAND_DELIMITOR +
varOldIn[i].getName() + Lop.DATATYPE_PREFIX,
+ Lop.OPERAND_DELIMITOR +
String.valueOf(varNewIn[i]) + Lop.DATATYPE_PREFIX);
linst[j] = linst[j].replace("=" +
varOldIn[i].getName(), "=" + String.valueOf(varNewIn[i])); //parameterized
}
}
@@ -101,6 +92,19 @@ public class FederationUtils {
return fr;
}
+ public static FederatedRequest callInstruction(String inst, CPOperand
varOldOut, long outputId, CPOperand[] varOldIn, long[] varNewIn) {
+ String linst = InstructionUtils.replaceOperand(inst, 0,
ExecType.CP.name());
+ linst =
linst.replace(Lop.OPERAND_DELIMITOR+varOldOut.getName()+Lop.DATATYPE_PREFIX,
Lop.OPERAND_DELIMITOR+outputId+Lop.DATATYPE_PREFIX);
+ for(int i=0; i<varOldIn.length; i++)
+ if( varOldIn[i] != null ) {
+ linst = linst.replace(
+
Lop.OPERAND_DELIMITOR+varOldIn[i].getName()+Lop.DATATYPE_PREFIX,
+
Lop.OPERAND_DELIMITOR+(varNewIn[i])+Lop.DATATYPE_PREFIX);
+ linst =
linst.replace("="+varOldIn[i].getName(), "="+(varNewIn[i])); //parameterized
+ }
+ return new FederatedRequest(RequestType.EXEC_INST, outputId,
linst);
+ }
+
public static MatrixBlock aggAdd(Future<FederatedResponse>[] ffr) {
try {
SimpleOperator op = new
SimpleOperator(Plus.getPlusFnObject());
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 9245132..77b53f9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -1000,7 +1000,11 @@ public class InstructionUtils
public static String createLiteralOperand(String val, ValueType vt) {
return InstructionUtils.concatOperandParts(val,
DataType.SCALAR.name(), vt.name(), "true");
}
-
+
+ public static String createOperand(CPOperand operand) {
+ return InstructionUtils.concatOperandParts(operand.getName(),
operand.getDataType().name(), operand.getValueType().name());
+ }
+
public static String replaceOperand(String instStr, int operand, String
newValue) {
//split instruction and check for correctness
String[] parts = instStr.split(Lop.OPERAND_DELIMITOR);
@@ -1048,4 +1052,14 @@ public class InstructionUtils
sb.append(inputs[i]);
return sb.toString();
}
+
+ public static String constructTernaryString(String instString,
CPOperand op1, CPOperand op2, CPOperand op3, CPOperand out) {
+ return concatOperands(constructBinaryInstString(instString,
"ifelse", op1, op2, op3), createOperand(out));
+ }
+
+ public static String constructBinaryInstString(String instString,
String opcode, CPOperand op1, CPOperand op2, CPOperand out) {
+ String[] parts = instString.split(Lop.OPERAND_DELIMITOR);
+ parts[1] = opcode;
+ return InstructionUtils.concatOperands(parts[0], parts[1],
createOperand(op1), createOperand(op2), createOperand(out));
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
index 8f58a8b..1d3c54c 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
@@ -32,6 +32,7 @@ public abstract class FEDInstruction extends Instruction {
AggregateTernary,
Append,
Binary,
+ CumulativeAggregate,
Init,
MultiReturnParameterizedBuiltin,
MMChain,
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 214023f..7439b6f 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -127,7 +127,8 @@ public class FEDInstructionUtils {
((AggregateUnaryCPInstruction)
instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
fedinst =
AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
else if(inst instanceof
UnaryMatrixCPInstruction && mo1.isFederated()) {
-
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()))
+
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
+
!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
fedinst =
UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java
index c6e0b2c..1f5cdd2 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java
@@ -19,17 +19,23 @@
package org.apache.sysds.runtime.instructions.fed;
+import java.util.concurrent.Future;
+
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.ValueFunction;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.matrix.data.LibCommonsMath;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
@@ -39,8 +45,7 @@ public class UnaryMatrixFEDInstruction extends
UnaryFEDInstruction {
}
public static boolean isValidOpcode(String opcode) {
- return !LibCommonsMath.isSupportedUnaryOperation(opcode)
- && !opcode.startsWith("ucum"); //ucumk+ ucum* ucumk+*
ucummin ucummax
+ return !LibCommonsMath.isSupportedUnaryOperation(opcode);
}
public static UnaryMatrixFEDInstruction parseInstruction(String str) {
@@ -50,7 +55,7 @@ public class UnaryMatrixFEDInstruction extends
UnaryFEDInstruction {
String[] parts =
InstructionUtils.getInstructionPartsWithValueType(str);
String opcode;
opcode = parts[0];
- if( opcode.equalsIgnoreCase("exp") && parts.length == 5) {
+ if( (opcode.equalsIgnoreCase("exp") ||
opcode.startsWith("ucum")) && parts.length == 5) {
in.split(parts[1]);
out.split(parts[2]);
ValueFunction func = Builtin.getBuiltinFnObject(opcode);
@@ -64,16 +69,207 @@ public class UnaryMatrixFEDInstruction extends
UnaryFEDInstruction {
@Override
public void processInstruction(ExecutionContext ec) {
MatrixObject mo1 = ec.getMatrixObject(input1);
+ if(getOpcode().startsWith("ucum") &&
mo1.isFederated(FederationMap.FType.ROW))
+ processCumulativeInstruction(ec, mo1);
+ else {
+ //federated execution on arbitrary row/column partitions
+ //(only assumption for sparse-unsafe: fed mapping
covers entire matrix)
+ FederatedRequest fr1 =
FederationUtils.callInstruction(instString, output,
+ new CPOperand[] {input1}, new long[]
{mo1.getFedMapping().getID()});
+ mo1.getFedMapping().execute(getTID(), true, fr1);
+
+ setOutputFedMapping(ec, mo1, fr1.getID());
+ }
+ }
+
+ public void processCumulativeInstruction(ExecutionContext ec,
MatrixObject mo1) {
+ String opcode = getOpcode();
+ MatrixObject out;
+ if(opcode.equalsIgnoreCase("ucumk+*")) {
+ FederatedRequest fr1 =
FederationUtils.callInstruction(instString, output,
+ new CPOperand[] {input1}, new long[]
{mo1.getFedMapping().getID()});
+ FederatedRequest fr2 = new
FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr1.getID());
+ Future<FederatedResponse>[] tmp =
mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+ out = setOutputFedMapping(ec, mo1, fr1.getID());
+
+ MatrixBlock scalingValues = getScalars(mo1, tmp);
+ setScalingValues(ec, mo1, out, scalingValues);
+ }
+ else {
+ String colAgg = opcode.replace("ucum", "uac");
+ String agg2 = opcode.replace(opcode.contains("ucumk")?
"ucumk" :"ucum", "");
+
+ double init = opcode.equalsIgnoreCase("ucumk+") ? 0.0:
+ opcode.equalsIgnoreCase("ucum*") ? 1.0 :
+ opcode.equalsIgnoreCase("ucummin") ?
Double.MAX_VALUE : -Double.MAX_VALUE;
+
+ Future<FederatedResponse>[] tmp =
modifyAndGetInstruction(colAgg, mo1);
+ MatrixBlock scalingValues = getResultBlock(tmp,
(int)mo1.getNumColumns(), opcode, init);
+
+ out = ec.getMatrixObject(output);
+ setScalingValues(agg2, ec, mo1, out, scalingValues,
init);
+ }
+ processCumulative(out);
+ }
+
+ private Future<FederatedResponse>[] modifyAndGetInstruction(String
newInst, MatrixObject mo1) {
+ String modifiedInstString =
InstructionUtils.replaceOperand(instString, 1, newInst);
+
+ FederatedRequest fr1 =
FederationUtils.callInstruction(modifiedInstString, output,
+ new CPOperand[] {input1}, new long[]
{mo1.getFedMapping().getID()});
+ FederatedRequest fr2 = new
FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr1.getID());
+ return mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+ }
+
+ private void processCumulative(MatrixObject out) {
+ String modifiedInstString =
InstructionUtils.replaceOperand(instString, 2,
InstructionUtils.createOperand(output));
+
+ FederatedRequest fr4 =
FederationUtils.callInstruction(modifiedInstString, output,
out.getFedMapping().getID(),
+ new CPOperand[] {output}, new long[]
{out.getFedMapping().getID()});
+ out.getFedMapping().execute(getTID(), true, fr4);
+
+
out.setFedMapping(out.getFedMapping().copyWithNewID(fr4.getID()));
+
+ // modify fed ranges since ucumk+* output is always nx1
+ if(getOpcode().equalsIgnoreCase("ucumk+*")) {
+ out.getDataCharacteristics().set(out.getNumRows(), 1L,
(int) out.getBlocksize());
+ for(int i = 0; i <
out.getFedMapping().getFederatedRanges().length; i++)
+
out.getFedMapping().getFederatedRanges()[i].setEndDim(1, 1);
+ } else {
+ out.getDataCharacteristics().set(out.getNumRows(),
out.getNumColumns(), (int) out.getBlocksize());
+ }
+ }
+
+ private static MatrixBlock getResultBlock(Future<FederatedResponse>[]
tmp, int cols, String opcode, double init) {
+ //TODO perf simple rbind, as the first row (init) is anyway not
transferred
- //federated execution on arbitrary row/column partitions
- //(only assumption for sparse-unsafe: fed mapping covers entire
matrix)
- FederatedRequest fr1 =
FederationUtils.callInstruction(instString, output,
- new CPOperand[]{input1}, new
long[]{mo1.getFedMapping().getID()});
- mo1.getFedMapping().execute(getTID(), true, fr1);
+ //collect row vectors into local matrix
+ MatrixBlock res = new MatrixBlock(tmp.length, cols, init);
+ for(int i = 0; i < tmp.length-1; i++)
+ try {
+ res.copy(i+1, i+1, 0, cols-1, ((MatrixBlock)
tmp[i].get().getData()[0]), true);
+ }
+ catch(Exception e) {
+ throw new DMLRuntimeException("Federated Get
data failed with exception on UnaryMatrixFEDInstruction", e);
+ }
+
+ //local cumulative aggregate
+ return res.unaryOperations(
+ new UnaryOperator(Builtin.getBuiltinFnObject(opcode)),
+ new MatrixBlock());
+ }
+
+ private MatrixBlock getScalars(MatrixObject mo1,
Future<FederatedResponse>[] tmp) {
+ MatrixBlock[] aggRes = getAggMatrices(mo1);
+ MatrixBlock prod = aggRes[0];
+ MatrixBlock firstValues = aggRes[1];
+ for(int i = 0; i < tmp.length; i++)
+ try {
+ MatrixBlock curr = ((MatrixBlock)
tmp[i].get().getData()[0]);
+ prod.setValue(i, 0,
curr.getValue(curr.getNumRows()-1, 0));
+ }
+ catch(Exception e) {
+ throw new DMLRuntimeException("Federated Get
data failed with exception on UnaryMatrixFEDInstruction", e);
+ }
+
+ // aggregate sumprod to get scalars
+ MatrixBlock a = new MatrixBlock(tmp.length, 1, 0.0);
+ a.copy(1, a.getNumRows()-1, 0, 0,
+ prod.unaryOperations(new
UnaryOperator(Builtin.getBuiltinFnObject("ucumk+*")), new MatrixBlock())
+ .slice(0, prod.getNumRows()-2), true);
+
+ // compute B11 = B11 + B12 ⊙ a
+ MatrixBlock B = firstValues.slice(0,
firstValues.getNumRows()-1,1, 1)
+
.binaryOperations(InstructionUtils.parseBinaryOperator("*"), a, new
MatrixBlock());
+ return
B.binaryOperationsInPlace(InstructionUtils.parseBinaryOperator("+"),
firstValues.slice(0,firstValues.getNumRows()-1,0,0));
+ }
+
+ private MatrixBlock[] getAggMatrices(MatrixObject mo1) {
+ Future<FederatedResponse>[] tmp =
modifyAndGetInstruction("ucum*", mo1);
+
+ // slice and return prod and first value
+ MatrixBlock prod = new MatrixBlock(tmp.length, 2, 0.0);
+ MatrixBlock firstValues = new MatrixBlock(tmp.length, 2, 0.0);
+ for(int i = 0; i < tmp.length; i++)
+ try {
+ MatrixBlock curr = ((MatrixBlock)
tmp[i].get().getData()[0]);
+ prod.setValue(i, 1,
curr.getValue(curr.getNumRows()-1, 1));
+ firstValues.copy(i, i, 0,1, curr.slice(0, 0),
true);
+ }
+ catch(Exception e) {
+ throw new DMLRuntimeException("Federated Get
data failed with exception on UnaryMatrixFEDInstruction", e);
+ }
+ return new MatrixBlock[] {prod, firstValues};
+ }
+
+ private void setScalingValues(ExecutionContext ec, MatrixObject mo1,
MatrixObject out, MatrixBlock scalingValues) {
+ MatrixBlock condition = new MatrixBlock((int) mo1.getNumRows(),
(int) mo1.getNumColumns(), 1.0);
+ MatrixBlock mb2 = new MatrixBlock((int) mo1.getNumRows(), (int)
mo1.getNumColumns(), 0.0);
+
+ for(int i = 0; i < scalingValues.getNumRows()-1; i++) {
+ int step = (int)
mo1.getFedMapping().getFederatedRanges()[i + 1].getBeginDims()[0];
+ condition.setValue(step, 0, 0.0);
+ mb2.setValue(step, 0, scalingValues.getValue(i + 1, 0));
+ }
+
+ MatrixObject cond =
ExecutionContext.createMatrixObject(condition);
+ long condID = FederationUtils.getNextFedDataID();
+ ec.setVariable(String.valueOf(condID), cond);
+
+ MatrixObject mo2 = ExecutionContext.createMatrixObject(mb2);
+ long varID2 = FederationUtils.getNextFedDataID();
+ ec.setVariable(String.valueOf(varID2), mo2);
+
+ CPOperand opCond = new CPOperand(String.valueOf(condID),
ValueType.FP64, DataType.MATRIX);
+ CPOperand op2 = new CPOperand(String.valueOf(varID2),
ValueType.FP64, DataType.MATRIX);
+
+ String ternaryInstString =
InstructionUtils.constructTernaryString(instString, opCond, input1, op2,
output);
+
+ FederatedRequest[] fr1 =
mo1.getFedMapping().broadcastSliced(cond, false);
+ FederatedRequest[] fr2 =
mo1.getFedMapping().broadcastSliced(mo2, false);
+ FederatedRequest fr3 =
FederationUtils.callInstruction(ternaryInstString, output,
+ new CPOperand[] {input1, opCond, op2}, new long[]
{mo1.getFedMapping().getID(), fr1[0].getID(), fr2[0].getID()});
+ //TODO perf no need to execute here, we can piggyback the
requests onto the final cumagg
+ mo1.getFedMapping().execute(getTID(), true, fr1, fr2, fr3);
+
+
out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr3.getID()));
+
+ ec.removeVariable(opCond.getName());
+ ec.removeVariable(op2.getName());
+ }
+
+ private void setScalingValues(String opcode, ExecutionContext ec,
MatrixObject mo1, MatrixObject out, MatrixBlock scalingValues, double init) {
+ //TODO perf improvement (currently this creates a sliced
broadcast in the size of the original matrix
+ //but sparse w/ strategically placed offsets, but would need to
be dense for dense prod/cumsum)
- //set characteristics and fed mapp
+ //allocated large matrix of init value and placed offset rows
in first row of every partition
+ MatrixBlock mb2 = new MatrixBlock((int) mo1.getNumRows(), (int)
mo1.getNumColumns(), init);
+ for(int i = 1; i < scalingValues.getNumRows(); i++) {
+ int step = (int)
mo1.getFedMapping().getFederatedRanges()[i].getBeginDims()[0];
+ mb2.copy(step, step, 0, (int)(mo1.getNumColumns()-1),
scalingValues.slice(i, i), true);
+ }
+
+ MatrixObject mo2 = ExecutionContext.createMatrixObject(mb2);
+ long varID2 = FederationUtils.getNextFedDataID();
+ ec.setVariable(String.valueOf(varID2), mo2);
+ CPOperand op2 = new CPOperand(String.valueOf(varID2),
ValueType.FP64, DataType.MATRIX);
+
+ String modifiedInstString =
InstructionUtils.constructBinaryInstString(instString, opcode, input1, op2,
output);
+
+ FederatedRequest[] fr1 =
mo1.getFedMapping().broadcastSliced(mo2, false);
+ FederatedRequest fr2 =
FederationUtils.callInstruction(modifiedInstString, output,
+ new CPOperand[] {input1, op2}, new long[]
{mo1.getFedMapping().getID(), fr1[0].getID()});
+ mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+
+
out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr2.getID()));
+
+ ec.removeVariable(op2.getName());
+ }
+
+ private MatrixObject setOutputFedMapping(ExecutionContext ec,
MatrixObject fedMapObj, long fedOutputID) {
MatrixObject out = ec.getMatrixObject(output);
- out.getDataCharacteristics().set(mo1.getNumRows(),
mo1.getNumColumns(), (int)mo1.getBlocksize());
-
out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr1.getID()));
+
out.getDataCharacteristics().set(fedMapObj.getDataCharacteristics());
+
out.setFedMapping(fedMapObj.getFedMapping().copyWithNewID(fedOutputID));
+ return out;
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 615b28d..efd86a3 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -1466,9 +1466,9 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
* only done if 'awareDestNZ=true',
*
* @param rl row lower index, 0-based
- * @param ru row upper index, 0-based
+ * @param ru row upper index, 0-based, inclusive
* @param cl column lower index, 0-based
- * @param cu column upper index, 0-based
+ * @param cu column upper index, 0-based, inclusive
* @param src matrix block
* @param awareDestNZ
* true, forces (1) to remove existing non-zeros in the index
range of the
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullCumulativeTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullCumulativeTest.java
new file mode 100644
index 0000000..336f45e
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullCumulativeTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.lops.LopProperties.ExecType;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
[email protected]
+public class FederatedFullCumulativeTest extends AutomatedTestBase {
+ private final static String TEST_NAME1 = "FederatedCumsumTest";
+ private final static String TEST_NAME2 = "FederatedCumprodTest";
+ private final static String TEST_NAME3 = "FederatedCummaxTest";
+ private final static String TEST_NAME4 = "FederatedCumminTest";
+ private final static String TEST_NAME5 = "FederatedCumsumprodTest";
+
+ private final static String TEST_DIR =
"functions/federated/cumulative/";
+ private static final String TEST_CLASS_DIR = TEST_DIR +
FederatedFullCumulativeTest.class.getSimpleName() + "/";
+
+ private final static int blocksize = 1024;
+ @Parameterized.Parameter()
+ public int rows;
+ @Parameterized.Parameter(1)
+ public int cols;
+ @Parameterized.Parameter(2)
+ public boolean rowPartitioned;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][] {
+ {240, 4, true},
+ {240, 4, false},
+ });
+ }
+
+ private enum OpType {
+ SUM, PROD, SUMPROD, MAX, MIN
+ }
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME1, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME2, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME3, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME4, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME5, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S"}));
+ }
+
+ @Test
+ public void testSumDenseMatrixCP() { runCumOperationTest(OpType.SUM,
ExecType.CP); }
+
+// FIXME offset handling has some remaining issues
+// @Test
+// public void testProdDenseMatrixCP() {
+// runCumOperationTest(OpType.PROD, ExecType.CP);
+// }
+
+ @Test
+ public void testMaxDenseMatrixCP() {
+ runCumOperationTest(OpType.MAX, ExecType.CP);
+ }
+
+ @Test
+ public void testMinDenseMatrixCP() {
+ runCumOperationTest(OpType.MIN, ExecType.CP);
+ }
+
+// FIXME offset handling has some remaining issues
+// @Test
+// public void testSumprodDenseMatrixCP() {
+// runCumOperationTest(OpType.SUMPROD, ExecType.CP);
+// }
+
+ private void runCumOperationTest(OpType type, ExecType instType) {
+ ExecMode platformOld = setExecMode(instType);
+
+ String TEST_NAME = null;
+ switch(type) {
+ case SUM:
+ TEST_NAME = TEST_NAME1;
+ break;
+ case PROD:
+ TEST_NAME = TEST_NAME2;
+ break;
+ case MAX:
+ TEST_NAME = TEST_NAME3;
+ break;
+ case MIN:
+ TEST_NAME = TEST_NAME4;
+ break;
+ case SUMPROD:
+ TEST_NAME = TEST_NAME5;
+ break;
+ }
+
+ getAndLoadTestConfiguration(TEST_NAME);
+ String HOME = SCRIPT_DIR + TEST_DIR;
+
+ // write input matrices
+ int r = rows;
+ int c = cols / 4;
+ if(rowPartitioned) {
+ r = rows / 4;
+ c = cols;
+ }
+
+ double[][] X1 = getRandomMatrix(r, c, 1, 3, 1, 3);
+ double[][] X2 = getRandomMatrix(r, c, 1, 3, 1, 7);
+ double[][] X3 = getRandomMatrix(r, c, 1, 3, 1, 8);
+ double[][] X4 = getRandomMatrix(r, c, 1, 3, 1, 9);
+
+ MatrixCharacteristics mc = new MatrixCharacteristics(r, c,
blocksize, r * c);
+ writeInputMatrixWithMTD("X1", X1, false, mc);
+ writeInputMatrixWithMTD("X2", X2, false, mc);
+ writeInputMatrixWithMTD("X3", X3, false, mc);
+ writeInputMatrixWithMTD("X4", X4, false, mc);
+
+ // empty script name because we don't execute any script, just
start the worker
+ fullDMLScriptName = "";
+ int port1 = getRandomAvailablePort();
+ int port2 = getRandomAvailablePort();
+ int port3 = getRandomAvailablePort();
+ int port4 = getRandomAvailablePort();
+ Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+ Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+ Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
+ Thread t4 = startLocalFedWorkerThread(port4);
+
+ TestConfiguration config =
availableTestConfigurations.get(TEST_NAME);
+ loadTestConfiguration(config);
+
+ // Run reference dml script with normal matrix
+ fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+ programArgs = new String[] {"-stats", "100", "-args",
input("X1"), input("X2"), input("X3"), input("X4"),
+ expected("S"),
Boolean.toString(rowPartitioned).toUpperCase()};
+ runTest(true, false, null, -1);
+
+ // Run actual dml script with federated matrix
+
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[] {"-stats", "100", "-nvargs",
+ "in_X1=" + TestUtils.federatedAddress(port1,
input("X1")),
+ "in_X2=" + TestUtils.federatedAddress(port2,
input("X2")),
+ "in_X3=" + TestUtils.federatedAddress(port3,
input("X3")),
+ "in_X4=" + TestUtils.federatedAddress(port4,
input("X4")), "rows=" + rows, "cols=" + cols,
+ "rP=" + Boolean.toString(rowPartitioned).toUpperCase(),
"out_S=" + output("S")};
+
+ runTest(true, false, null, -1);
+
+ // compare via files
+ compareResults(1e-6);
+
+ switch(type) {
+ case SUM:
+
Assert.assertTrue(heavyHittersContainsString("fed_ucumk+"));
+ break;
+ case PROD:
+
Assert.assertTrue(heavyHittersContainsString("fed_ucum*"));
+ break;
+ case MAX:
+
Assert.assertTrue(heavyHittersContainsString("fed_ucummax"));
+ break;
+ case MIN:
+
Assert.assertTrue(heavyHittersContainsString("fed_ucummin"));
+ break;
+ case SUMPROD:
+
Assert.assertTrue(heavyHittersContainsString("ucumk+*"));
+ break;
+ }
+
+ // check that federated input files are still existing
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+ TestUtils.shutdownThreads(t1, t2, t3, t4);
+ resetExecMode(platformOld);
+ }
+}
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCummaxTest.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTest.dml
new file mode 100644
index 0000000..9f0f40c
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0),
list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0),
list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4),
list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0,
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cummax(A);
+write(s, $out_S);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCummaxTestReference.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTestReference.dml
new file mode 100644
index 0000000..558c843
--- /dev/null
+++
b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cummax(A);
+write(s, $5);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumminTest.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumminTest.dml
new file mode 100644
index 0000000..f088e6b
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCumminTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0),
list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0),
list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4),
list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0,
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cummin(A);
+write(s, $out_S);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumminTestReference.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumminTestReference.dml
new file mode 100644
index 0000000..e65f584
--- /dev/null
+++
b/src/test/scripts/functions/federated/cumulative/FederatedCumminTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cummin(A);
+write(s, $5);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumprodTest.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTest.dml
new file mode 100644
index 0000000..167cbbf
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0),
list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0),
list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4),
list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0,
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cumprod(A);
+write(s, $out_S);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumprodTestReference.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTestReference.dml
new file mode 100644
index 0000000..a6c7311
--- /dev/null
+++
b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cumprod(A);
+write(s, $5);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumTest.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTest.dml
new file mode 100644
index 0000000..757a9d7
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0),
list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0),
list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4),
list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0,
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cumsum(A);
+write(s, $out_S);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumTestReference.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTestReference.dml
new file mode 100644
index 0000000..c496905
--- /dev/null
+++
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cumsum(A);
+write(s, $5);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTest.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTest.dml
new file mode 100644
index 0000000..43978d9
--- /dev/null
+++
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTest.dml
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0),
list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0),
list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4),
list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0,
3*($cols/4)), list($rows, $cols)));
+}
+s = cumsumprod(A[,1:2]);
+write(s, $out_S);
diff --git
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTestReference.dml
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTestReference.dml
new file mode 100644
index 0000000..5c73b14
--- /dev/null
+++
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTestReference.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+s = cumsumprod(A[,1:2]);
+write(s, $5);