This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 5a40d3f7bf [MINOR] Additional parfor tests for result merge
implementations
5a40d3f7bf is described below
commit 5a40d3f7bf9cc603cdd6687c49fdbd68db7c6c01
Author: Matthias Boehm <[email protected]>
AuthorDate: Mon Aug 19 19:53:30 2024 +0200
[MINOR] Additional parfor tests for result merge implementations
---
.../runtime/controlprogram/ParForProgramBlock.java | 12 ++-
.../sysds/runtime/matrix/data/MatrixBlock.java | 4 +
.../test/component/parfor/ResultMergeTest.java | 111 +++++++++++++++++++++
.../test/component/parfor/TaskPartitionerTest.java | 8 +-
4 files changed, 129 insertions(+), 6 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 01896cb73e..f415a8e695 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -1310,8 +1310,8 @@ public class ParForProgramBlock extends ForProgramBlock {
return dp;
}
- private ResultMerge<?> createResultMerge( PResultMerge prm,
- CacheableData<?> out, CacheableData<?>[] in, String fname,
boolean accum, ExecutionContext ec )
+ public static ResultMerge<?> createResultMerge( PResultMerge prm,
+ CacheableData<?> out, CacheableData<?>[] in, String fname,
boolean accum, int numThreads, ExecutionContext ec )
{
ResultMerge<?> rm;
@@ -1333,7 +1333,7 @@ public class ParForProgramBlock extends ForProgramBlock {
rm = new ResultMergeLocalAutomatic(
(MatrixObject)out, (MatrixObject[])in, fname, accum );
break;
case REMOTE_SPARK:
- int numMap = Math.max(_numThreads,
+ int numMap = Math.max(numThreads,
SparkExecutionContext.getDefaultParallelism(true));
int numRed = numMap; //equal map/reduce
rm = new ResultMergeRemoteSpark(
(MatrixObject)out,
@@ -1460,7 +1460,8 @@ public class ParForProgramBlock extends ForProgramBlock {
CacheableData<?>[] in = (dat instanceof
MatrixObject) ?
tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new);
String fname =
constructResultMergeFileName();
- ResultMerge<?> rm =
createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
+ ResultMerge<?> rm =
createResultMerge(_resultMerge,
+ out, in, fname, var._isAccum,
_numThreads, ec);
CacheableData<?> outNew =
USE_PARALLEL_RESULT_MERGE ?
rm.executeParallelMerge(_numThreads) :
rm.executeSerialMerge();
@@ -1678,7 +1679,8 @@ public class ParForProgramBlock extends ForProgramBlock {
String fname =
constructResultMergeFileName();
- ResultMerge<?> rm =
createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec);
+ ResultMerge<?> rm =
createResultMerge(_resultMerge,
+ out, in, fname, var._isAccum,
_numThreads, _ec);
CacheableData<?> outNew;
if( USE_PARALLEL_RESULT_MERGE )
outNew =
rm.executeParallelMerge( _numThreads );
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 16a9503699..3bc43ff74c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -3686,6 +3686,20 @@ public class MatrixBlock extends MatrixValue implements CacheBlock<MatrixBlock>,
 	public final MatrixBlock append(MatrixBlock that) {
 		return append(that, null, true); // default cbind
 	}
+
+	/**
+	 * Appends that matrix below this matrix (rbind), i.e., the row-wise
+	 * counterpart of {@link #append(MatrixBlock)}, which performs a cbind.
+	 * Delegates to {@code append(that, null, false)}, passing {@code null}
+	 * for the output block just like the cbind variant.
+	 *
+	 * @param that the matrix block to append below this one
+	 * @return the row-wise concatenation of this and that
+	 */
+	public final MatrixBlock rbind(MatrixBlock that) {
+		final boolean cbind = false; //false selects the row-wise append
+		return append(that, null, cbind);
+	}
/**
* Append that matrix to this matrix, while allocating a new matrix.
diff --git
a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
new file mode 100644
index 0000000000..fabfce69ce
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.parfor;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PResultMerge;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysds.runtime.controlprogram.parfor.ResultMerge;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MetaDataFormat;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Component tests for the parfor result merge implementations created via
+ * {@link ParForProgramBlock#createResultMerge}: a random matrix is split into
+ * three row-partitioned partial results (zeros outside their own rows), which
+ * are merged into an output object and compared against the original matrix.
+ */
+public class ResultMergeTest extends AutomatedTestBase{
+	private final static String TEST_NAME = "parfor_rm";
+	private final static String TEST_DIR = "functions/parfor/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + ResultMergeTest.class.getSimpleName() + "/";
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"C"}));
+	}
+
+	@Test
+	public void testLocalMem() {
+		testResultMergeAll(PResultMerge.LOCAL_MEM);
+	}
+
+	@Test
+	@Ignore //FIXME: re-check now that inputs and merge target use distinct file names
+	public void testLocalFile() {
+		testResultMergeAll(PResultMerge.LOCAL_FILE);
+	}
+
+	@Test
+	public void testLocalAutomatic() {
+		testResultMergeAll(PResultMerge.LOCAL_AUTOMATIC);
+	}
+
+	/**
+	 * Runs the serial merge with and without accumulation and the parallel merge
+	 * without accumulation for one result merge type (parallel merge with
+	 * accumulation is excluded as invalid).
+	 */
+	private void testResultMergeAll(PResultMerge mtype) {
+		testResultMerge(false, false, false, mtype);
+		testResultMerge(false, true, false, mtype);
+		testResultMerge(true, false, false, mtype);
+		//testResultMerge(true, true, false, mtype); invalid
+	}
+
+	/**
+	 * Merges three row-partitioned partial results of a random 1200x1100 matrix
+	 * and checks that the merged output equals that matrix.
+	 *
+	 * @param par     if true, run {@code executeParallelMerge}, otherwise {@code executeSerialMerge}
+	 * @param accum   accumulation flag passed to {@code createResultMerge}
+	 * @param compare if true, the output object initially holds the input matrix,
+	 *                otherwise an empty matrix of the same dimensions
+	 * @param mtype   result merge implementation under test
+	 */
+	private void testResultMerge(boolean par, boolean accum, boolean compare, PResultMerge mtype) {
+		loadTestConfiguration(getTestConfiguration(TEST_NAME));
+
+		//create input and output objects
+		MatrixBlock A = MatrixBlock.randOperations(1200, 1100, 0.1);
+		CacheableData<?> Cobj = compare ?
+			toMatrixObject(A, output("C")) :
+			toMatrixObject(new MatrixBlock(1200,1100,true), output("C"));
+		MatrixBlock empty = new MatrixBlock(400,1100,true);
+		//every partial result needs its own file name; a shared name could let
+		//one object's file overwrite another's
+		MatrixObject[] Bobj = new MatrixObject[3];
+		Bobj[0] = toMatrixObject(A.slice(0,399).rbind(empty).rbind(empty), output("B0"));
+		Bobj[1] = toMatrixObject(empty.rbind(A.slice(400,799)).rbind(empty), output("B1"));
+		Bobj[2] = toMatrixObject(empty.rbind(empty).rbind(A.slice(800,1199)), output("B2"));
+
+		//create result merge; like ParForProgramBlock (constructResultMergeFileName),
+		//use a merge target file name distinct from the output object's own file
+		ExecutionContext ec = ExecutionContextFactory.createContext();
+		int numThreads = 3;
+		ResultMerge<?> rm = ParForProgramBlock.createResultMerge(
+			mtype, Cobj, Bobj, output("C_rm"), accum, numThreads, ec);
+
+		//execute results merge
+		if( par )
+			Cobj = rm.executeParallelMerge(numThreads);
+		else
+			Cobj = rm.executeSerialMerge();
+
+		//check results
+		TestUtils.compareMatrices(A,
+			(MatrixBlock)Cobj.acquireReadAndRelease(), 1e-14);
+	}
+
+	/**
+	 * Wraps a matrix block into a BINARY-format matrix object with block size
+	 * 1000 and the given file name; the block is set via acquireModify and
+	 * released before the object is returned.
+	 */
+	private static MatrixObject toMatrixObject(MatrixBlock mb, String filename) {
+		MetaDataFormat md = new MetaDataFormat(
+			mb.getDataCharacteristics().setBlocksize(1000), FileFormat.BINARY);
+		MatrixObject mo = new MatrixObject(ValueType.FP64, filename, md);
+		mo.acquireModify(mb);
+		mo.release();
+		return mo;
+	}
+}
diff --git
a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
index 4e3acbe4c6..8698381e78 100644
---
a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
+++
b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
@@ -27,11 +27,17 @@ import
org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory;
import
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.instructions.cp.IntObject;
import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.test.AutomatedTestBase;
import org.junit.Assert;
import org.junit.Test;
-public class TaskPartitionerTest {
+public class TaskPartitionerTest extends AutomatedTestBase{
+ @Override
+ public void setUp() {
+ //intentionally empty: this test registers no test configurations
+ }
+
@Test
public void testNaive() {
testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE,
PTaskPartitioner.NAIVE);