This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 11c5a74ed65bc23a9e7e144a1b80aa862899ca09
Author: Matthias Boehm <[email protected]>
AuthorDate: Sun Jul 20 11:28:44 2025 +0200

    [SYSTEMDS-3730] Fixed and improved multi-threaded reverse operations
    
    This patch consolidates the new multi-threaded reverse operation by
    sharing common kernels between the single- and multi-threaded code
    paths, fixing the parallelization decisions and output preallocation,
    and merging the redundant tests.
---
 src/main/java/org/apache/sysds/hops/ReorgOp.java   |   6 +-
 .../sysds/runtime/matrix/data/LibMatrixReorg.java  |  70 ++++-----
 .../test/functions/reorg/FullReverseTest.java      | 156 ++++-----------------
 3 files changed, 56 insertions(+), 176 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java
index bd4fdc4f1d..5fc73e2bd3 100644
--- a/src/main/java/org/apache/sysds/hops/ReorgOp.java
+++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java
@@ -159,12 +159,10 @@ public class ReorgOp extends MultiThreadedHop
                                break;
                                }
                        case REV: {
-                               long numel = getDim1() * getDim2();
-                               int k = (numel < 3000_000) ?
-                                               1 : 
OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
                                Transform transform1 = new Transform(
                                        getInput().get(0).constructLops(),
-                                       _op, getDataType(), getValueType(), et, 
k);
+                                       _op, getDataType(), getValueType(), et,
+                                       
OptimizerUtils.getConstrainedNumThreads(_maxNumThreads));
                                setOutputDimensions(transform1);
                                setLineNumbers(transform1);
                                setLops(transform1);
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
index 54f088792e..90ea445be8 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
@@ -382,10 +382,19 @@ public class LibMatrixReorg {
                        return out;
                }
                
-               if( in.sparse )
-                       reverseSparse( in, out );
-               else
-                       reverseDense( in, out );
+               //set basic meta data and allocate output
+               out.sparse = in.sparse;
+               out.nonZeros = in.nonZeros;
+                               
+               
+               if( in.sparse ) {
+                       out.allocateSparseRowsBlock(false);
+                       reverseSparse(in, out, 0, in.rlen);
+               }
+               else {
+                       out.allocateDenseBlock(false);
+                       reverseDense(in, out, 0, in.rlen);
+               }
                
                //System.out.println("rev ("+in.rlen+", "+in.clen+", 
"+in.sparse+") in "+time.stop()+" ms.");
 
@@ -393,7 +402,9 @@ public class LibMatrixReorg {
        }
 
        public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
