This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 5a40d3f7bf [MINOR] Additional parfor tests for result merge implementations 5a40d3f7bf is described below commit 5a40d3f7bf9cc603cdd6687c49fdbd68db7c6c01 Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Mon Aug 19 19:53:30 2024 +0200 [MINOR] Additional parfor tests for result merge implementations --- .../runtime/controlprogram/ParForProgramBlock.java | 12 ++- .../sysds/runtime/matrix/data/MatrixBlock.java | 4 + .../test/component/parfor/ResultMergeTest.java | 111 +++++++++++++++++++++ .../test/component/parfor/TaskPartitionerTest.java | 8 +- 4 files changed, 129 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java index 01896cb73e..f415a8e695 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java @@ -1310,8 +1310,8 @@ public class ParForProgramBlock extends ForProgramBlock { return dp; } - private ResultMerge<?> createResultMerge( PResultMerge prm, - CacheableData<?> out, CacheableData<?>[] in, String fname, boolean accum, ExecutionContext ec ) + public static ResultMerge<?> createResultMerge( PResultMerge prm, + CacheableData<?> out, CacheableData<?>[] in, String fname, boolean accum, int numThreads, ExecutionContext ec ) { ResultMerge<?> rm; @@ -1333,7 +1333,7 @@ public class ParForProgramBlock extends ForProgramBlock { rm = new ResultMergeLocalAutomatic( (MatrixObject)out, (MatrixObject[])in, fname, accum ); break; case REMOTE_SPARK: - int numMap = Math.max(_numThreads, + int numMap = Math.max(numThreads, SparkExecutionContext.getDefaultParallelism(true)); int numRed = numMap; //equal map/reduce rm = new ResultMergeRemoteSpark( (MatrixObject)out, @@ -1460,7 +1460,8 @@ public class ParForProgramBlock extends 
ForProgramBlock { CacheableData<?>[] in = (dat instanceof MatrixObject) ? tmp.toArray(MatrixObject[]::new) : tmp.toArray(FrameObject[]::new); String fname = constructResultMergeFileName(); - ResultMerge<?> rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec); + ResultMerge<?> rm = createResultMerge(_resultMerge, + out, in, fname, var._isAccum, _numThreads, ec); CacheableData<?> outNew = USE_PARALLEL_RESULT_MERGE ? rm.executeParallelMerge(_numThreads) : rm.executeSerialMerge(); @@ -1678,7 +1679,8 @@ public class ParForProgramBlock extends ForProgramBlock { String fname = constructResultMergeFileName(); - ResultMerge<?> rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec); + ResultMerge<?> rm = createResultMerge(_resultMerge, + out, in, fname, var._isAccum, _numThreads, _ec); CacheableData<?> outNew; if( USE_PARALLEL_RESULT_MERGE ) outNew = rm.executeParallelMerge( _numThreads ); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java index 16a9503699..3bc43ff74c 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java @@ -3686,6 +3686,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock<MatrixBlock>, public final MatrixBlock append(MatrixBlock that) { return append(that, null, true); // default cbind } + + public final MatrixBlock rbind(MatrixBlock that) { + return append(that, null, false); + } /** * Append that matrix to this matrix, while allocating a new matrix. 
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java new file mode 100644 index 0000000000..fabfce69ce --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.component.parfor; + +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.controlprogram.ParForProgramBlock; +import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PResultMerge; +import org.apache.sysds.runtime.controlprogram.caching.CacheableData; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory; +import org.apache.sysds.runtime.controlprogram.parfor.ResultMerge; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MetaDataFormat; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Ignore; +import org.junit.Test; + +public class ResultMergeTest extends AutomatedTestBase{ + private final static String TEST_NAME = "parfor_rm"; + private final static String TEST_DIR = "functions/parfor/"; + private static final String TEST_CLASS_DIR = TEST_DIR + ResultMergeTest.class.getSimpleName() + "/"; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"C"})); + } + + @Test + public void testLocalMem() { + testResultMergeAll(PResultMerge.LOCAL_MEM); + } + + @Test + @Ignore //FIXME + public void testLocalFile() { + testResultMergeAll(PResultMerge.LOCAL_FILE); + } + + @Test + public void testLocalAutomatic() { + testResultMergeAll(PResultMerge.LOCAL_AUTOMATIC); + } + + private void testResultMergeAll(PResultMerge mtype) { + testResultMerge(false, false, false, mtype); + testResultMerge(false, true, false, mtype); + testResultMerge(true, false, false, mtype); + //testResultMerge(true, true, false, mtype); invalid + } + + private void testResultMerge(boolean 
par, boolean accum, boolean compare, PResultMerge mtype) { + loadTestConfiguration(getTestConfiguration(TEST_NAME)); + + //create input and output objects + MatrixBlock A = MatrixBlock.randOperations(1200, 1100, 0.1); + CacheableData<?> Cobj = compare ? + toMatrixObject(A, output("C")) : + toMatrixObject(new MatrixBlock(1200,1100,true), output("C")); + MatrixBlock empty = new MatrixBlock(400,1100,true); + MatrixObject[] Bobj = new MatrixObject[3]; + Bobj[0] = toMatrixObject(A.slice(0,399).rbind(empty).rbind(empty), output("B0")); + Bobj[1] = toMatrixObject(empty.rbind(A.slice(400,799)).rbind(empty), output("B1")); + Bobj[2] = toMatrixObject(empty.rbind(empty).rbind(A.slice(800,1199)), output("B1")); + + //create result merge + ExecutionContext ec = ExecutionContextFactory.createContext(); + int numThreads = 3; + ResultMerge<?> rm = ParForProgramBlock.createResultMerge( + mtype, Cobj, Bobj, output("C"), accum, numThreads, ec); + + //execute results merge + if( par ) + Cobj = rm.executeParallelMerge(numThreads); + else + Cobj = rm.executeSerialMerge(); + + //check results + TestUtils.compareMatrices(A, + (MatrixBlock)Cobj.acquireReadAndRelease(), 1e-14); + } + + private static MatrixObject toMatrixObject(MatrixBlock mb, String filename) { + MetaDataFormat md = new MetaDataFormat( + mb.getDataCharacteristics().setBlocksize(1000), FileFormat.BINARY); + MatrixObject mo = new MatrixObject(ValueType.FP64, filename, md); + mo.acquireModify(mb); + mo.release(); + return mo; + } +} diff --git a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java index 4e3acbe4c6..8698381e78 100644 --- a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java +++ b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java @@ -27,11 +27,17 @@ import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory; import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysds.runtime.instructions.cp.IntObject; import org.apache.sysds.runtime.util.CommonThreadPool; +import org.apache.sysds.test.AutomatedTestBase; import org.junit.Assert; import org.junit.Test; -public class TaskPartitionerTest { +public class TaskPartitionerTest extends AutomatedTestBase{ + @Override + public void setUp() { + + } + @Test public void testNaive() { testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.NAIVE);