This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a40d3f7bf [MINOR] Additional parfor tests for result merge implementations
5a40d3f7bf is described below

commit 5a40d3f7bf9cc603cdd6687c49fdbd68db7c6c01
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Mon Aug 19 19:53:30 2024 +0200

    [MINOR] Additional parfor tests for result merge implementations
---
 .../runtime/controlprogram/ParForProgramBlock.java |  12 ++-
 .../sysds/runtime/matrix/data/MatrixBlock.java     |   4 +
 .../test/component/parfor/ResultMergeTest.java     | 111 +++++++++++++++++++++
 .../test/component/parfor/TaskPartitionerTest.java |   8 +-
 4 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 01896cb73e..f415a8e695 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -1310,8 +1310,8 @@ public class ParForProgramBlock extends ForProgramBlock {
                return dp;
        }
 
-       private ResultMerge<?> createResultMerge( PResultMerge prm,
-               CacheableData<?> out, CacheableData<?>[] in, String fname, 
boolean accum, ExecutionContext ec ) 
+       public static ResultMerge<?> createResultMerge( PResultMerge prm,
+               CacheableData<?> out, CacheableData<?>[] in, String fname, 
boolean accum, int numThreads, ExecutionContext ec ) 
        {
                ResultMerge<?> rm;
                
@@ -1333,7 +1333,7 @@ public class ParForProgramBlock extends ForProgramBlock {
                                        rm = new ResultMergeLocalAutomatic( 
(MatrixObject)out, (MatrixObject[])in, fname, accum );
                                        break;
                                case REMOTE_SPARK:
-                                       int numMap = Math.max(_numThreads,
+                                       int numMap = Math.max(numThreads,
                                                
SparkExecutionContext.getDefaultParallelism(true));
                                        int numRed = numMap; //equal map/reduce
                                        rm = new ResultMergeRemoteSpark( 
(MatrixObject)out,
@@ -1460,7 +1460,8 @@ public class ParForProgramBlock extends ForProgramBlock {
                                        CacheableData<?>[] in = (dat instanceof 
MatrixObject) ?
                                                
tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new);
                                        String fname = 
constructResultMergeFileName();
-                                       ResultMerge<?> rm = 
createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
+                                       ResultMerge<?> rm = 
createResultMerge(_resultMerge,
+                                               out, in, fname, var._isAccum, 
_numThreads, ec);
                                        CacheableData<?> outNew = 
USE_PARALLEL_RESULT_MERGE ?
                                                
rm.executeParallelMerge(_numThreads) :
                                                rm.executeSerialMerge();
@@ -1678,7 +1679,8 @@ public class ParForProgramBlock extends ForProgramBlock {
                                        
                                        String fname = 
constructResultMergeFileName();
                                
-                                       ResultMerge<?> rm = 
createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec);
+                                       ResultMerge<?> rm = 
createResultMerge(_resultMerge,
+                                               out, in, fname, var._isAccum, 
_numThreads, _ec);
                                        CacheableData<?> outNew;
                                        if( USE_PARALLEL_RESULT_MERGE )
                                                outNew = 
rm.executeParallelMerge( _numThreads );
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 16a9503699..3bc43ff74c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -3686,6 +3686,10 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock<MatrixBlock>,
        public final MatrixBlock append(MatrixBlock that) {
                return append(that, null, true); // default cbind
        }
+       
+       public final MatrixBlock rbind(MatrixBlock that) {
+               return append(that, null, false); // cbind flag false: rbind instead of the default cbind
+       }
 
        /**
         * Append that matrix to this matrix, while allocating a new matrix. 
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
new file mode 100644
index 0000000000..fabfce69ce
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.parfor;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PResultMerge;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysds.runtime.controlprogram.parfor.ResultMerge;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MetaDataFormat;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ResultMergeTest extends AutomatedTestBase{ //component tests of the local parfor result merge variants
+       private final static String TEST_NAME = "parfor_rm";
+       private final static String TEST_DIR = "functions/parfor/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + ResultMergeTest.class.getSimpleName() + "/";
+       
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"C"}));
+       }
+       
+       @Test
+       public void testLocalMem() {
+               testResultMergeAll(PResultMerge.LOCAL_MEM);
+       }
+       
+       @Test
+       @Ignore //FIXME
+       public void testLocalFile() {
+               testResultMergeAll(PResultMerge.LOCAL_FILE);
+       }
+       
+       @Test
+       public void testLocalAutomatic() {
+               testResultMergeAll(PResultMerge.LOCAL_AUTOMATIC);
+       }
+       
+       private void testResultMergeAll(PResultMerge mtype) { //runs all valid (par, accum) combinations for one merge type
+               testResultMerge(false, false, false, mtype);
+               testResultMerge(false, true, false, mtype);
+               testResultMerge(true, false, false, mtype);
+               //testResultMerge(true, true, false, mtype); invalid
+       }
+       
+       private void testResultMerge(boolean par, boolean accum, boolean compare, PResultMerge mtype) {
+               loadTestConfiguration(getTestConfiguration(TEST_NAME));
+
+               //create expected result A, target C, and inputs B0..B2 (each combines one A.slice(..) row band with empty blocks)
+               MatrixBlock A = MatrixBlock.randOperations(1200, 1100, 0.1);
+               CacheableData<?> Cobj = compare ?
+                       toMatrixObject(A, output("C")) :
+                       toMatrixObject(new MatrixBlock(1200,1100,true), output("C"));
+               MatrixBlock empty = new MatrixBlock(400,1100,true);
+               MatrixObject[] Bobj = new MatrixObject[3];
+               Bobj[0] = toMatrixObject(A.slice(0,399).rbind(empty).rbind(empty), output("B0"));
+               Bobj[1] = toMatrixObject(empty.rbind(A.slice(400,799)).rbind(empty), output("B1"));
+               Bobj[2] = toMatrixObject(empty.rbind(empty).rbind(A.slice(800,1199)), output("B2")); //distinct file name per input
+               
+               //create result merge of the requested type (merge file name reuses output("C"))
+               ExecutionContext ec = ExecutionContextFactory.createContext();
+               int numThreads = 3;
+               ResultMerge<?> rm = ParForProgramBlock.createResultMerge(
+                       mtype, Cobj, Bobj, output("C"), accum, numThreads, ec);
+                       
+               //execute results merge (parallel with numThreads, or serial)
+               if( par )
+                       Cobj = rm.executeParallelMerge(numThreads);
+               else 
+                       Cobj = rm.executeSerialMerge();
+               
+               //check results: merged output must match A within 1e-14
+               TestUtils.compareMatrices(A, 
+                       (MatrixBlock)Cobj.acquireReadAndRelease(), 1e-14);
+       }
+       
+       private static MatrixObject toMatrixObject(MatrixBlock mb, String filename) { //wraps mb into a binary-format FP64 matrix object
+               MetaDataFormat md = new MetaDataFormat(
+                       mb.getDataCharacteristics().setBlocksize(1000), FileFormat.BINARY);
+               MatrixObject mo = new MatrixObject(ValueType.FP64, filename, md);
+               mo.acquireModify(mb);
+               mo.release();
+               return mo;
+       }
+}
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
index 4e3acbe4c6..8698381e78 100644
--- a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
+++ b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
@@ -27,11 +27,17 @@ import 
org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory;
 import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.test.AutomatedTestBase;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TaskPartitionerTest {
+public class TaskPartitionerTest extends AutomatedTestBase{
 
+       @Override
+       public void setUp() {
+               //no-op override: this test registers no test configuration
+       }
+       
        @Test
        public void testNaive() {
                testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, 
PTaskPartitioner.NAIVE);

Reply via email to