-               if (k <= 1 || in.isEmptyBlock(false) ) {
+               if (k <= 1 || in.isEmptyBlock(false)
+                       || in.getLength() < PAR_NUMCELL_THRESHOLD ) 
+               {
                        return rev(in, out); // fallback to single-threaded
 
                }
@@ -405,10 +416,11 @@ public class LibMatrixReorg {
                out.reset(numRows, numCols, sparse);
 
                // Before starting threads, ensure the output sparse block is 
allocated!
-               if (sparse) {
+               if (sparse)
                        out.allocateSparseRowsBlock(false);
-               }
-
+               else
+                       out.allocateDenseBlock(false);
+               
                // Set up thread pool
                ExecutorService pool = CommonThreadPool.get(k);
                try {
@@ -420,25 +432,10 @@ public class LibMatrixReorg {
                                final int endRow = Math.min((i + 1) * blklen, 
numRows);
 
                                tasks.add(pool.submit(() -> {
-                                       if (!sparse) {
-                                               // Dense case
-                                               double[] inVals = 
in.getDenseBlockValues();
-                                               double[] outVals = 
out.getDenseBlockValues();
-                                               for (int r = startRow; r < 
endRow; r++) {
-                                                       int revRow = numRows - 
r - 1;
-                                                       
System.arraycopy(inVals, revRow * numCols, outVals, r * numCols, numCols);
-                                               }
-                                       } else {
-                                               // Sparse case
-                                               SparseBlock inBlk = 
in.getSparseBlock();
-                                               SparseBlock outBlk = 
out.getSparseBlock();
-                                               for (int r = startRow; r < 
endRow; r++) {
-                                                       int revRow = numRows - 
r - 1;
-                                                       if 
(!inBlk.isEmpty(revRow)) {
-                                                               outBlk.set(r, 
inBlk.get(revRow), true);
-                                                       }
-                                               }
-                                       }
+                                       if( in.sparse )
+                                               reverseSparse(in, out, 
startRow, endRow);
+                                       else
+                                               reverseDense(in, out, startRow, 
endRow);
                                }));
                        }
 
@@ -2523,41 +2520,30 @@ public class LibMatrixReorg {
                return cnt;
        }
 
-       private static void reverseDense(MatrixBlock in, MatrixBlock out) {
+       private static void reverseDense(MatrixBlock in, MatrixBlock out, int 
rl, int ru) {
                final int m = in.rlen;
                final int n = in.clen;
                
-               //set basic meta data and allocate output
-               out.sparse = false;
-               out.nonZeros = in.nonZeros;
-               out.allocateDenseBlock(false);
-               
                //copy all rows into target positions
                if( n == 1 ) { //column vector
                        double[] a = in.getDenseBlockValues();
                        double[] c = out.getDenseBlockValues();
-                       for( int i=0; i<m; i++ )
+                       for( int i=rl; i<ru; i++ )
                                c[m-1-i] = a[i];
                }
                else { //general matrix case
                        DenseBlock a = in.getDenseBlock();
                        DenseBlock c = out.getDenseBlock();
-                       for( int i=0; i<m; i++ ) {
+                       for( int i=rl; i<ru; i++ ) {
                                final int ri = m - 1 - i;
                                System.arraycopy(a.values(i), a.pos(i), 
c.values(ri), c.pos(ri), n);
                        }
                }
        }
 
-       private static void reverseSparse(MatrixBlock in, MatrixBlock out) {
+       private static void reverseSparse(MatrixBlock in, MatrixBlock out, int 
rl, int ru) {
                final int m = in.rlen;
                
-               //set basic meta data and allocate output
-               out.sparse = true;
-               out.nonZeros = in.nonZeros;
-               
-               out.allocateSparseRowsBlock(false);
-               
                //copy all rows into target positions
                SparseBlock a = in.getSparseBlock();
                SparseBlock c = out.getSparseBlock();
diff --git a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java
index fb5f936641..d1a66791c8 100644
--- a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java
@@ -22,10 +22,8 @@ package org.apache.sysds.test.functions.reorg;
 import java.util.HashMap;
 
 import org.apache.sysds.common.Opcodes;
-import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
 import org.junit.Assert;
 import org.junit.Test;
-import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.common.Types.ExecType;
 import org.apache.sysds.runtime.instructions.Instruction;
@@ -44,18 +42,16 @@ public class FullReverseTest extends AutomatedTestBase
        private final static String TEST_DIR = "functions/reorg/";
        private static final String TEST_CLASS_DIR = TEST_DIR + 
FullReverseTest.class.getSimpleName() + "/";
        
-       private final static int rows1 = 2017;
-       private final static int cols1 = 1001;
+       //single-threaded execution
+       private final static int rows1 = 201;
+       private final static int cols1 = 100;
+       //multi-threaded / distributed execution
+       private final static int rows2 = 2017;
+       private final static int cols2 = 1001;
+       
        private final static double sparsity1 = 0.7;
        private final static double sparsity2 = 0.1;
 
-       // Multi-threading test parameters
-       private final static int rows_mt = 5018;  // Larger for multi-threading 
benefits
-       private final static int cols_mt = 1001;   // Larger for 
multi-threading benefits
-       private final static int[] threadCounts = {1, 2, 4, 8};
-       // Set global parallelism for SystemDS to enable multi-threading
-       private final static int oldPar = 
InfrastructureAnalyzer.getLocalParallelism();
-
        @Override
        public void setUp() {
                TestUtils.clearAssertionInformation();
@@ -65,97 +61,74 @@ public class FullReverseTest extends AutomatedTestBase
 
        @Test
        public void testReverseVectorDenseCP() {
-               runReverseTest(TEST_NAME1, false, false, ExecType.CP);
+               runReverseTest(TEST_NAME1, false, rows1, 1, ExecType.CP);
        }
        
        @Test
        public void testReverseVectorSparseCP() {
-               runReverseTest(TEST_NAME1, false, true, ExecType.CP);
+               runReverseTest(TEST_NAME1, true, rows1, 1, ExecType.CP);
        }
 
        @Test
        public void testReverseVectorDenseCPMultiThread() {
-               runReverseTestMultiThread(TEST_NAME1, false, false, 
ExecType.CP);
+               runReverseTest(TEST_NAME1, false, rows2, 1, ExecType.CP);
        }
 
        @Test
        public void testReverseVectorSparseCPMultiThread() {
-               runReverseTestMultiThread(TEST_NAME1, false, true, ExecType.CP);
-       }
-
-       @Test
-       public void testReverseVectorDenseSPMultiThread() {
-               runReverseTestMultiThread(TEST_NAME1, false, false, 
ExecType.SPARK);
+               runReverseTest(TEST_NAME1, true, rows2, 1, ExecType.CP);
        }
 
        @Test
        public void testReverseVectorDenseSP() {
-               runReverseTest(TEST_NAME1, false, false, ExecType.SPARK);
+               runReverseTest(TEST_NAME1, false, rows2, 1, ExecType.SPARK);
        }
        
        @Test
        public void testReverseVectorSparseSP() {
-               runReverseTest(TEST_NAME1, false, true, ExecType.SPARK);
+               runReverseTest(TEST_NAME1, true, rows2, 1, ExecType.SPARK);
        }
        
        @Test
        public void testReverseMatrixDenseCP() {
-               runReverseTest(TEST_NAME1, true, false, ExecType.CP);
+               runReverseTest(TEST_NAME1, false, rows1, cols1, ExecType.CP);
        }
        
        @Test
        public void testReverseMatrixSparseCP() {
-               runReverseTest(TEST_NAME1, true, true, ExecType.CP);
+               runReverseTest(TEST_NAME1, true, rows1, cols1, ExecType.CP);
        }
        
        @Test
        public void testReverseMatrixDenseSP() {
-               runReverseTest(TEST_NAME1, true, false, ExecType.SPARK);
+               runReverseTest(TEST_NAME1, false, rows2, cols2, ExecType.SPARK);
        }
        
        @Test
        public void testReverseMatrixSparseSP() {
-               runReverseTest(TEST_NAME1, true, true, ExecType.SPARK);
+               runReverseTest(TEST_NAME1, true, rows2, cols2, ExecType.SPARK);
        }       
 
        @Test
        public void testReverseVectorDenseRewriteCP() {
-               runReverseTest(TEST_NAME2, false, false, ExecType.CP);
+               runReverseTest(TEST_NAME2, false, rows1, 1, ExecType.CP);
        }
        
        @Test
        public void testReverseMatrixDenseRewriteCP() {
-               runReverseTest(TEST_NAME2, true, false, ExecType.CP);
-       }       
-
+               runReverseTest(TEST_NAME2, false, rows1, 1, ExecType.CP);
+       }
        
-       /**
-        * 
-        * @param sparseM1
-        * @param sparseM2
-        * @param instType
-        */
-       private void runReverseTest(String testname, boolean matrix, boolean 
sparse, ExecType instType)
+       private void runReverseTest(String testname, boolean sparse, int rows, 
int cols, ExecType instType)
        {
-               //rtplatform for MR
-               ExecMode platformOld = rtplatform;
-               switch( instType ){
-                       case SPARK: rtplatform = ExecMode.SPARK; break;
-                       default: rtplatform = ExecMode.HYBRID; break;
-               }
-               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-               if( rtplatform == ExecMode.SPARK )
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-               
+               ExecMode platformOld = setExecMode(instType);
                String TEST_NAME = testname;
                
                try
                {
-                       int cols = matrix ? cols1 : 1;
                        double sparsity = sparse ? sparsity2 : sparsity1;
                        getAndLoadTestConfiguration(TEST_NAME);
                        
-                       /* This is for running the junit test the new way, 
i.e., construct the arguments directly */
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
                        programArgs = new String[]{"-stats","-explain","-args", 
input("A"), output("B") };
@@ -164,10 +137,10 @@ public class FullReverseTest extends AutomatedTestBase
                        rCmd = "Rscript" + " " + fullRScriptName + " " + 
inputDir() + " " + expectedDir();
        
                        //generate actual dataset 
-                       double[][] A = getRandomMatrix(rows1, cols, -1, 1, 
sparsity, 7); 
+                       double[][] A = getRandomMatrix(rows, cols, -1, 1, 
sparsity, 7); 
                        writeInputMatrixWithMTD("A", A, true);
        
-                       runTest(true, false, null, -1);                 
+                       runTest(true, false, null, -1); 
                        runRScript(true); 
                
                        //compare matrices 
@@ -181,85 +154,8 @@ public class FullReverseTest extends AutomatedTestBase
                        else if ( instType == ExecType.SPARK )
                                Assert.assertTrue("Missing opcode: 
"+Instruction.SP_INST_PREFIX+Opcodes.REV.toString(), 
Statistics.getCPHeavyHitterOpCodes().contains(Instruction.SP_INST_PREFIX+Opcodes.REV));
                }
-               finally
-               {
-                       //reset flags
-                       rtplatform = platformOld;
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-               }
-       }
-
-       private void runReverseTestMultiThread(String testname, boolean matrix, 
boolean sparse, ExecType instType)
-       {
-               // Compare single-thread vs multi-thread results
-//             HashMap<CellIndex, Double> stResult = 
runReverseWithThreads(testname, matrix, sparse, instType, 1);
-               HashMap<CellIndex, Double> mtResult = 
runReverseWithThreads(testname, matrix, sparse, instType, 8);
-
-               // Compare results to ensure consistency
-//             TestUtils.compareMatrices(stResult, mtResult, 0, "ST-Result", 
"MT-Result");
-       }
-
-       private HashMap<CellIndex, Double> runReverseWithThreads(String 
testname, boolean matrix, boolean sparse, ExecType instType, int numThreads)
-       {
-               //rtplatform for MR
-               ExecMode platformOld = rtplatform;
-               switch( instType ){
-                       case SPARK: rtplatform = ExecMode.SPARK; break;
-                       default: rtplatform = ExecMode.HYBRID; break;
-               }
-               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-               if( rtplatform == ExecMode.SPARK )
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-
-               String TEST_NAME = testname;
-
-               System.out.println("I am trying to run multi-thread");
-
-               try
-               {
-                       System.setProperty("sysds.parallel.threads", 
String.valueOf(numThreads));
-
-//                     int cols = matrix ? cols_mt : 1;
-                       double sparsity = sparse ? sparsity2 : sparsity1;
-                       getAndLoadTestConfiguration(TEST_NAME);
-
-                       /* This is for running the junit test the new way, 
i.e., construct the arguments directly */
-                       String HOME = SCRIPT_DIR + TEST_DIR;
-                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
-
-                       // Add thread count to program arguments
-                       programArgs = new String[]{"-stats","-explain","-args", 
input("A"), output("B") };
-
-                       fullRScriptName = HOME + TEST_NAME + ".R";
-                       rCmd = "Rscript" + " " + fullRScriptName + " " + 
inputDir() + " " + expectedDir();
-
-                       //generate actual dataset
-                       double[][] A = getRandomMatrix(rows_mt, cols_mt, -1, 1, 
sparsity, 7);
-                       writeInputMatrixWithMTD("A", A, true);
-
-                       // Run with specified thread count (this is the key 
part)
-                       runTest(true, false, null, -1);
-
-                       //read and return results
-                       HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromOutputDir("B");
-
-                       //check generated opcode
-                       if( instType == ExecType.CP )
-                               Assert.assertTrue("Missing opcode: rev", 
Statistics.getCPHeavyHitterOpCodes().contains(Opcodes.REV.toString()));
-                       else if ( instType == ExecType.SPARK )
-                               Assert.assertTrue("Missing opcode: 
"+Instruction.SP_INST_PREFIX+Opcodes.REV.toString(), 
Statistics.getCPHeavyHitterOpCodes().contains(Instruction.SP_INST_PREFIX+Opcodes.REV));
-
-                       return dmlfile;
-               }
-               catch(Exception ex) {
-                       throw new RuntimeException(ex);
-               }
                finally {
-                       //reset flags
-                       rtplatform = platformOld;
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-                       System.setProperty("sysds.parallel.threads", 
String.valueOf(oldPar));
+                       resetExecMode(platformOld);
                }
        }
-               
-}
\ No newline at end of file
+}

Reply via email to