This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 470bb0b904 [SYSTEMDS-3730] Multithreaded roll operation and improved
tests
470bb0b904 is described below
commit 470bb0b904fe15a7bc6b01b310703effd9432e82
Author: Biranavan Parameswaran <[email protected]>
AuthorDate: Tue Dec 30 09:58:46 2025 +0100
[SYSTEMDS-3730] Multithreaded roll operation and improved tests
This patch introduces multithreading support for the roll operation to
improve performance. RollTest.java has been updated to cover both
single-threaded and multithreaded execution modes.
Furthermore, this update adds comprehensive consistency checks to ensure
mathematical correctness. New tests were created to validate both dense
and sparse matrix inputs. Additionally, cross-verification tests were
added to confirm that sparse and dense rolling produce identical results
in both single-threaded and multithreaded execution.
Closes #2376.
---
src/main/java/org/apache/sysds/hops/ReorgOp.java | 4 +-
src/main/java/org/apache/sysds/lops/Transform.java | 2 +-
.../instructions/cp/ReorgCPInstruction.java | 6 +-
.../sysds/runtime/matrix/data/LibMatrixReorg.java | 124 +++++++++++++-
.../sysds/performance/matrix/MatrixRollPerf.java | 127 ++++++++++++++
.../DenseMatrixRollOperationCorrectnessTest.java | 183 ++++++++++++++++++++
.../RollOperationThreadSafetyTest.java | 125 ++++++++++++++
.../component/matrix/libMatrixReorg/RollTest.java | 27 ++-
.../SparseMatrixRollOperationCorrectnessTest.java | 185 +++++++++++++++++++++
9 files changed, 774 insertions(+), 9 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java
b/src/main/java/org/apache/sysds/hops/ReorgOp.java
index 5fc73e2bd3..f43d1cc2ba 100644
--- a/src/main/java/org/apache/sysds/hops/ReorgOp.java
+++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java
@@ -173,7 +173,9 @@ public class ReorgOp extends MultiThreadedHop
for (int i = 0; i < 2; i++)
linputs[i] =
getInput().get(i).constructLops();
- Transform transform1 = new Transform(linputs,
_op, getDataType(), getValueType(), et, 1);
+ Transform transform1 = new Transform(
+ linputs, _op, getDataType(),
getValueType(), et,
+
OptimizerUtils.getConstrainedNumThreads(_maxNumThreads));
setOutputDimensions(transform1);
setLineNumbers(transform1);
diff --git a/src/main/java/org/apache/sysds/lops/Transform.java
b/src/main/java/org/apache/sysds/lops/Transform.java
index 0d2e79f83a..d9537dcca6 100644
--- a/src/main/java/org/apache/sysds/lops/Transform.java
+++ b/src/main/java/org/apache/sysds/lops/Transform.java
@@ -180,7 +180,7 @@ public class Transform extends Lop
sb.append( this.prepOutputOperand(output));
if( (getExecType()==ExecType.CP || getExecType()==ExecType.FED
|| getExecType()==ExecType.OOC)
- && (_operation == ReOrgOp.TRANS || _operation ==
ReOrgOp.REV || _operation == ReOrgOp.SORT) ) {
+ && (_operation == ReOrgOp.TRANS || _operation ==
ReOrgOp.REV || _operation == ReOrgOp.SORT || _operation == ReOrgOp.ROLL) ) {
sb.append( OPERAND_DELIMITOR );
sb.append( _numThreads );
if ( getExecType()==ExecType.FED ) {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java
index a1788c0e25..be77f4eb4e 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java
@@ -118,11 +118,13 @@ public class ReorgCPInstruction extends
UnaryCPInstruction {
return new ReorgCPInstruction(new
ReorgOperator(RevIndex.getRevIndexFnObject(), k), in, out, opcode, str);
}
else if (opcode.equalsIgnoreCase(Opcodes.ROLL.toString())) {
- InstructionUtils.checkNumFields(str, 3);
+ InstructionUtils.checkNumFields(str, 3, 4);
in.split(parts[1]);
out.split(parts[3]);
CPOperand shift = new CPOperand(parts[2]);
- return new ReorgCPInstruction(new ReorgOperator(new
RollIndex(0)), in, out, shift, opcode, str);
+ int k = (parts.length > 4) ? Integer.parseInt(parts[4])
: 1;
+
+ return new ReorgCPInstruction(new ReorgOperator(new
RollIndex(0), k), in, out, shift, opcode, str);
}
else if ( opcode.equalsIgnoreCase(Opcodes.DIAG.toString()) ) {
parseUnaryInstruction(str, in, out); //max 2 operands
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
index 90ea445be8..c90f410181 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
@@ -134,6 +134,8 @@ public class LibMatrixReorg {
return rev(in, out);
case ROLL:
RollIndex rix = (RollIndex) op.fn;
+ if(op.getNumThreads() > 1)
+ return roll(in, out, rix.getShift(),
op.getNumThreads());
return roll(in, out, rix.getShift());
case DIAG:
return diag(in, out);
@@ -514,6 +516,124 @@ public class LibMatrixReorg {
return out;
}
+ public static MatrixBlock roll(MatrixBlock input, MatrixBlock output,
int shift, int numThreads) {
+
+ final int numRows = input.rlen;
+ final int numCols = input.clen;
+ final boolean isSparse = input.sparse;
+
+ // sparse-safe operation
+ if(input.isEmptyBlock(false))
+ return output;
+
+ // special case: row vector
+ if(numRows == 1) {
+ output.copy(input);
+ return output;
+ }
+
+ if(numThreads <= 1 || input.getLength() <
PAR_NUMCELL_THRESHOLD) {
+ return roll(input, output, shift); // fallback to
single-threaded
+ }
+
+ final int normalizedShift = getNormalizedShiftForRoll(shift,
numRows);
+
+ output.reset(numRows, numCols, isSparse);
+ output.nonZeros = input.nonZeros;
+
+ if(isSparse) {
+ output.allocateSparseRowsBlock(false);
+ }
+ else {
+ output.allocateDenseBlock(false);
+ }
+
+ //TODO experiment with more tasks per thread for better load
balance
+ //TODO call common kernel from both single- and multi-threaded
execution
+
+ ExecutorService threadPool = CommonThreadPool.get(numThreads);
+ try {
+ final int rowsPerThread = (int) Math.ceil((double)
numRows / numThreads);
+ List<Future<?>> tasks = new ArrayList<>();
+
+ for(int threadIndex = 0; threadIndex < numThreads;
threadIndex++) {
+
+ final int startRow = threadIndex *
rowsPerThread;
+ final int endRow = Math.min((threadIndex + 1) *
rowsPerThread, numRows);
+
+ tasks.add(threadPool.submit(() -> {
+ if(isSparse)
+ rollSparseBlock(input, output,
normalizedShift, startRow, endRow);
+ else
+ rollDenseBlock(input, output,
normalizedShift, startRow, endRow);
+ }));
+ }
+
+ for(Future<?> task : tasks)
+ task.get();
+
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ finally {
+ threadPool.shutdown();
+ }
+
+ return output;
+ }
+
+ private static int getNormalizedShiftForRoll(int shift, int numRows) {
+ shift = shift % numRows;
+ if(shift < 0)
+ shift += numRows;
+
+ return shift;
+ }
+
+ private static void rollDenseBlock(MatrixBlock input, MatrixBlock
output,
+ int shift, int startRow, int endRow)
+ {
+ DenseBlock inputBlock = input.getDenseBlock();
+ DenseBlock outputBlock = output.getDenseBlock();
+ final int numRows = input.rlen;
+ final int numCols = input.clen;
+
+ for(int targetRow = startRow; targetRow < endRow; targetRow++) {
+ int sourceRow = targetRow - shift;
+ if(sourceRow < 0)
+ sourceRow += numRows;
+
+ System.arraycopy(inputBlock.values(sourceRow),
inputBlock.pos(sourceRow), outputBlock.values(targetRow),
+ outputBlock.pos(targetRow), numCols);
+ }
+ }
+
+ private static void rollSparseBlock(MatrixBlock input, MatrixBlock
output,
+ int shift, int startRow, int endRow)
+ {
+ SparseBlock inputBlock = input.getSparseBlock();
+ SparseBlock outputBlock = output.getSparseBlock();
+ final int numRows = input.rlen;
+
+ for(int targetRow = startRow; targetRow < endRow; targetRow++) {
+ int sourceRow = targetRow - shift;
+ if(sourceRow < 0)
+ sourceRow += numRows;
+
+ if(!inputBlock.isEmpty(sourceRow)) {
+ int rowStart = inputBlock.pos(sourceRow);
+ int rowEnd = rowStart +
inputBlock.size(sourceRow);
+ int[] colIndexes =
inputBlock.indexes(sourceRow);
+ double[] values = inputBlock.values(sourceRow);
+
+ for(int k = rowStart; k < rowEnd; k++) {
+ outputBlock.set(targetRow,
colIndexes[k], values[k]);
+ }
+ }
+ }
+ }
+
public static void roll(IndexedMatrixValue in, long rlen, int blen, int
shift, ArrayList<IndexedMatrixValue> out) {
MatrixIndexes inMtxIdx = in.getIndexes();
MatrixBlock inMtxBlk = (MatrixBlock) in.getValue();
@@ -2554,7 +2674,7 @@ public class LibMatrixReorg {
private static void rollDense(MatrixBlock in, MatrixBlock out, int
shift) {
final int m = in.rlen;
- shift %= (m != 0 ? m : 1); // roll matrix with axis=none
+ shift = getNormalizedShiftForRoll(shift, m); // roll matrix
with axis=none
copyDenseMtx(in, out, 0, shift, m - shift, false, true);
copyDenseMtx(in, out, m - shift, 0, shift, true, true);
@@ -2562,7 +2682,7 @@ public class LibMatrixReorg {
private static void rollSparse(MatrixBlock in, MatrixBlock out, int
shift) {
final int m = in.rlen;
- shift %= (m != 0 ? m : 1); // roll matrix with axis=0
+ shift = getNormalizedShiftForRoll(shift, m); // roll matrix
with axis=0
copySparseMtx(in, out, 0, shift, m - shift, false, true);
copySparseMtx(in, out, m-shift, 0, shift, false, true);
diff --git
a/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java
b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java
new file mode 100644
index 0000000000..05f291b0ab
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance.matrix;
+
+import org.apache.sysds.performance.compression.APerfTest;
+import org.apache.sysds.performance.generators.ConstMatrix;
+import org.apache.sysds.performance.generators.IGenerate;
+import org.apache.sysds.runtime.functionobjects.IndexFunction;
+import org.apache.sysds.runtime.functionobjects.RollIndex;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
+
+import java.util.Random;
+
+public class MatrixRollPerf extends APerfTest<Object, MatrixBlock> {
+
+ private final int rows;
+ private final int cols;
+ private final int shift;
+ private final int k;
+
+ private final ReorgOperator reorg;
+ private MatrixBlock out;
+
+ public MatrixRollPerf(int N, int W, IGenerate<MatrixBlock> gen, int
rows, int cols, int shift, int k) {
+ super(N, W, gen);
+ this.rows = rows;
+ this.cols = cols;
+ this.shift = shift;
+ this.k = k;
+
+ IndexFunction op = new RollIndex(shift);
+ this.reorg = new ReorgOperator(op, k);
+ }
+
+ public void run() throws Exception {
+ MatrixBlock mb = gen.take();
+ logInfos(rows, cols, shift, mb.getSparsity(), k);
+
+
+ String info = String.format("rows: %5d cols: %5d sp: %.4f
shift: %4d k: %2d",
+ rows, cols, mb.getSparsity(), shift, k);
+
+
+ warmup(this::rollOnce, W);
+
+ execute(this::rollOnce, info);
+ }
+
+ private void logInfos(int rows, int cols, int shift, double sparsity,
int k) {
+ String matrixType = sparsity == 1 ? "Dense" : "Sparse";
+ if (k == 1) {
+
System.out.println("---------------------------------------------------------------------------------------------------------");
+ System.out.printf("%s Experiment for rows %d columns %d
and shift %d \n", matrixType, rows, cols, shift);
+
System.out.println("---------------------------------------------------------------------------------------------------------");
+ }
+ }
+
+ private void rollOnce() {
+ MatrixBlock in = gen.take();
+
+ if (out == null)
+ out = new MatrixBlock(rows, cols,
in.isInSparseFormat());
+
+ out.reset(rows, cols, in.isInSparseFormat());
+
+ in.reorgOperations(reorg, out, 0, 0, 0);
+
+ ret.add(null);
+ }
+
+ @Override
+ protected String makeResString() {
+ return "";
+ }
+
+ public static void main(String[] args) throws Exception {
+ int kMulti = InfrastructureAnalyzer.getLocalParallelism();
+ int reps = 2000;
+ int warmup = 200;
+
+ int minRows = 2017;
+ int minCols = 1001;
+ double spSparse = 0.01;
+ int minShift = -50;
+ int maxShift = 1022;
+ int iterations = 10;
+
+ Random rand = new Random(42);
+
+ for (int i = 0; i < iterations; i++) {
+ int rows = 10_000_000;
+ int cols = 10;
+ int shift = rand.nextInt((maxShift - minShift) + 1) +
minShift;
+
+ MatrixBlock denseIn =
TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, 1.0, 42);
+ MatrixBlock sparseIn =
TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, spSparse, 42);
+
+ // Run Dense Case (Single vs Multi-threaded)
+ new MatrixRollPerf(reps, warmup, new
ConstMatrix(denseIn, -1), rows, cols, shift, 1).run();
+ new MatrixRollPerf(reps, warmup, new
ConstMatrix(denseIn, -1), rows, cols, shift, kMulti).run();
+
+ // Run Sparse Case (Single vs Multi-threaded)
+ new MatrixRollPerf(reps, warmup, new
ConstMatrix(sparseIn, -1), rows, cols, shift, 1).run();
+ new MatrixRollPerf(reps, warmup, new
ConstMatrix(sparseIn, -1), rows, cols, shift, kMulti).run();
+ }
+ }
+}
diff --git
a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java
new file mode 100644
index 0000000000..157e411cf6
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.matrix.libMatrixReorg;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.runtime.functionobjects.IndexFunction;
+import org.apache.sysds.runtime.functionobjects.RollIndex;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DenseMatrixRollOperationCorrectnessTest {
+
+ private final double[][] input;
+ private final double[][] expected;
+ private final int shift;
+
+ public DenseMatrixRollOperationCorrectnessTest(double[][] input,
double[][] expected, int shift) {
+ this.input = input;
+ this.expected = expected;
+ this.shift = shift;
+ }
+
+ @Parameterized.Parameters(name = "Shift={2}, Size={0}x{1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {
+ new double[][] {{1, 2, 3, 4, 5}},
+ new double[][] {{1, 2, 3, 4, 5}},
+ 0
+ },
+ {
+ new double[][] {{1, 2, 3, 4, 5}},
+ new double[][] {{1, 2, 3, 4, 5}},
+ 1
+ },
+ {
+ new double[][] {{1, 2, 3, 4, 5}},
+ new double[][] {{1, 2, 3, 4, 5}},
+ -3
+ },
+ {
+ new double[][] {{1, 2, 3, 4, 5}},
+ new double[][] {{1, 2, 3, 4, 5}},
+ 999
+ },
+ {
+ new double[][] {{1}, {2}, {3}, {4}, {5}},
+ new double[][] {{4}, {5}, {1}, {2}, {3}},
+ 2
+ },
+ {
+ new double[][] {{1}, {2}, {3}, {4}, {5}},
+ new double[][] {{2}, {3}, {4}, {5}, {1}},
+ -1
+ },
+ {
+ new double[][] {{1}, {2}, {3}, {4}, {5}},
+ new double[][] {{1}, {2}, {3}, {4}, {5}},
+ 5
+ },
+ {
+ new double[][] {{1, 2, 3}, {4, 5, 6}},
+ new double[][] {{4, 5, 6}, {1, 2, 3}},
+ 1
+ },
+ {
+ new double[][] {{1, 2, 3}, {4, 5, 6}},
+ new double[][] {{4, 5, 6}, {1, 2, 3}},
+ 7
+ },
+ {
+ new double[][] {{1, 2, 3}, {4, 5, 6}},
+ new double[][] {{1, 2, 3}, {4, 5, 6}},
+ 2
+ },
+ {
+ new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8,
9}},
+ new double[][] {{7, 8, 9}, {1, 2, 3}, {4, 5,
6}},
+ 1
+ },
+ {
+ new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8,
9}},
+ new double[][] {{4, 5, 6}, {7, 8, 9}, {1, 2,
3}},
+ -1
+ },
+ {
+ new double[][] {{9, 8, 7}, {6, 5, 4}, {3, 2,
1}},
+ new double[][] {{3, 2, 1}, {9, 8, 7}, {6, 5,
4}},
+ 1
+ },
+ {
+ new double[][] {{1, 2, 3, 4}, {5, 6, 7, 8}, {9,
10, 11, 12}},
+ new double[][] {{9, 10, 11, 12}, {1, 2, 3, 4},
{5, 6, 7, 8}},
+ 1
+ },
+ {
+ new double[][] {{1, 2, 3, 4}, {5, 6, 7, 8}, {9,
10, 11, 12}},
+ new double[][] {{5, 6, 7, 8}, {9, 10, 11, 12},
{1, 2, 3, 4}},
+ -1
+ },
+ {
+ new double[][] {{1, 2, 3, 4, 5}, {6, 7, 8, 9,
10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}},
+ new double[][] {{21, 22, 23, 24, 25}, {1, 2, 3,
4, 5}, {6, 7, 8, 9, 10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}},
+ 1
+ },
+ {
+ new double[][] {{1, 2, 3, 4, 5}, {6, 7, 8, 9,
10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}},
+ new double[][] {{11, 12, 13, 14, 15}, {16, 17,
18, 19, 20}, {21, 22, 23, 24, 25}, {1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}},
+ -2
+ },
+ {
+ new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8,
9}, {10, 11, 12}, {13, 14, 15}, {16, 17, 18}, {19, 20, 21},
+ {22, 23, 24}, {25, 26, 27}, {28, 29,
30}},
+ new double[][] {{22, 23, 24}, {25, 26, 27},
{28, 29, 30}, {1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12},
+ {13, 14, 15}, {16, 17, 18}, {19, 20,
21}},
+ 3
+ },
+ {
+ new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}},
+ new double[][] {{5, 6}, {7, 8}, {1, 2}, {3, 4}},
+ 1002
+ },
+ {
+ new double[][] {{1}, {2}, {3}, {4}, {5}},
+ new double[][] {{3}, {4}, {5}, {1}, {2}},
+ -12
+ },
+ {
+ new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8,
9}},
+ new double[][] {{4, 5, 6}, {7, 8, 9}, {1, 2,
3}},
+ -10
+ },
+ {
+ new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}},
+ new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}},
+ -4
+ },
+ {
+ new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}},
+ new double[][] {{3, 4}, {5, 6}, {7, 8}, {1, 2}},
+ -5
+ }
+ });
+ }
+
+ @Test
+ public void testRollOperationProducesExpectedOutput() {
+ MatrixBlock inBlock = new MatrixBlock(input.length,
input[0].length, false);
+ inBlock.init(input, input.length, input[0].length);
+
+ IndexFunction op = new RollIndex(shift);
+ MatrixBlock outBlock = inBlock.reorgOperations(new
ReorgOperator(op), new MatrixBlock(), 0, 0, 5);
+
+ MatrixBlock expectedBlock = new MatrixBlock(expected.length,
expected[0].length, false);
+ expectedBlock.init(expected, expected.length,
expected[0].length);
+
+ TestUtils.compareMatrices(outBlock, expectedBlock, 1e-12,
"Dense Roll operation does not match expected output");
+ }
+}
diff --git
a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java
new file mode 100644
index 0000000000..b6b5053ca1
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.matrix.libMatrixReorg;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+
+import org.apache.sysds.runtime.functionobjects.IndexFunction;
+import org.apache.sysds.runtime.functionobjects.RollIndex;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class RollOperationThreadSafetyTest {
+
+ private static final int MIN_ROWS = 2017;
+ private static final int MIN_COLS = 1001;
+ private static final int MIN_SHIFT = -50;
+ private static final int MAX_SHIFT = 1022;
+ private static final int NUM_TESTS = 100;
+ private static final double TEST_SPARSITY = 0.01;
+ private final int rows;
+ private final int cols;
+ private final int shift;
+ private final MatrixBlock inputDense;
+ private final MatrixBlock inputSparse;
+
+ public RollOperationThreadSafetyTest(int rows, int cols, int shift) {
+ this.rows = rows;
+ this.cols = cols;
+ this.shift = shift;
+
+ MatrixBlock tempInput = TestUtils.generateTestMatrixBlock(rows,
cols, -100, 100, TEST_SPARSITY, 42);
+
+ this.inputSparse = tempInput;
+
+ this.inputDense = new MatrixBlock(rows, cols, false);
+ this.inputDense.copy(tempInput, false);
+ this.inputDense.recomputeNonZeros();
+ }
+
+ /**
+ * Defines the parameters for the test cases (Random Rows, Random Cols,
Random Shift).
+ *
+ * @return Collection of test parameters.
+ */
+ @Parameters(name = "Case {index}: Size={0}x{1}, Shift={2}")
+ public static Collection<Object[]> data() {
+ ArrayList<Object[]> tests = new ArrayList<>();
+ Random rand = new Random(42);
+
+ for(int i = 0; i < NUM_TESTS; i++) {
+ // Generate random dimensions (adding random buffer to
the minimums)
+ int r = MIN_ROWS + rand.nextInt(500);
+ int c = MIN_COLS + rand.nextInt(500);
+
+ int s = rand.nextInt((MAX_SHIFT - MIN_SHIFT) + 1) +
MIN_SHIFT;
+
+ tests.add(new Object[] {r, c, s});
+ }
+ return tests;
+ }
+
+ @Test
+ public void
denseRollOperationSingleAndMultiThreadedShouldReturnSameResult() {
+ int numThreads = getNumThreads();
+
+ MatrixBlock outSingle = rollOperation(inputDense, 1);
+
+ MatrixBlock outMulti = rollOperation(inputDense, numThreads);
+
+ TestUtils.compareMatrices(outSingle, outMulti, 1e-12,
+ "Dense Mismatch (numThreads=1 vs numThreads>1) for
Size=" + rows + "x" + cols + " Shift=" + shift);
+ }
+
+ @Test
+ public void
sparseRollOperationSingleAndMultiThreadedShouldReturnSameResult() {
+ int numThreads = getNumThreads();
+
+ MatrixBlock outSingle = rollOperation(inputSparse, 1);
+
+ MatrixBlock outMulti = rollOperation(inputSparse, numThreads);
+
+ TestUtils.compareMatrices(outSingle, outMulti, 1e-12,
+ "Sparse Mismatch (numThreads=1 vs numThreads>1) for
Size=" + rows + "x" + cols + " Shift=" + shift);
+ }
+
+ private MatrixBlock rollOperation(MatrixBlock inBlock, int numThreads) {
+ IndexFunction op = new RollIndex(shift);
+ ReorgOperator reorgOperator = new ReorgOperator(op, numThreads);
+
+ MatrixBlock outBlock = new MatrixBlock(rows, cols,
inBlock.isInSparseFormat());
+
+ return inBlock.reorgOperations(reorgOperator, outBlock, 0, 0,
0);
+ }
+
+ private static int getNumThreads() {
+ // number of threads should be at least two to invoke
multithreaded operation
+ int cores = Runtime.getRuntime().availableProcessors();
+ return Math.max(2, cores);
+ }
+}
diff --git
a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java
index d2ad83597b..dc37990c33 100644
---
a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java
+++
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java
@@ -100,15 +100,36 @@ public class RollTest {
/**
* The actual test method that performs the roll operation on both
* sparse and dense matrices and compares the results.
+ * This test will execute the single threaded operation
*/
@Test
- public void test() {
+ public void testSingleThreadedOperation() {
+ int numThreads = 1;
+ compareDenseAndSparseRepresentation(numThreads);
+ }
+
+
+ /**
+ * The actual test method that performs the roll operation on both
+ * sparse and dense matrices and compares the results.
+ * This test will execute the multithreaded operation
+ */
+ @Test
+ public void testMultiThreadedOperation() {
+ // number of threads should be at least two to invoke
multithreaded operation
+ int cores = Runtime.getRuntime().availableProcessors();
+ int numThreads = Math.max(2, cores);
+
+ compareDenseAndSparseRepresentation(numThreads);
+ }
+
+ private void compareDenseAndSparseRepresentation(int numThreads) {
try {
IndexFunction op = new RollIndex(shift);
MatrixBlock outputDense = inputDense.reorgOperations(
- new ReorgOperator(op), new
MatrixBlock(), 0, 0, 0);
+ new ReorgOperator(op, numThreads), new
MatrixBlock(), 0, 0, 0);
MatrixBlock outputSparse = inputSparse.reorgOperations(
- new ReorgOperator(op), new
MatrixBlock(), 0, 0, 0);
+ new ReorgOperator(op, numThreads), new
MatrixBlock(), 0, 0, 0);
outputSparse.sparseToDense();
// Compare the dense representations of both outputs
diff --git
a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java
new file mode 100644
index 0000000000..e72b29072c
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.matrix.libMatrixReorg;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.runtime.functionobjects.IndexFunction;
+import org.apache.sysds.runtime.functionobjects.RollIndex;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class SparseMatrixRollOperationCorrectnessTest {
+
+ private final double[][] input;
+ private final double[][] expected;
+ private final int shift;
+
+ public SparseMatrixRollOperationCorrectnessTest(double[][] input,
double[][] expected, int shift) {
+ this.input = input;
+ this.expected = expected;
+ this.shift = shift;
+ }
+
+ @Parameterized.Parameters(name = "Shift={2}, Size={0}x{1} (Sparse)")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {
+ new double[][] {{1, 0, 0}, {0, 2, 0}, {0, 0,
3}},
+ new double[][] {{0, 0, 3}, {1, 0, 0}, {0, 2,
0}},
+ 1
+ },
+ {
+ new double[][] {{1, 0, 0}, {0, 2, 0}, {0, 0,
3}},
+ new double[][] {{0, 2, 0}, {0, 0, 3}, {1, 0,
0}},
+ -1
+ },
+ {
+ new double[][] {{0}, {10}, {0}, {20}, {0}},
+ new double[][] {{20}, {0}, {0}, {10}, {0}},
+ 2
+ },
+ {
+ new double[][] {{1, 2}, {0, 0}, {3, 4}, {0, 0}},
+ new double[][] {{0, 0}, {1, 2}, {0, 0}, {3, 4}},
+ 1
+ },
+ {
+ new double[][] {{0, 0, 0}, {0, 0, 0}, {0, 5,
0}, {0, 0, 0}},
+ new double[][] {{0, 5, 0}, {0, 0, 0}, {0, 0,
0}, {0, 0, 0}},
+ 2
+ },
+ {
+ new double[][] {{1, 0}, {0, 2}, {3, 0}},
+ new double[][] {{3, 0}, {1, 0}, {0, 2}},
+ 4
+ },
+ {
+ new double[][] {{0, 1}, {0, 0}, {2, 0}},
+ new double[][] {{0, 0}, {2, 0}, {0, 1}},
+ -1
+ },
+ {
+ new double[][] {{0, 0}, {0, 0}},
+ new double[][] {{0, 0}, {0, 0}},
+ 1
+ },
+ {
+ new double[][] {{1, 0, 1}, {0, 1, 0}, {1, 0,
1}},
+ new double[][] {{1, 0, 1}, {1, 0, 1}, {0, 1,
0}},
+ 1
+ },
+ {
+ new double[][] {{0, 5}, {0, 0}, {2, 0}},
+ new double[][] {{0, 5}, {0, 0}, {2, 0}},
+ 0
+ },
+ {
+ new double[][] {{0, 5}, {0, 0}, {2, 0}},
+ new double[][] {{0, 5}, {0, 0}, {2, 0}},
+ 3
+ },
+ {
+ new double[][] {{0, 5}, {0, 0}, {2, 0}},
+ new double[][] {{0, 5}, {0, 0}, {2, 0}},
+ -3
+ },
+ {
+ new double[][] {{0, 0, 1, 0}, {0, 2, 0, 0}},
+ new double[][] {{0, 2, 0, 0}, {0, 0, 1, 0}},
+ 1
+ },
+ {
+ new double[][] {{0, 0, 1, 0}, {0, 2, 0, 0}},
+ new double[][] {{0, 2, 0, 0}, {0, 0, 1, 0}},
+ -1
+ },
+ {
+ new double[][] {{1, 1}, {0, 0}, {2, 2}, {0, 0}},
+ new double[][] {{0, 0}, {1, 1}, {0, 0}, {2, 2}},
+ 1
+ },
+ {
+ new double[][] {{0, 0}, {0, 0}, {1, 2}, {3, 4}},
+ new double[][] {{1, 2}, {3, 4}, {0, 0}, {0, 0}},
+ 2
+ },
+ {
+ new double[][] {{1, 0}, {0, 0}, {0, 2}},
+ new double[][] {{0, 2}, {1, 0}, {0, 0}},
+ 10
+ },
+ {
+ new double[][] {{1, 0}, {0, 0}, {0, 2}},
+ new double[][] {{0, 0}, {0, 2}, {1, 0}},
+ -10
+ },
+ {
+ new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0,
0, 0, 0}, {0, 0, 0, 0}},
+ new double[][] {{0, 0, 0, 0}, {0, 0, 0, 0}, {0,
0, 0, 0}, {5, 0, 0, 0}},
+ 3
+ },
+ {
+ new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0,
0, 0, 0}, {0, 0, 0, 0}},
+ new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0,
0, 0, 0}, {0, 0, 0, 0}},
+ 4
+ },
+ {
+ new double[][] {{0, 1}, {0, 2}, {0, 3}, {0, 4},
{0, 5}},
+ new double[][] {{0, 3}, {0, 4}, {0, 5}, {0, 1},
{0, 2}},
+ 3
+ },
+ {
+ new double[][] {{-1, 0}, {0, 0}, {0, 5}},
+ new double[][] {{0, 5}, {-1, 0}, {0, 0}},
+ 1
+ }
+ });
+ }
+
+ @Test
+ public void testRollOperationProducesExpectedOutputSparse() {
+ MatrixBlock inBlock = new MatrixBlock(input.length,
input[0].length, false);
+ inBlock.init(input, input.length, input[0].length);
+
+ inBlock.denseToSparse(true);
+
+ Assert.assertTrue("Input block must be in sparse format",
inBlock.isInSparseFormat());
+
+ IndexFunction op = new RollIndex(shift);
+ ReorgOperator reorgOperator = new ReorgOperator(op);
+ MatrixBlock matrixBlock = new MatrixBlock();
+
+ MatrixBlock outBlock = inBlock.reorgOperations(reorgOperator,
matrixBlock, 0, 0, 0);
+
+ MatrixBlock expectedBlock = new MatrixBlock(expected.length,
expected[0].length, false);
+ expectedBlock.init(expected, expected.length,
expected[0].length);
+
+ TestUtils.compareMatrices(outBlock, expectedBlock, 1e-12,
+ "Sparse Roll operation does not match expected output");
+ }
+}