This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 3f166c03bcebce7c95a0d4fb82c0f526939f4fc1 Author: Sebastian Baunsgaard <[email protected]> AuthorDate: Thu Apr 4 17:18:24 2024 +0200 [SYSTEMDS-3685] FFT parallel, including other builtin function calls This commit enables the compile-time propagation of the parallelization degree to the new FFT instructions. --- .../java/org/apache/sysds/hops/FunctionOp.java | 21 +++- src/main/java/org/apache/sysds/hops/Hop.java | 8 +- src/main/java/org/apache/sysds/hops/UnaryOp.java | 2 +- .../java/org/apache/sysds/lops/Compression.java | 10 +- .../java/org/apache/sysds/lops/FunctionCallCP.java | 48 +++++--- .../cp/AggregateUnaryCPInstruction.java | 6 +- .../instructions/cp/CompressionCPInstruction.java | 45 ++++--- .../runtime/instructions/cp/DnnCPInstruction.java | 13 +- .../cp/MultiReturnBuiltinCPInstruction.java | 68 ++++++----- ...ltiReturnComplexMatrixBuiltinCPInstruction.java | 44 ++++--- .../sysds/runtime/matrix/data/LibCommonsMath.java | 133 +++++++++++---------- .../runtime/matrix/data/LibMatrixFourier.java | 100 +++++++++++----- .../python/systemds/operator/algorithm/__init__.py | 2 + .../operator/algorithm/builtin/pageRank.py | 55 +++++++++ src/main/python/tests/lineage/test_lineagetrace.py | 36 ++++-- .../applications/ScalableDecompositionTest.java | 4 +- .../sysds/test/component/matrix/FourierTest.java | 94 +++++++++------ .../scripts/functions/builtin/GridSearchLMCV.dml | 3 +- 18 files changed, 449 insertions(+), 243 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/FunctionOp.java b/src/main/java/org/apache/sysds/hops/FunctionOp.java index 95b5411500..7f424d36d0 100644 --- a/src/main/java/org/apache/sysds/hops/FunctionOp.java +++ b/src/main/java/org/apache/sysds/hops/FunctionOp.java @@ -42,7 +42,7 @@ import org.apache.sysds.runtime.meta.DataCharacteristics; * Note: Currently, we support expressions in function arguments along with function calls * in expressions with single outputs, leaving multiple outputs handling as it is. 
*/ -public class FunctionOp extends Hop +public class FunctionOp extends MultiThreadedHop { public enum FunctionType{ DML, @@ -342,7 +342,14 @@ public class FunctionOp extends Hop tmp.add( in.constructLops() ); //construct function call - FunctionCallCP fcall = new FunctionCallCP(tmp, _fnamespace, _fname, _inputNames, _outputNames, _outputHops, _opt, et); + final FunctionCallCP fcall; + if(isMultiThreadedOpType()) { + fcall = new FunctionCallCP(tmp, _fnamespace, _fname, _inputNames, _outputNames, _outputHops, _opt, et, + OptimizerUtils.getConstrainedNumThreads(_maxNumThreads)); + } + else { + fcall = new FunctionCallCP(tmp, _fnamespace, _fname, _inputNames, _outputNames, _outputHops, _opt, et); + } setLineNumbers(fcall); setLops(fcall); @@ -358,13 +365,14 @@ public class FunctionOp extends Hop // Lop matrixOut = lop.getFunctionOutputs().get(0); Lop compressionInstruction = null; + final int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); if(_compressedWorkloadTree != null) { SingletonLookupHashMap m = SingletonLookupHashMap.getMap(); int singletonID = m.put(_compressedWorkloadTree); - compressionInstruction = new Compression(getLops(), DataType.MATRIX, ValueType.FP64, et, singletonID); + compressionInstruction = new Compression(getLops(), DataType.MATRIX, ValueType.FP64, et, singletonID, k); } else - compressionInstruction = new Compression(getLops(), DataType.MATRIX, ValueType.FP64, et, 0); + compressionInstruction = new Compression(getLops(), DataType.MATRIX, ValueType.FP64, et, 0, k); setOutputDimensions( compressionInstruction ); @@ -427,6 +435,11 @@ public class FunctionOp extends Hop public void refreshSizeInformation() { //do nothing } + + @Override + public boolean isMultiThreadedOpType() { + return isBuiltinFunction(); + } @Override @SuppressWarnings("unchecked") diff --git a/src/main/java/org/apache/sysds/hops/Hop.java b/src/main/java/org/apache/sysds/hops/Hop.java index 127fe7e145..93501efa0d 100644 --- 
a/src/main/java/org/apache/sysds/hops/Hop.java +++ b/src/main/java/org/apache/sysds/hops/Hop.java @@ -502,15 +502,17 @@ public abstract class Hop implements ParseInfo { ExecType et = getExecutionModeForCompression(); Lop compressionInstruction = null; - + + //TODO generalize threads + final int k = OptimizerUtils.getConstrainedNumThreads(-1); if(requiresCompression()) { if(_compressedWorkloadTree != null) { SingletonLookupHashMap m = SingletonLookupHashMap.getMap(); int singletonID = m.put(_compressedWorkloadTree); - compressionInstruction = new Compression(getLops(), getDataType(), getValueType(), et, singletonID); + compressionInstruction = new Compression(getLops(), getDataType(), getValueType(), et, singletonID, k); } else - compressionInstruction = new Compression(getLops(), getDataType(), getValueType(), et, 0); + compressionInstruction = new Compression(getLops(), getDataType(), getValueType(), et, 0, k); } else if(_requiresDeCompression && et != ExecType.SPARK) // Disabled spark decompression instruction. 
compressionInstruction = new DeCompression(getLops(), getDataType(), getValueType(), et); diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java b/src/main/java/org/apache/sysds/hops/UnaryOp.java index f046ffe85c..9c0e280644 100644 --- a/src/main/java/org/apache/sysds/hops/UnaryOp.java +++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java @@ -137,7 +137,7 @@ public class UnaryOp extends MultiThreadedHop Lop ret = null; switch(_op){ case COMPRESS: - ret = new Compression(input.constructLops(), getDataType(), getValueType(), optFindExecType(), 0); + ret = new Compression(input.constructLops(), getDataType(), getValueType(), optFindExecType(), 0, k); break; case DECOMPRESS: ret = new DeCompression(input.constructLops(), getDataType(), getValueType(), optFindExecType()); diff --git a/src/main/java/org/apache/sysds/lops/Compression.java b/src/main/java/org/apache/sysds/lops/Compression.java index e636fc8b84..a2477d5fcc 100644 --- a/src/main/java/org/apache/sysds/lops/Compression.java +++ b/src/main/java/org/apache/sysds/lops/Compression.java @@ -28,6 +28,7 @@ public class Compression extends Lop { public static final String OPCODE = "compress"; private final int _singletonLookupKey; + private final int _numThreads; public enum CompressConfig { TRUE, FALSE, COST, AUTO, WORKLOAD; @@ -41,12 +42,13 @@ public class Compression extends Lop { } } - public Compression(Lop input, DataType dt, ValueType vt, ExecType et, int singletonLookupKey) { + public Compression(Lop input, DataType dt, ValueType vt, ExecType et, int singletonLookupKey, int numThreads) { super(Lop.Type.Checkpoint, dt, vt); addInput(input); input.addOutput(this); lps.setProperties(inputs, et); _singletonLookupKey = singletonLookupKey; + _numThreads = numThreads; } @Override @@ -80,6 +82,12 @@ public class Compression extends Lop { sb.append(OPERAND_DELIMITOR); sb.append(_singletonLookupKey); } + + if(getExecType().equals(ExecType.CP) || getExecType().equals(ExecType.FED)){ + 
sb.append(OPERAND_DELIMITOR); + sb.append(_numThreads); + } + return sb.toString(); } diff --git a/src/main/java/org/apache/sysds/lops/FunctionCallCP.java b/src/main/java/org/apache/sysds/lops/FunctionCallCP.java index b0694e2c36..1ed674281b 100644 --- a/src/main/java/org/apache/sysds/lops/FunctionCallCP.java +++ b/src/main/java/org/apache/sysds/lops/FunctionCallCP.java @@ -38,11 +38,18 @@ public class FunctionCallCP extends Lop private String[] _inputNames; private String[] _outputNames; private ArrayList<Lop> _outputLops = null; - private boolean _opt; + private final boolean _opt; + private final int _numThreads; public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] inputNames, String[] outputNames, ArrayList<Hop> outputHops, boolean opt, ExecType et) { - this(inputs, fnamespace, fname, inputNames, outputNames, et); + this(inputs, fnamespace, fname, inputNames, outputNames, outputHops, opt, et, 1); + + } + + public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] inputNames, + String[] outputNames, ArrayList<Hop> outputHops, boolean opt, ExecType et, int threads) { + this(inputs, fnamespace, fname, inputNames, outputNames, opt, et, threads); if(outputHops != null) { _outputLops = new ArrayList<>(); setLevel(); @@ -56,27 +63,33 @@ public class FunctionCallCP extends Lop } } } - _opt = opt; } - public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] inputNames, String[] outputNames, ExecType et) - { + public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] inputNames, + String[] outputNames, boolean opt, ExecType et) { + this(inputs, fnamespace, fname, inputNames, outputNames, opt, et, 1); + } + + public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] inputNames, + String[] outputNames, boolean opt, ExecType et, int threads) { super(Lop.Type.FunctionCallCP, DataType.UNKNOWN, ValueType.UNKNOWN); - //note: data 
scalar in order to prevent generation of redundant createvar, rmvar - + // note: data scalar in order to prevent generation of redundant createvar, rmvar + _fnamespace = fnamespace; _fname = fname; _inputNames = inputNames; _outputNames = outputNames; - - //wire inputs - for( Lop in : inputs ) { - addInput( in ); - in.addOutput( this ); + + // wire inputs + for(Lop in : inputs) { + addInput(in); + in.addOutput(this); } - - //lop properties: always in CP + + // lop properties: always in CP lps.setProperties(inputs, et); + _opt = opt; + _numThreads = threads; } public ArrayList<Lop> getFunctionOutputs() { @@ -116,6 +129,13 @@ public class FunctionCallCP extends Lop sb.append(Lop.OPERAND_DELIMITOR); sb.append(_outputNames[i]); } + + if(getExecType().equals(ExecType.CP)){ + if(!(_fname.toLowerCase().equals("remove"))){ + sb.append(Lop.OPERAND_DELIMITOR); + sb.append(_numThreads); + } + } return sb.toString(); } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java index 030fe5f5cf..1699c26cea 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java @@ -19,6 +19,8 @@ package org.apache.sysds.runtime.instructions.cp; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ExecMode; @@ -44,7 +46,7 @@ import org.apache.sysds.runtime.meta.DataCharacteristics; import org.apache.sysds.utils.Explain; public class AggregateUnaryCPInstruction extends UnaryCPInstruction { - // private static final Log LOG = LogFactory.getLog(AggregateUnaryCPInstruction.class.getName()); + protected static final Log LOG = 
LogFactory.getLog(AggregateUnaryCPInstruction.class.getName()); public enum AUType { NROW, NCOL, LENGTH, EXISTS, LINEAGE, @@ -118,7 +120,7 @@ public class AggregateUnaryCPInstruction extends UnaryCPInstruction { public void processInstruction( ExecutionContext ec ) { String outputName = output.getName(); String opcode = getOpcode(); - + switch( _type ) { case NROW: case NCOL: diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java index c9dd5c8961..2e87d9eb65 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java @@ -43,43 +43,51 @@ public class CompressionCPInstruction extends ComputationCPInstruction { private static final Log LOG = LogFactory.getLog(CompressionCPInstruction.class.getName()); private final int _singletonLookupID; + private final int _numThreads; - /** This is only for binned compression with 2 outputs*/ + /** This is only for binned compression with 2 outputs */ protected final List<CPOperand> _outputs; private CompressionCPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String istr, - int singletonLookupID) { + int singletonLookupID, int numThreads) { super(CPType.Compression, op, in, null, null, out, opcode, istr); _outputs = null; this._singletonLookupID = singletonLookupID; + this._numThreads = numThreads; } - private CompressionCPInstruction(Operator op, CPOperand in1, CPOperand in2, List<CPOperand> out, String opcode, String istr, - int singletonLookupID) { + private CompressionCPInstruction(Operator op, CPOperand in1, CPOperand in2, List<CPOperand> out, String opcode, + String istr, int singletonLookupID, int numThreads) { super(CPType.Compression, op, in1, in2, null, out.get(0), opcode, istr); _outputs = out; this._singletonLookupID = singletonLookupID; + 
this._numThreads = numThreads; } public static CompressionCPInstruction parseInstruction(String str) { - InstructionUtils.checkNumFields(str, 2, 3, 4); + InstructionUtils.checkNumFields(str, 3, 4, 5); String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); CPOperand out = new CPOperand(parts[2]); - if(parts.length == 5) { - /** Compression with bins that returns two outputs*/ + if(parts.length == 6) { + /** Compression with bins that returns two outputs */ List<CPOperand> outputs = new ArrayList<>(); outputs.add(new CPOperand(parts[3])); outputs.add(new CPOperand(parts[4])); - return new CompressionCPInstruction(null, in1, out, outputs, opcode, str, 0); + int numThreads = Integer.parseInt(parts[5]); + return new CompressionCPInstruction(null, in1, out, outputs, opcode, str, 0, numThreads); } - else if(parts.length == 4) { + else if(parts.length == 5) { int treeNodeID = Integer.parseInt(parts[3]); - return new CompressionCPInstruction(null, in1, out, opcode, str, treeNodeID); + int numThreads = Integer.parseInt(parts[4]); + return new CompressionCPInstruction(null, in1, out, opcode, str, treeNodeID, numThreads); + } + else { + + int numThreads = Integer.parseInt(parts[3]); + return new CompressionCPInstruction(null, in1, out, opcode, str, 0, numThreads); } - else - return new CompressionCPInstruction(null, in1, out, opcode, str, 0); } @Override @@ -101,12 +109,13 @@ public class CompressionCPInstruction extends ComputationCPInstruction { final MatrixBlock X = ec.getMatrixInput(input1.getName()); out = CLALibBinCompress.binCompress(X, d, k); ec.releaseMatrixInput(input1.getName()); - } else { + } + else { final FrameBlock X = ec.getFrameInput(input1.getName()); out = CLALibBinCompress.binCompress(X, d, k); ec.releaseFrameInput(input1.getName()); } - + // Set output and release input ec.releaseMatrixInput(input2.getName()); ec.setMatrixOutput(_outputs.get(0).getName(), out.getKey()); 
@@ -121,13 +130,11 @@ public class CompressionCPInstruction extends ComputationCPInstruction { final WTreeRoot root = (_singletonLookupID != 0) ? (WTreeRoot) m.get(_singletonLookupID) : null; m.removeKey(_singletonLookupID); - final int k = OptimizerUtils.getConstrainedNumThreads(-1); - if(ec.isFrameObject(input1.getName())) - processFrameBlockCompression(ec, ec.getFrameInput(input1.getName()), k, root); + processFrameBlockCompression(ec, ec.getFrameInput(input1.getName()), _numThreads, root); else if(ec.isMatrixObject(input1.getName())) - processMatrixBlockCompression(ec, ec.getMatrixInput(input1.getName()), k, root); - else{ + processMatrixBlockCompression(ec, ec.getMatrixInput(input1.getName()), _numThreads, root); + else { throw new NotImplementedException("Not supported other types of input for compression than frame and matrix"); } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/DnnCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/DnnCPInstruction.java index 9dc55078c9..2f8abbe89a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/DnnCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/DnnCPInstruction.java @@ -247,7 +247,7 @@ public class DnnCPInstruction extends UnaryCPInstruction { return new DnnCPInstruction(in, in2, out, opcode, str, k, Double.parseDouble(parts[5])); } else if (opcode.equalsIgnoreCase("batch_norm2d")) { - InstructionUtils.checkNumFields(parts, 13); + InstructionUtils.checkNumFields(parts, 14); CPOperand in1 = new CPOperand(parts[1]); // image CPOperand in2 = new CPOperand(parts[2]); // scale CPOperand in3 = new CPOperand(parts[3]); // bias @@ -261,10 +261,12 @@ public class DnnCPInstruction extends UnaryCPInstruction { CPOperand out3 = new CPOperand(parts[11]); // retRunningVar CPOperand out4 = new CPOperand(parts[12]); // resultSaveMean CPOperand out5 = new CPOperand(parts[13]); // resultSaveInvVariance + // int threads = 
Integer.parseInt(parts[14]); + return new DnnCPInstruction(in1, in2, in3, in4, in5, in6, in7, in8, out, out2, out3, out4, out5, opcode, str, 0); } else if (opcode.equalsIgnoreCase("batch_norm2d_backward")) { - InstructionUtils.checkNumFields(parts, 9); + InstructionUtils.checkNumFields(parts, 10); CPOperand in1 = new CPOperand(parts[1]); // image CPOperand in2 = new CPOperand(parts[2]); // dout CPOperand in3 = new CPOperand(parts[3]); // scale @@ -274,10 +276,11 @@ public class DnnCPInstruction extends UnaryCPInstruction { CPOperand out = new CPOperand(parts[7]); // dX CPOperand out2 = new CPOperand(parts[8]); // dScale CPOperand out3 = new CPOperand(parts[9]); // dBias + // int threads = Integer.parseInt(parts[10]); return new DnnCPInstruction(in1, in2, in3, in4, in5, in6, null, null, out, out2, out3, null, null, opcode, str, 0); } else if (opcode.equalsIgnoreCase("lstm")) { - InstructionUtils.checkNumFields(parts, 11); + InstructionUtils.checkNumFields(parts, 12); CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); CPOperand in3 = new CPOperand(parts[3]); @@ -289,9 +292,10 @@ public class DnnCPInstruction extends UnaryCPInstruction { CPOperand out3 = new CPOperand(parts[9]); CPOperand out4 = new CPOperand(parts[10]); CPOperand out5 = new CPOperand(parts[11]); + // int threads = Integer.parseInt(parts[12]); return new DnnCPInstruction(in1, in2, in3, in4, in5, in6, null, null, out1, out2, out3, out4, out5, opcode, str, 0); } if(opcode.equalsIgnoreCase("lstm_backward")){ - InstructionUtils.checkNumFields(parts, 16); + InstructionUtils.checkNumFields(parts, 17); CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); CPOperand in3 = new CPOperand(parts[3]); @@ -309,6 +313,7 @@ public class DnnCPInstruction extends UnaryCPInstruction { CPOperand out3 = new CPOperand(parts[14]); CPOperand out4 = new CPOperand(parts[15]); CPOperand out5 = new CPOperand(parts[16]); + // int threads = Integer.parseInt(parts[17]); 
return new DnnCPInstruction(in1, in2, in3, in4, in5, in6, in7, in8, in9, in10, in11, out1, out2, out3, out4, out5, opcode, str, 0); } else { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java index a65b56c8ac..e1e0420513 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java @@ -22,6 +22,7 @@ package org.apache.sysds.runtime.instructions.cp; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; @@ -37,11 +38,13 @@ import org.apache.sysds.runtime.matrix.operators.Operator; public class MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { protected ArrayList<CPOperand> _outputs; + protected int _numThreads; private MultiReturnBuiltinCPInstruction(Operator op, CPOperand input1, ArrayList<CPOperand> outputs, String opcode, - String istr) { + String istr, int threads) { super(CPType.MultiReturnBuiltin, op, input1, null, outputs.get(0), opcode, istr); _outputs = outputs; + _numThreads = threads; } public CPOperand getOutput(int i) { @@ -56,7 +59,7 @@ public class MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { return _outputs.parallelStream().map(output -> output.getName()).toArray(String[]::new); } - public static MultiReturnBuiltinCPInstruction parseInstruction ( String str ) { + public static MultiReturnBuiltinCPInstruction parseInstruction(String str ) { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); ArrayList<CPOperand> outputs = new ArrayList<>(); // first part is always the opcode @@ -67,8 +70,9 @@ public class 
MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { CPOperand in1 = new CPOperand(parts[1]); outputs.add ( new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX) ); outputs.add ( new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX) ); + int threads = Integer.parseInt(parts[4]); - return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str); + return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); } else if ( opcode.equalsIgnoreCase("lu") ) { CPOperand in1 = new CPOperand(parts[1]); @@ -77,8 +81,9 @@ public class MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { outputs.add ( new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX) ); outputs.add ( new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX) ); outputs.add ( new CPOperand(parts[4], ValueType.FP64, DataType.MATRIX) ); + int threads = Integer.parseInt(parts[5]); - return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str); + return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); } else if ( opcode.equalsIgnoreCase("eigen") ) { @@ -86,42 +91,45 @@ public class MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { CPOperand in1 = new CPOperand(parts[1]); outputs.add ( new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX) ); outputs.add ( new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX) ); - - return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str); - - } - else if(parts.length == 4 && opcode.equalsIgnoreCase("fft")) { - // one input and two outputs - CPOperand in1 = new CPOperand(parts[1]); - outputs.add(new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX)); - outputs.add(new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX)); - - return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str); - + int threads = Integer.parseInt(parts[4]); + return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str, 
threads); } - else if(parts.length == 3 && opcode.equalsIgnoreCase("fft")) { - // one input and two outputs - outputs.add(new CPOperand(parts[1], ValueType.FP64, DataType.MATRIX)); - outputs.add(new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX)); - - return new MultiReturnBuiltinCPInstruction(null, null, outputs, opcode, str); - + else if( opcode.equalsIgnoreCase("fft")){ + if(parts.length == 5) { + // one input and two outputs + CPOperand in1 = new CPOperand(parts[1]); + outputs.add(new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX)); + outputs.add(new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[4]); + return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); + } + else if(parts.length == 4) { + // one input and two outputs + outputs.add(new CPOperand(parts[1], ValueType.FP64, DataType.MATRIX)); + outputs.add(new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[3]); + return new MultiReturnBuiltinCPInstruction(null, null, outputs, opcode, str, threads); + } + else + throw new NotImplementedException("Invalid number of arguments for FFT."); } - else if(parts.length == 4 && opcode.equalsIgnoreCase("fft_linearized")) { + else if(parts.length == 5 && opcode.equalsIgnoreCase("fft_linearized")) { // one input and two outputs CPOperand in1 = new CPOperand(parts[1]); outputs.add(new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[4]); - return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str); + return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); } else if(parts.length == 3 && opcode.equalsIgnoreCase("fft_linearized")) { // one input and two outputs outputs.add(new CPOperand(parts[1], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[2], ValueType.FP64, 
DataType.MATRIX)); + int threads = Integer.parseInt(parts[3]); - return new MultiReturnBuiltinCPInstruction(null, null, outputs, opcode, str); + return new MultiReturnBuiltinCPInstruction(null, null, outputs, opcode, str, threads); } else if ( opcode.equalsIgnoreCase("stft") ) { @@ -129,8 +137,9 @@ public class MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { CPOperand in1 = new CPOperand(parts[1]); outputs.add ( new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX) ); outputs.add ( new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX) ); + int threads = Integer.parseInt(parts[4]); - return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str); + return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); } else if ( opcode.equalsIgnoreCase("svd") ) { CPOperand in1 = new CPOperand(parts[1]); @@ -139,8 +148,9 @@ public class MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { outputs.add ( new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX) ); outputs.add ( new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX) ); outputs.add ( new CPOperand(parts[4], ValueType.FP64, DataType.MATRIX) ); + int threads = Integer.parseInt(parts[5]); - return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str); + return new MultiReturnBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); } else { @@ -159,7 +169,7 @@ public class MultiReturnBuiltinCPInstruction extends ComputationCPInstruction { throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + getOpcode()); MatrixBlock in = ec.getMatrixInput(input1.getName()); - MatrixBlock[] out = LibCommonsMath.multiReturnOperations(in, getOpcode()); + MatrixBlock[] out = LibCommonsMath.multiReturnOperations(in, getOpcode(), _numThreads); ec.releaseMatrixInput(input1.getName()); for(int i=0; i < _outputs.size(); i++) { ec.setMatrixOutput(_outputs.get(i).getName(), out[i]); diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnComplexMatrixBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnComplexMatrixBuiltinCPInstruction.java index f75b231052..3238b9381a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnComplexMatrixBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnComplexMatrixBuiltinCPInstruction.java @@ -36,24 +36,28 @@ import org.apache.sysds.runtime.matrix.operators.Operator; public class MultiReturnComplexMatrixBuiltinCPInstruction extends ComputationCPInstruction { - protected ArrayList<CPOperand> _outputs; + protected final ArrayList<CPOperand> _outputs; + protected final int _numThreads; private MultiReturnComplexMatrixBuiltinCPInstruction(Operator op, CPOperand input1, CPOperand input2, - ArrayList<CPOperand> outputs, String opcode, String istr) { + ArrayList<CPOperand> outputs, String opcode, String istr, int threads) { super(CPType.MultiReturnBuiltin, op, input1, input2, outputs.get(0), opcode, istr); _outputs = outputs; + _numThreads = threads; } private MultiReturnComplexMatrixBuiltinCPInstruction(Operator op, CPOperand input1, ArrayList<CPOperand> outputs, - String opcode, String istr) { + String opcode, String istr, int threads) { super(CPType.MultiReturnBuiltin, op, input1, null, outputs.get(0), opcode, istr); _outputs = outputs; + _numThreads = threads; } private MultiReturnComplexMatrixBuiltinCPInstruction(Operator op, CPOperand input1, CPOperand input2, - CPOperand input3, CPOperand input4, ArrayList<CPOperand> outputs, String opcode, String istr) { + CPOperand input3, CPOperand input4, ArrayList<CPOperand> outputs, String opcode, String istr, int threads) { super(CPType.MultiReturnBuiltin, op, input1, input2, input3, input4, outputs.get(0), opcode, istr); _outputs = outputs; + _numThreads = threads; } public CPOperand getOutput(int i) { @@ -74,60 +78,66 @@ public class 
MultiReturnComplexMatrixBuiltinCPInstruction extends ComputationCPI // first part is always the opcode String opcode = parts[0]; - if(parts.length == 5 && opcode.equalsIgnoreCase("ifft")) { + if(parts.length == 6 && opcode.equalsIgnoreCase("ifft")) { // one input and two outputs CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); outputs.add(new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[4], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[5]); - return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, in2, outputs, opcode, str); + return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, in2, outputs, opcode, str, threads); } - else if(parts.length == 4 && opcode.equalsIgnoreCase("ifft")) { + else if(parts.length == 5 && opcode.equalsIgnoreCase("ifft")) { // one input and two outputs CPOperand in1 = new CPOperand(parts[1]); outputs.add(new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[4]); - return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, outputs, opcode, str); + return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); } - else if(parts.length == 5 && opcode.equalsIgnoreCase("ifft_linearized")) { + else if(parts.length == 6 && opcode.equalsIgnoreCase("ifft_linearized")) { // one input and two outputs CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); outputs.add(new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[4], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[5]); - return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, in2, outputs, opcode, str); + return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, in2, outputs, opcode, str, threads); } - else 
if(parts.length == 4 && opcode.equalsIgnoreCase("ifft_linearized")) { + else if(parts.length == 5 && opcode.equalsIgnoreCase("ifft_linearized")) { // one input and two outputs CPOperand in1 = new CPOperand(parts[1]); outputs.add(new CPOperand(parts[2], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[3], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[4]); - return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, outputs, opcode, str); + return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, outputs, opcode, str, threads); } - else if(parts.length == 6 && opcode.equalsIgnoreCase("stft")) { + else if(parts.length == 7 && opcode.equalsIgnoreCase("stft")) { CPOperand in1 = new CPOperand(parts[1]); CPOperand windowSize = new CPOperand(parts[2]); CPOperand overlap = new CPOperand(parts[3]); outputs.add(new CPOperand(parts[4], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[5], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[6]); return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, null, windowSize, overlap, outputs, opcode, - str); + str, threads); } - else if(parts.length == 7 && opcode.equalsIgnoreCase("stft")) { + else if(parts.length == 8 && opcode.equalsIgnoreCase("stft")) { CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); CPOperand windowSize = new CPOperand(parts[3]); CPOperand overlap = new CPOperand(parts[4]); outputs.add(new CPOperand(parts[5], ValueType.FP64, DataType.MATRIX)); outputs.add(new CPOperand(parts[6], ValueType.FP64, DataType.MATRIX)); + int threads = Integer.parseInt(parts[7]); return new MultiReturnComplexMatrixBuiltinCPInstruction(null, in1, in2, windowSize, overlap, outputs, opcode, - str); + str, threads); } else { throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode); @@ -156,7 +166,7 @@ public class MultiReturnComplexMatrixBuiltinCPInstruction 
extends ComputationCPI throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + getOpcode()); MatrixBlock in = ec.getMatrixInput(input1.getName()); - MatrixBlock[] out = LibCommonsMath.multiReturnOperations(in, getOpcode()); + MatrixBlock[] out = LibCommonsMath.multiReturnOperations(in, getOpcode(), _numThreads); ec.releaseMatrixInput(input1.getName()); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java index b7fe116707..81f2215fd3 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java @@ -19,6 +19,11 @@ package org.apache.sysds.runtime.matrix.data; +import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.fft; +import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.fft_linearized; +import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.ifft; +import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.ifft_linearized; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math3.exception.MaxCountExceededException; @@ -33,28 +38,22 @@ import org.apache.commons.math3.linear.RealMatrix; import org.apache.commons.math3.linear.SingularValueDecomposition; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.data.DenseBlock; -import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.Builtin; import org.apache.sysds.runtime.functionobjects.Divide; -import org.apache.sysds.runtime.functionobjects.SwapIndex; import org.apache.sysds.runtime.functionobjects.MinusMultiply; -import org.apache.sysds.runtime.functionobjects.Builtin; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.SwapIndex; import 
org.apache.sysds.runtime.instructions.InstructionUtils; -import org.apache.sysds.runtime.matrix.operators.RightScalarOperator; +import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.runtime.matrix.operators.RightScalarOperator; import org.apache.sysds.runtime.matrix.operators.ScalarOperator; -import org.apache.sysds.runtime.matrix.operators.UnaryOperator; import org.apache.sysds.runtime.matrix.operators.TernaryOperator; -import org.apache.sysds.runtime.matrix.operators.ReorgOperator; -import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator; -import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.UnaryOperator; import org.apache.sysds.runtime.util.DataConverter; -import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.fft; -import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.ifft; -import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.fft_linearized; -import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.ifft_linearized; -import static org.apache.sysds.runtime.matrix.data.LibMatrixSTFT.stft; - /** * Library for matrix operations that need invocation of * Apache Commons Math library. 
@@ -106,8 +105,12 @@ public class LibCommonsMath return null; } - public static MatrixBlock[] multiReturnOperations(MatrixBlock in, String opcode) { - return multiReturnOperations(in, opcode, 1, 1); + // public static MatrixBlock[] multiReturnOperations(MatrixBlock in, String opcode) { + // return multiReturnOperations(in, opcode, 1, 1); + // } + + public static MatrixBlock[] multiReturnOperations(MatrixBlock in, String opcode, int threads) { + return multiReturnOperations(in, opcode, threads, 1); } public static MatrixBlock[] multiReturnOperations(MatrixBlock in1, MatrixBlock in2, String opcode) { @@ -418,55 +421,55 @@ public class LibCommonsMath } - /** - * Function to perform STFT on a given matrix. - * - * @param re matrix object - * @param im matrix object - * @param windowSize of stft - * @param overlap of stft - * @return array of matrix blocks - */ - private static MatrixBlock[] computeSTFT(MatrixBlock re, MatrixBlock im, int windowSize, int overlap, int threads) { - if (re == null) { - throw new DMLRuntimeException("Invalid empty block"); - } else if (im != null && !im.isEmptyBlock(false)) { - re.sparseToDense(); - im.sparseToDense(); - return stft(re, im, windowSize, overlap, threads); - } else { - if (re.isEmptyBlock(false)) { - // Return the original matrix as the result - int rows = re.getNumRows(); - int cols = re.getNumColumns(); - - int stepSize = windowSize - overlap; - if (stepSize == 0) { - throw new IllegalArgumentException("windowSize - overlap is zero"); - } - - int numberOfFramesPerRow = (cols - overlap + stepSize - 1) / stepSize; - int rowLength= numberOfFramesPerRow * windowSize; - int out_len = rowLength * rows; - - double[] out_zero = new double[out_len]; - - return new MatrixBlock[]{new MatrixBlock(rows, rowLength, out_zero), new MatrixBlock(rows, rowLength, out_zero)}; - } - re.sparseToDense(); - return stft(re, windowSize, overlap, threads); - } - } - - /** - * Function to perform STFT on a given matrix. 
- * - * @param re matrix object - * @return array of matrix blocks - */ - private static MatrixBlock[] computeSTFT(MatrixBlock re, int windowSize, int overlap, int threads) { - return computeSTFT(re, null, windowSize, overlap, threads); - } + // /** + // * Function to perform STFT on a given matrix. + // * + // * @param re matrix object + // * @param im matrix object + // * @param windowSize of stft + // * @param overlap of stft + // * @return array of matrix blocks + // */ + // private static MatrixBlock[] computeSTFT(MatrixBlock re, MatrixBlock im, int windowSize, int overlap, int threads) { + // if (re == null) { + // throw new DMLRuntimeException("Invalid empty block"); + // } else if (im != null && !im.isEmptyBlock(false)) { + // re.sparseToDense(); + // im.sparseToDense(); + // return stft(re, im, windowSize, overlap, threads); + // } else { + // if (re.isEmptyBlock(false)) { + // // Return the original matrix as the result + // int rows = re.getNumRows(); + // int cols = re.getNumColumns(); + + // int stepSize = windowSize - overlap; + // if (stepSize == 0) { + // throw new IllegalArgumentException("windowSize - overlap is zero"); + // } + + // int numberOfFramesPerRow = (cols - overlap + stepSize - 1) / stepSize; + // int rowLength= numberOfFramesPerRow * windowSize; + // int out_len = rowLength * rows; + + // double[] out_zero = new double[out_len]; + + // return new MatrixBlock[]{new MatrixBlock(rows, rowLength, out_zero), new MatrixBlock(rows, rowLength, out_zero)}; + // } + // re.sparseToDense(); + // return stft(re, windowSize, overlap, threads); + // } + // } + + // /** + // * Function to perform STFT on a given matrix. + // * + // * @param re matrix object + // * @return array of matrix blocks + // */ + // private static MatrixBlock[] computeSTFT(MatrixBlock re, int windowSize, int overlap, int threads) { + // return computeSTFT(re, null, windowSize, overlap, threads); + // } /** * Performs Singular Value Decomposition. Calls Apache Commons Math SVD. 
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixFourier.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixFourier.java index f03cb1663f..5012c3f565 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixFourier.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixFourier.java @@ -19,20 +19,24 @@ package org.apache.sysds.runtime.matrix.data; -import org.apache.commons.math3.util.FastMath; - -import org.apache.sysds.runtime.util.CommonThreadPool; - import java.lang.ref.SoftReference; -import java.util.List; -import java.util.HashMap; import java.util.ArrayList; -import java.util.concurrent.Future; -import java.util.concurrent.ExecutorService; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.util.FastMath; +import org.apache.sysds.runtime.compress.utils.Util; +import org.apache.sysds.runtime.util.CommonThreadPool; public class LibMatrixFourier { + protected static final Log LOG = LogFactory.getLog(LibMatrixFourier.class.getName()); + static SoftReference<HashMap<Double, Double>> sinCacheRef = new SoftReference<>(new HashMap<>()); static SoftReference<HashMap<Double, Double>> cosCacheRef = new SoftReference<>(new HashMap<>()); @@ -46,15 +50,22 @@ public class LibMatrixFourier { * @return array of two matrix blocks */ public static MatrixBlock[] fft(MatrixBlock re, MatrixBlock im, int threads) { - int rows = re.getNumRows(); int cols = re.getNumColumns(); if(!isPowerOfTwo(rows) || !isPowerOfTwo(cols)) throw new RuntimeException("false dimensions"); - fft(re.getDenseBlockValues(), im.getDenseBlockValues(), rows, cols, threads, true); + MatrixBlock re_out = new MatrixBlock(); + re_out.copy(re, false); + MatrixBlock im_out = new MatrixBlock(); + 
im_out.copy(im, false); + + fft(re_out.getDenseBlockValues(), im_out.getDenseBlockValues(), rows, cols, threads, true); - return new MatrixBlock[] {re, im}; + re_out.recomputeNonZeros(threads); + im_out.recomputeNonZeros(threads); + + return new MatrixBlock[] {re_out, im_out}; } /** @@ -73,9 +84,17 @@ public class LibMatrixFourier { if(!isPowerOfTwo(rows) || !isPowerOfTwo(cols)) throw new RuntimeException("false dimensions"); - ifft(re.getDenseBlockValues(), im.getDenseBlockValues(), rows, cols, threads, true); + MatrixBlock re_out = new MatrixBlock(); + re_out.copy(re, false); + MatrixBlock im_out = new MatrixBlock(); + im_out.copy(im, false); + + ifft(re_out.getDenseBlockValues(), im_out.getDenseBlockValues(), rows, cols, threads, true); + + re_out.recomputeNonZeros(threads); + im_out.recomputeNonZeros(threads); - return new MatrixBlock[] {re, im}; + return new MatrixBlock[] {re_out, im_out}; } /** @@ -89,15 +108,21 @@ public class LibMatrixFourier { * @return array of two matrix blocks */ public static MatrixBlock[] fft_linearized(MatrixBlock re, MatrixBlock im, int threads) { - int rows = re.getNumRows(); int cols = re.getNumColumns(); if(!isPowerOfTwo(cols)) throw new RuntimeException("false dimensions"); - fft(re.getDenseBlockValues(), im.getDenseBlockValues(), rows, cols, threads, false); + MatrixBlock re_out = new MatrixBlock(); + re_out.copy(re, false); + MatrixBlock im_out = new MatrixBlock(); + im_out.copy(im, false); + + fft(re_out.getDenseBlockValues(), im_out.getDenseBlockValues(), rows, cols, threads, false); - return new MatrixBlock[] {re, im}; + re_out.recomputeNonZeros(threads); + im_out.recomputeNonZeros(threads); + return new MatrixBlock[] {re_out, im_out}; } /** @@ -117,9 +142,17 @@ public class LibMatrixFourier { if(!isPowerOfTwo(cols)) throw new RuntimeException("false dimensions"); - ifft(re.getDenseBlockValues(), im.getDenseBlockValues(), rows, cols, threads, false); + MatrixBlock re_out = new MatrixBlock(); + re_out.copy(re, false); + 
MatrixBlock im_out = new MatrixBlock(); + im_out.copy(im, false); + + ifft(re_out.getDenseBlockValues(), im_out.getDenseBlockValues(), rows, cols, threads, false); + + re_out.recomputeNonZeros(threads); + im_out.recomputeNonZeros(threads); - return new MatrixBlock[] {re, im}; + return new MatrixBlock[] {re_out, im_out}; } /** @@ -454,26 +487,29 @@ public class LibMatrixFourier { } private static double sin(double angle, HashMap<Double, Double> cache) { - if(!cache.containsKey(angle)) { - cache.put(angle, FastMath.sin(angle)); - limitCache(cache); + final double v = cache.getOrDefault(angle, -100.0); + if(Util.eq(v, -100.0)) { // value not in cache. + final double res = FastMath.sin(angle); + if(cache.size() < 1000) + cache.put(angle, res); + return res; } - return cache.get(angle); + else + return v; + } private static double cos(double angle, HashMap<Double, Double> cache) { - if(!cache.containsKey(angle)) { - cache.put(angle, FastMath.cos(angle)); - limitCache(cache); + double v = cache.getOrDefault(angle, -100.0); + if(Util.eq(v, -100.0)) { // value not in cache. 
+ double res = FastMath.cos(angle); + if(cache.size() < 1000) + cache.put(angle, res); + return res; } - return cache.get(angle); - } + else + return v; - private static void limitCache(HashMap<Double, Double> cache) { - if(cache.size() > 1000) { - // remove oldest key - cache.remove(cache.keySet().iterator().next()); - } } } diff --git a/src/main/python/systemds/operator/algorithm/__init__.py b/src/main/python/systemds/operator/algorithm/__init__.py index 690bfe07e8..2cee3105f6 100644 --- a/src/main/python/systemds/operator/algorithm/__init__.py +++ b/src/main/python/systemds/operator/algorithm/__init__.py @@ -151,6 +151,7 @@ from .builtin.outlierByIQR import outlierByIQR from .builtin.outlierByIQRApply import outlierByIQRApply from .builtin.outlierBySd import outlierBySd from .builtin.outlierBySdApply import outlierBySdApply +from .builtin.pageRank import pageRank from .builtin.pca import pca from .builtin.pcaInverse import pcaInverse from .builtin.pcaTransform import pcaTransform @@ -327,6 +328,7 @@ __all__ = ['WoE', 'outlierByIQRApply', 'outlierBySd', 'outlierBySdApply', + 'pageRank', 'pca', 'pcaInverse', 'pcaTransform', diff --git a/src/main/python/systemds/operator/algorithm/builtin/pageRank.py b/src/main/python/systemds/operator/algorithm/builtin/pageRank.py new file mode 100644 index 0000000000..5e03e9dd93 --- /dev/null +++ b/src/main/python/systemds/operator/algorithm/builtin/pageRank.py @@ -0,0 +1,55 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +# Autogenerated By : src/main/python/generator/generator.py +# Autogenerated From : scripts/builtin/pageRank.dml + +from typing import Dict, Iterable + +from systemds.operator import OperationNode, Matrix, Frame, List, MultiReturn, Scalar +from systemds.script_building.dag import OutputType +from systemds.utils.consts import VALID_INPUT_TYPES + + +def pageRank(G: Matrix, + p: Matrix, + e: Matrix, + u: Matrix, + **kwargs: Dict[str, VALID_INPUT_TYPES]): + """ + DML builtin method for PageRank algorithm (power iterations) + + + + :param G: Input Matrix + :param p: initial page rank vector (number of nodes), e.g., rand intialized + :param e: additional customization, default vector of ones + :param u: personalization vector (number of nodes) + :param alpha: teleport probability + :param max_iter: maximum number of iterations + :return: computed pagerank + """ + + params_dict = {'G': G, 'p': p, 'e': e, 'u': u} + params_dict.update(kwargs) + return Matrix(G.sds_context, + 'pageRank', + named_input_nodes=params_dict) diff --git a/src/main/python/tests/lineage/test_lineagetrace.py b/src/main/python/tests/lineage/test_lineagetrace.py index 06b5b39297..7e4e4bb3b1 100644 --- a/src/main/python/tests/lineage/test_lineagetrace.py +++ b/src/main/python/tests/lineage/test_lineagetrace.py @@ -55,36 +55,46 @@ class TestLineageTrace(unittest.TestCase): for x in m_res.get_lineage_trace().split("\n")] - sysds_trace = create_execute_and_trace_dml(trace_test_1) + sysds_trace = 
self.create_execute_and_trace_dml(trace_test_1) # It is not guarantied, that the two lists 100% align to be the same. # Therefore for now, we only compare if the command is the same, in same order. python_trace_commands = [x[:1] for x in python_trace] dml_script_commands = [x[:1] for x in sysds_trace] + if(len(python_trace_commands) == 0): + self.fail("Error in pythonscript execution") + if(len(dml_script_commands) == 0): + self.fail("Error in DML script execution") + self.assertEqual(python_trace_commands[0], dml_script_commands[0]) -def create_execute_and_trace_dml(script: str): - if not os.path.exists(temp_dir): - os.makedirs(temp_dir) + def create_execute_and_trace_dml(self, script: str): + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) - # Call SYSDS! - result_file_name = temp_dir + "/tmp_res.txt" - - command = "systemds " + script + \ - " > " + result_file_name + " 2> /dev/null" - os.system(command) - return parse_trace(result_file_name) + # Call SYSDS! + result_file_name = temp_dir + "/tmp_res.txt" + os.environ["SYSDS_QUIET"] = "0" + os.system("which systemds") + command = "systemds " + script + \ + " > " + result_file_name + " 2> /dev/null" + status = os.system(command) + if status < 0: + self.fail("systemds call failed.") + return parse_trace(result_file_name) def parse_trace(path: str): data = [] with open(path, "r") as log: for line in log: - data.append(line.strip().split("°")) + print(line) + if "°" in line: + data.append(line.strip().split("°")) # Remove the last 4 lines of the System output because they are after lintrace. 
- return data[:-4] + return data if __name__ == "__main__": diff --git a/src/test/java/org/apache/sysds/test/applications/ScalableDecompositionTest.java b/src/test/java/org/apache/sysds/test/applications/ScalableDecompositionTest.java index dfd406d47a..edd313ae56 100644 --- a/src/test/java/org/apache/sysds/test/applications/ScalableDecompositionTest.java +++ b/src/test/java/org/apache/sysds/test/applications/ScalableDecompositionTest.java @@ -211,7 +211,7 @@ public class ScalableDecompositionTest extends AutomatedTestBase MatrixBlock A = MatrixBlock.randOperations(rows, cols, 1.0, -5, 10, "uniform", 7); writeInputMatrixWithMTD("A", A, false); runTest(true, false, null, -1); - MatrixBlock[] C = LibCommonsMath.multiReturnOperations(A, "lu"); + MatrixBlock[] C = LibCommonsMath.multiReturnOperations(A, "lu", 1); String[] outputs = new String[]{"C","D","E"}; for(int i=0; i<outputs.length; i++) { HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir(outputs[i]); @@ -223,7 +223,7 @@ public class ScalableDecompositionTest extends AutomatedTestBase MatrixBlock A = MatrixBlock.randOperations(rows, cols, 1.0, -5, 10, "uniform", 7); writeInputMatrixWithMTD("A", A, false); runTest(true, false, null, -1); - MatrixBlock[] C = LibCommonsMath.multiReturnOperations(A, "qr"); + MatrixBlock[] C = LibCommonsMath.multiReturnOperations(A, "qr", 1); String[] outputs = new String[]{"C","D","E"}; for(int i=0; i<outputs.length; i++) { HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir(outputs[i]); diff --git a/src/test/java/org/apache/sysds/test/component/matrix/FourierTest.java b/src/test/java/org/apache/sysds/test/component/matrix/FourierTest.java index da844ad54a..7e45e60fd7 100644 --- a/src/test/java/org/apache/sysds/test/component/matrix/FourierTest.java +++ b/src/test/java/org/apache/sysds/test/component/matrix/FourierTest.java @@ -19,19 +19,22 @@ package org.apache.sysds.test.component.matrix; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import 
org.junit.Test; - import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.fft; -import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.ifft; import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.fft_linearized; +import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.ifft; import static org.apache.sysds.runtime.matrix.data.LibMatrixFourier.ifft_linearized; - import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.fail; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.junit.Test; public class FourierTest { - int threads = Runtime.getRuntime().availableProcessors(); + protected static final Log LOG = LogFactory.getLog(FourierTest.class.getName()); + int threads = 2; @Test public void test_fft_one_dim() { @@ -42,10 +45,10 @@ public class FourierTest { double[] expected_re = {6, 15, -36, 15}; double[] expected_im = {0, -15, 0, 15}; - fft(re, im, threads); + MatrixBlock[] ret = fft(re, im, threads); - assertArrayEquals(expected_re, re.getDenseBlockValues(), 0.0001); - assertArrayEquals(expected_im, im.getDenseBlockValues(), 0.0001); + assertArrayEquals(expected_re, ret[0].getDenseBlockValues(), 0.0001); + assertArrayEquals(expected_im, ret[1].getDenseBlockValues(), 0.0001); } @@ -58,10 +61,10 @@ public class FourierTest { double[] expected_re = {35, 4.89949, 15, -14.89949, -45, -14.89949, 15, 4.89949}; double[] expected_im = {0, 18.58579, -16, -21.41421, 0, 21.41421, 16, -18.58579}; - fft(re, im, threads); + MatrixBlock[] ret = fft(re, im, threads); - assertArrayEquals(expected_re, re.getDenseBlockValues(), 0.0001); - assertArrayEquals(expected_im, im.getDenseBlockValues(), 0.0001); + assertArrayEquals(expected_re, ret[0].getDenseBlockValues(), 0.0001); + assertArrayEquals(expected_im, ret[1].getDenseBlockValues(), 0.0001); } @Test @@ -89,8 +92,8 @@ public class FourierTest { 
MatrixBlock re = new MatrixBlock(1, 4, in_re); MatrixBlock im = new MatrixBlock(1, 4, in_im); - MatrixBlock[] inter = fft(re, im, threads); - MatrixBlock[] res = ifft(inter[0], inter[1], threads); + MatrixBlock[] inter = fft(re, im, 1); + MatrixBlock[] res = ifft(inter[0], inter[1], 1); assertArrayEquals(in_re, res[0].getDenseBlockValues(), 0.0001); assertArrayEquals(in_im, res[1].getDenseBlockValues(), 0.0001); @@ -258,7 +261,7 @@ public class FourierTest { 0.975167392001707, 0.44595301043682656, 0.18401328301977316, 0.7158585484384759, 0.3240126702723025, 0.740836665073052, 0.8890279623888511, 0.8841266040978419, 0.3058930798936259, 0.8987579873722049}); - MatrixBlock im = new MatrixBlock(1, 16, + MatrixBlock im = new MatrixBlock(8,8, new double[] {0.8572457113722648, 0.668182795310341, 0.9739416721141464, 0.8189153345383146, 0.6425950286263254, 0.3569634253534639, 0.19715070300424575, 0.8915344479242211, 0.39207930659031054, 0.1625193685179268, 0.2523438052868171, 0.30940628850519547, 0.7461468672112159, 0.7123766750132684, @@ -301,43 +304,62 @@ public class FourierTest { 1.1079219041153268, 0.1094531803830765, 3.488182265163636, -1.5698242466218544, -0.1013387045518459, 0.9269290699615746, -0.699890233104248, 3.617209720991753, -0.5565163478425035, 3.502962737763559}; - MatrixBlock[] res = fft(re, im, threads); + try { - assertArrayEquals(expected_re, res[0].getDenseBlockValues(), 0.0001); - assertArrayEquals(expected_im, res[1].getDenseBlockValues(), 0.0001); + MatrixBlock[] res = fft(re, im, threads); + assertArrayEquals(expected_re, res[0].getDenseBlockValues(), 0.0001); + assertArrayEquals(expected_im, res[1].getDenseBlockValues(), 0.0001); + } + catch(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void test_fft_linearized() { - MatrixBlock re = new MatrixBlock(2, 4, new double[] {0, 18, -15, 3, 0, 18, -15, 3}); - MatrixBlock im = new MatrixBlock(1, 4, new double[8]); - - double[] expected_re = {6, 15, -36, 15, 6, 15, -36, 
15}; - double[] expected_im = {0, -15, 0, 15, 0, -15, 0, 15}; - - fft_linearized(re, im, threads); - - assertArrayEquals(expected_re, re.getDenseBlockValues(), 0.0001); - assertArrayEquals(expected_im, im.getDenseBlockValues(), 0.0001); + try{ + MatrixBlock re = new MatrixBlock(2, 4, new double[] {0, 18, -15, 3, 0, 18, -15, 3}); + MatrixBlock im = new MatrixBlock(2, 4, new double[8]); + + double[] expected_re = {6, 15, -36, 15, 6, 15, -36, 15}; + double[] expected_im = {0, -15, 0, 15, 0, -15, 0, 15}; + + MatrixBlock[] res = fft_linearized(re, im, threads); + + assertArrayEquals(expected_re, res[0].getDenseBlockValues(), 0.0001); + assertArrayEquals(expected_im, res[1].getDenseBlockValues(), 0.0001); + } + catch(Exception e ){ + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void test_ifft_linearized() { - double[] in_re = new double[] {1, -2, 3, -4, 1, -2, 3, -4}; - double[] in_im = new double[] {0, 0, 0, 0, 0, 0, 0, 0}; + try { - MatrixBlock re = new MatrixBlock(2, 4, in_re); - MatrixBlock im = new MatrixBlock(2, 4, in_im); + double[] in_re = new double[] {1, -2, 3, -4, 1, -2, 3, -4}; + double[] in_im = new double[] {0, 0, 0, 0, 0, 0, 0, 0}; - MatrixBlock[] inter = fft_linearized(re, im, threads); - MatrixBlock[] res = ifft_linearized(inter[0], inter[1], threads); + MatrixBlock re = new MatrixBlock(2, 4, in_re); + MatrixBlock im = new MatrixBlock(2, 4, in_im); - assertArrayEquals(in_re, res[0].getDenseBlockValues(), 0.0001); - assertArrayEquals(in_im, res[1].getDenseBlockValues(), 0.0001); + MatrixBlock[] inter = fft_linearized(re, im, threads); + MatrixBlock[] res = ifft_linearized(inter[0], inter[1], threads); + + assertArrayEquals(in_re, res[0].getDenseBlockValues(), 0.0001); + assertArrayEquals(in_im, res[1].getDenseBlockValues(), 0.0001); + } + catch(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } diff --git a/src/test/scripts/functions/builtin/GridSearchLMCV.dml b/src/test/scripts/functions/builtin/GridSearchLMCV.dml 
index 20978182db..84cef20689 100644 --- a/src/test/scripts/functions/builtin/GridSearchLMCV.dml +++ b/src/test/scripts/functions/builtin/GridSearchLMCV.dml @@ -36,7 +36,8 @@ Xtest = X[(N+1):nrow(X),]; ytest = y[(N+1):nrow(X),]; params = list("icpt","reg", "tol", "maxi"); -paramRanges = list(seq(0,1,2),10^seq(0,-4), 10^seq(-6,-12), 10^seq(1,3)); +# paramRanges = list(seq(0,1,2),10^seq(0,-4), 10^seq(-6,-12), 10^seq(1,3)); +paramRanges = list(seq(0,1),10^seq(0,0), 10^seq(-6,-6), 10^seq(1,2)); [B1, opt] = gridSearch(X=Xtrain, y=ytrain, train="lm", predict="l2norm", numB=ncol(X)+1, params=params, paramValues=paramRanges, cv=TRUE, cvk=3); B2 = lm(X=Xtrain, y=ytrain, verbose=FALSE);
