This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 11c5a74ed65bc23a9e7e144a1b80aa862899ca09 Author: Matthias Boehm <[email protected]> AuthorDate: Sun Jul 20 11:28:44 2025 +0200 [SYSTEMDS-3730] Fixed and improved multi-threaded reverse operations This patch consolidates the new multi-threaded reverse operation, by using common single- and multi-threaded kernels, fixing parallelization decisions and preallocation, and consolidating the tests. --- src/main/java/org/apache/sysds/hops/ReorgOp.java | 6 +- .../sysds/runtime/matrix/data/LibMatrixReorg.java | 70 ++++----- .../test/functions/reorg/FullReverseTest.java | 156 ++++----------------- 3 files changed, 56 insertions(+), 176 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java index bd4fdc4f1d..5fc73e2bd3 100644 --- a/src/main/java/org/apache/sysds/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java @@ -159,12 +159,10 @@ public class ReorgOp extends MultiThreadedHop break; } case REV: { - long numel = getDim1() * getDim2(); - int k = (numel < 3000_000) ? - 1 : OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); Transform transform1 = new Transform( getInput().get(0).constructLops(), - _op, getDataType(), getValueType(), et, k); + _op, getDataType(), getValueType(), et, + OptimizerUtils.getConstrainedNumThreads(_maxNumThreads)); setOutputDimensions(transform1); setLineNumbers(transform1); setLops(transform1); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 54f088792e..90ea445be8 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -382,10 +382,19 @@ public class LibMatrixReorg { return out; } - if( in.sparse ) - reverseSparse( in, out ); - else - reverseDense( in, out ); + //set basic meta data and allocate output + out.sparse = in.sparse; + out.nonZeros = in.nonZeros; + + + if( in.sparse ) { + out.allocateSparseRowsBlock(false); + reverseSparse(in, out, 0, in.rlen); + } + else { + out.allocateDenseBlock(false); + reverseDense(in, out, 0, in.rlen); + } //System.out.println("rev ("+in.rlen+", "+in.clen+", "+in.sparse+") in "+time.stop()+" ms."); @@ -393,7 +402,9 @@ public class LibMatrixReorg { } public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { - if (k <= 1 || in.isEmptyBlock(false) ) { + if (k <= 1 || in.isEmptyBlock(false) + || in.getLength() < PAR_NUMCELL_THRESHOLD ) + { return rev(in, out); // fallback to single-threaded } @@ -405,10 +416,11 @@ public class LibMatrixReorg { out.reset(numRows, numCols, sparse); // Before starting threads, ensure the output sparse block is allocated! - if (sparse) { + if (sparse) out.allocateSparseRowsBlock(false); - } - + else + out.allocateDenseBlock(false); + // Set up thread pool ExecutorService pool = CommonThreadPool.get(k); try { @@ -420,25 +432,10 @@ public class LibMatrixReorg { final int endRow = Math.min((i + 1) * blklen, numRows); tasks.add(pool.submit(() -> { - if (!sparse) { - // Dense case - double[] inVals = in.getDenseBlockValues(); - double[] outVals = out.getDenseBlockValues(); - for (int r = startRow; r < endRow; r++) { - int revRow = numRows - r - 1; - System.arraycopy(inVals, revRow * numCols, outVals, r * numCols, numCols); - } - } else { - // Sparse case - SparseBlock inBlk = in.getSparseBlock(); - SparseBlock outBlk = out.getSparseBlock(); - for (int r = startRow; r < endRow; r++) { - int revRow = numRows - r - 1; - if (!inBlk.isEmpty(revRow)) { - outBlk.set(r, inBlk.get(revRow), true); - } - } - } + if( in.sparse ) + reverseSparse(in, out, startRow, endRow); + else + reverseDense(in, out, startRow, endRow); })); } @@ -2523,41 +2520,30 @@ public class LibMatrixReorg { return cnt; } - private static void reverseDense(MatrixBlock in, MatrixBlock out) { + private static void reverseDense(MatrixBlock in, MatrixBlock out, int rl, int ru) { final int m = in.rlen; final int n = in.clen; - //set basic meta data and allocate output - out.sparse = false; - out.nonZeros = in.nonZeros; - out.allocateDenseBlock(false); - //copy all rows into target positions if( n == 1 ) { //column vector double[] a = in.getDenseBlockValues(); double[] c = out.getDenseBlockValues(); - for( int i=0; i<m; i++ ) + for( int i=rl; i<ru; i++ ) c[m-1-i] = a[i]; } else { //general matrix case DenseBlock a = in.getDenseBlock(); DenseBlock c = out.getDenseBlock(); - for( int i=0; i<m; i++ ) { + for( int i=rl; i<ru; i++ ) { final int ri = m - 1 - i; System.arraycopy(a.values(i), a.pos(i), c.values(ri), c.pos(ri), n); } } } - private static void reverseSparse(MatrixBlock in, MatrixBlock out) { + private static void reverseSparse(MatrixBlock in, MatrixBlock out, int rl, int ru) { final int m = in.rlen; - //set basic meta data and allocate output - out.sparse = true; - out.nonZeros = in.nonZeros; - - out.allocateSparseRowsBlock(false); - //copy all rows into target positions SparseBlock a = in.getSparseBlock(); SparseBlock c = out.getSparseBlock(); diff --git a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java index fb5f936641..d1a66791c8 100644 --- a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java +++ b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java @@ -22,10 +22,8 @@ package org.apache.sysds.test.functions.reorg; import java.util.HashMap; import org.apache.sysds.common.Opcodes; -import org.apache.sysds.utils.stats.InfrastructureAnalyzer; import org.junit.Assert; import org.junit.Test; -import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ExecMode; import org.apache.sysds.common.Types.ExecType; import org.apache.sysds.runtime.instructions.Instruction; @@ -44,18 +42,16 @@ public class FullReverseTest extends AutomatedTestBase private final static String TEST_DIR = "functions/reorg/"; private static final String TEST_CLASS_DIR = TEST_DIR + FullReverseTest.class.getSimpleName() + "/"; - private final static int rows1 = 2017; - private final static int cols1 = 1001; + //single-threaded execution + private final static int rows1 = 201; + private final static int cols1 = 100; + //multi-threaded / distributed execution + private final static int rows2 = 2017; + private final static int cols2 = 1001; + private final static double sparsity1 = 0.7; private final static double sparsity2 = 0.1; - // Multi-threading test parameters - private final static int rows_mt = 5018; // Larger for multi-threading benefits - private final static int cols_mt = 1001; // Larger for multi-threading benefits - private final static int[] threadCounts = {1, 2, 4, 8}; - // Set global parallelism for SystemDS to enable multi-threading - private final static int oldPar = InfrastructureAnalyzer.getLocalParallelism(); - @Override public void setUp() { TestUtils.clearAssertionInformation(); @@ -65,97 +61,74 @@ public class FullReverseTest extends AutomatedTestBase @Test public void testReverseVectorDenseCP() { - runReverseTest(TEST_NAME1, false, false, ExecType.CP); + runReverseTest(TEST_NAME1, false, rows1, 1, ExecType.CP); } @Test public void testReverseVectorSparseCP() { - runReverseTest(TEST_NAME1, false, true, ExecType.CP); + runReverseTest(TEST_NAME1, true, rows1, 1, ExecType.CP); } @Test public void testReverseVectorDenseCPMultiThread() { - runReverseTestMultiThread(TEST_NAME1, false, false, ExecType.CP); + runReverseTest(TEST_NAME1, false, rows2, 1, ExecType.CP); } @Test public void testReverseVectorSparseCPMultiThread() { - runReverseTestMultiThread(TEST_NAME1, false, true, ExecType.CP); - } - - @Test - public void testReverseVectorDenseSPMultiThread() { - runReverseTestMultiThread(TEST_NAME1, false, false, ExecType.SPARK); + runReverseTest(TEST_NAME1, true, rows2, 1, ExecType.CP); } @Test public void testReverseVectorDenseSP() { - runReverseTest(TEST_NAME1, false, false, ExecType.SPARK); + runReverseTest(TEST_NAME1, false, rows2, 1, ExecType.SPARK); } @Test public void testReverseVectorSparseSP() { - runReverseTest(TEST_NAME1, false, true, ExecType.SPARK); + runReverseTest(TEST_NAME1, true, rows2, 1, ExecType.SPARK); } @Test public void testReverseMatrixDenseCP() { - runReverseTest(TEST_NAME1, true, false, ExecType.CP); + runReverseTest(TEST_NAME1, false, rows1, cols1, ExecType.CP); } @Test public void testReverseMatrixSparseCP() { - runReverseTest(TEST_NAME1, true, true, ExecType.CP); + runReverseTest(TEST_NAME1, true, rows1, cols1, ExecType.CP); } @Test public void testReverseMatrixDenseSP() { - runReverseTest(TEST_NAME1, true, false, ExecType.SPARK); + runReverseTest(TEST_NAME1, false, rows2, cols2, ExecType.SPARK); } @Test public void testReverseMatrixSparseSP() { - runReverseTest(TEST_NAME1, true, true, ExecType.SPARK); + runReverseTest(TEST_NAME1, true, rows2, cols2, ExecType.SPARK); } @Test public void testReverseVectorDenseRewriteCP() { - runReverseTest(TEST_NAME2, false, false, ExecType.CP); + runReverseTest(TEST_NAME2, false, rows1, 1, ExecType.CP); } @Test public void testReverseMatrixDenseRewriteCP() { - runReverseTest(TEST_NAME2, true, false, ExecType.CP); - } - + runReverseTest(TEST_NAME2, false, rows1, 1, ExecType.CP); + } - /** - * - * @param sparseM1 - * @param sparseM2 - * @param instType - */ - private void runReverseTest(String testname, boolean matrix, boolean sparse, ExecType instType) + private void runReverseTest(String testname, boolean sparse, int rows, int cols, ExecType instType) { - //rtplatform for MR - ExecMode platformOld = rtplatform; - switch( instType ){ - case SPARK: rtplatform = ExecMode.SPARK; break; - default: rtplatform = ExecMode.HYBRID; break; - } - boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; - if( rtplatform == ExecMode.SPARK ) - DMLScript.USE_LOCAL_SPARK_CONFIG = true; - + ExecMode platformOld = setExecMode(instType); String TEST_NAME = testname; try { - int cols = matrix ? cols1 : 1; double sparsity = sparse ? sparsity2 : sparsity1; getAndLoadTestConfiguration(TEST_NAME); - /* This is for running the junit test the new way, i.e., construct the arguments directly */ String HOME = SCRIPT_DIR + TEST_DIR; fullDMLScriptName = HOME + TEST_NAME + ".dml"; programArgs = new String[]{"-stats","-explain","-args", input("A"), output("B") }; @@ -164,10 +137,10 @@ public class FullReverseTest extends AutomatedTestBase rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir(); //generate actual dataset - double[][] A = getRandomMatrix(rows1, cols, -1, 1, sparsity, 7); + double[][] A = getRandomMatrix(rows, cols, -1, 1, sparsity, 7); writeInputMatrixWithMTD("A", A, true); - runTest(true, false, null, -1); + runTest(true, false, null, -1); runRScript(true); //compare matrices @@ -181,85 +154,8 @@ public class FullReverseTest extends AutomatedTestBase else if ( instType == ExecType.SPARK ) Assert.assertTrue("Missing opcode: "+Instruction.SP_INST_PREFIX+Opcodes.REV.toString(), Statistics.getCPHeavyHitterOpCodes().contains(Instruction.SP_INST_PREFIX+Opcodes.REV)); } - finally - { - //reset flags - rtplatform = platformOld; - DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; - } - } - - private void runReverseTestMultiThread(String testname, boolean matrix, boolean sparse, ExecType instType) - { - // Compare single-thread vs multi-thread results -// HashMap<CellIndex, Double> stResult = runReverseWithThreads(testname, matrix, sparse, instType, 1); - HashMap<CellIndex, Double> mtResult = runReverseWithThreads(testname, matrix, sparse, instType, 8); - - // Compare results to ensure consistency -// TestUtils.compareMatrices(stResult, mtResult, 0, "ST-Result", "MT-Result"); - } - - private HashMap<CellIndex, Double> runReverseWithThreads(String testname, boolean matrix, boolean sparse, ExecType instType, int numThreads) - { - //rtplatform for MR - ExecMode platformOld = rtplatform; - switch( instType ){ - case SPARK: rtplatform = ExecMode.SPARK; break; - default: rtplatform = ExecMode.HYBRID; break; - } - boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; - if( rtplatform == ExecMode.SPARK ) - DMLScript.USE_LOCAL_SPARK_CONFIG = true; - - String TEST_NAME = testname; - - System.out.println("I am trying to run multi-thread"); - - try - { - System.setProperty("sysds.parallel.threads", String.valueOf(numThreads)); - -// int cols = matrix ? cols_mt : 1; - double sparsity = sparse ? sparsity2 : sparsity1; - getAndLoadTestConfiguration(TEST_NAME); - - /* This is for running the junit test the new way, i.e., construct the arguments directly */ - String HOME = SCRIPT_DIR + TEST_DIR; - fullDMLScriptName = HOME + TEST_NAME + ".dml"; - - // Add thread count to program arguments - programArgs = new String[]{"-stats","-explain","-args", input("A"), output("B") }; - - fullRScriptName = HOME + TEST_NAME + ".R"; - rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir(); - - //generate actual dataset - double[][] A = getRandomMatrix(rows_mt, cols_mt, -1, 1, sparsity, 7); - writeInputMatrixWithMTD("A", A, true); - - // Run with specified thread count (this is the key part) - runTest(true, false, null, -1); - - //read and return results - HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("B"); - - //check generated opcode - if( instType == ExecType.CP ) - Assert.assertTrue("Missing opcode: rev", Statistics.getCPHeavyHitterOpCodes().contains(Opcodes.REV.toString())); - else if ( instType == ExecType.SPARK ) - Assert.assertTrue("Missing opcode: "+Instruction.SP_INST_PREFIX+Opcodes.REV.toString(), Statistics.getCPHeavyHitterOpCodes().contains(Instruction.SP_INST_PREFIX+Opcodes.REV)); - - return dmlfile; - } - catch(Exception ex) { - throw new RuntimeException(ex); - } finally { - //reset flags - rtplatform = platformOld; - DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; - System.setProperty("sysds.parallel.threads", String.valueOf(oldPar)); + resetExecMode(platformOld); } } - -} \ No newline at end of file +}
