This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 78b340fe9e [MINOR] Additional parfor tests, and minor cleanups of unused code 78b340fe9e is described below commit 78b340fe9eb84adbc1e236f543366b5d53653faf Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Sat Aug 17 13:42:11 2024 +0200 [MINOR] Additional parfor tests, and minor cleanups of unused code --- src/main/java/org/apache/sysds/hops/LiteralOp.java | 2 +- .../sysds/parser/BuiltinFunctionExpression.java | 1 + .../runtime/controlprogram/ParForProgramBlock.java | 47 +----- .../parfor/ResultMergeRemoteGrouping.java | 42 ----- .../parfor/ResultMergeRemoteSorting.java | 46 ----- .../parfor/ResultMergeTaggedMatrixIndexes.java | 94 ----------- .../controlprogram/parfor/TaskPartitioner.java | 4 +- .../parfor/TaskPartitionerFactory.java | 50 ++++++ .../parfor/TaskPartitionerNaive.java | 6 +- .../parfor/TaskPartitionerStatic.java | 6 +- .../parfor/opt/OptTreePlanChecker.java | 185 --------------------- .../parfor/opt/OptimizationWrapper.java | 13 -- .../sysds/runtime/util/BinaryBlockInputFormat.java | 48 ------ .../runtime/util/BinaryBlockRecordReader.java | 68 -------- .../apache/sysds/utils/SystemDSLoaderUtils.java | 41 ----- .../sysds/test/component/misc/OpTypeTest.java | 4 - .../test/component/parfor/TaskPartitionerTest.java | 85 ++++++++++ .../functions/builtin/part2/BuiltinRaJoinTest.java | 1 - .../parfor/misc/ParForRecursiveFunctionTest.java | 84 ++++++++++ .../scripts/functions/parfor/parfor_recursive.dml | 37 +++++ 20 files changed, 270 insertions(+), 594 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/LiteralOp.java b/src/main/java/org/apache/sysds/hops/LiteralOp.java index 9c44ef1187..1d2911f1fa 100644 --- a/src/main/java/org/apache/sysds/hops/LiteralOp.java +++ b/src/main/java/org/apache/sysds/hops/LiteralOp.java @@ -254,8 +254,8 @@ public class LiteralOp extends Hop case FP64: return String.valueOf(value_double); case STRING: + case HASH32: case HASH64: - return value_string; case CHARACTER: return value_string; case UNKNOWN: diff --git a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java index ec9b4a4bbd..de10e090f5 100644 --- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java +++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java @@ -965,6 +965,7 @@ public class BuiltinFunctionExpression extends DataIdentifier { case CHARACTER: case FP64: case FP32: + case HASH32: case HASH64: //default output.setValueType(ValueType.FP64); break; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java index c4c75c35e7..01896cb73e 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java @@ -75,12 +75,7 @@ import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalMemory; import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeRemoteSpark; import org.apache.sysds.runtime.controlprogram.parfor.Task; import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitioner; -import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoring; -import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoringCmax; -import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoringCmin; -import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFixedsize; -import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerNaive; -import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerStatic; +import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory; import org.apache.sysds.runtime.controlprogram.parfor.opt.OptTreeConverter; import org.apache.sysds.runtime.controlprogram.parfor.opt.OptimizationWrapper; import org.apache.sysds.runtime.controlprogram.parfor.opt.OptimizerRuleBased; @@ -1279,43 +1274,9 @@ public class ParForProgramBlock extends ForProgramBlock { * @param incr ? * @return task partitioner */ - private TaskPartitioner createTaskPartitioner( IntObject from, IntObject to, IntObject incr ) - { - TaskPartitioner tp; - - switch( _taskPartitioner ) { - case FIXED: - tp = new TaskPartitionerFixedsize( - _taskSize, _iterPredVar, from, to, incr); - break; - case NAIVE: - tp = new TaskPartitionerNaive( - _taskSize, _iterPredVar, from, to, incr); - break; - case STATIC: - tp = new TaskPartitionerStatic( - _taskSize, _numThreads, _iterPredVar, from, to, incr); - break; - case FACTORING: - tp = new TaskPartitionerFactoring( - _taskSize,_numThreads, _iterPredVar, from, to, incr); - break; - case FACTORING_CMIN: - //for constrained factoring the tasksize is used as the minimum constraint - tp = new TaskPartitionerFactoringCmin(_taskSize,_numThreads, - _taskSize, _iterPredVar, from, to, incr); - break; - - case FACTORING_CMAX: - //for constrained factoring the tasksize is used as the minimum constraint - tp = new TaskPartitionerFactoringCmax(_taskSize,_numThreads, - _taskSize, _iterPredVar, from, to, incr); - break; - default: - throw new DMLRuntimeException("Undefined task partitioner: '"+_taskPartitioner+"'."); - } - - return tp; + private TaskPartitioner createTaskPartitioner( IntObject from, IntObject to, IntObject incr ) { + return TaskPartitionerFactory.createTaskPartitioner( + _taskPartitioner, from, to, incr, _taskSize, _numThreads, _iterPredVar); } /** diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java deleted file mode 100644 index ccf27c9a8f..0000000000 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.runtime.controlprogram.parfor; - - -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; - -public class ResultMergeRemoteGrouping extends WritableComparator -{ - protected ResultMergeRemoteGrouping() { - super(ResultMergeTaggedMatrixIndexes.class,true); - } - - @Override - @SuppressWarnings("rawtypes") - public int compare(WritableComparable k1, WritableComparable k2) - { - ResultMergeTaggedMatrixIndexes key1 = (ResultMergeTaggedMatrixIndexes)k1; - ResultMergeTaggedMatrixIndexes key2 = (ResultMergeTaggedMatrixIndexes)k2; - - //group by matrix indexes only (including all tags) - return key1.getIndexes().compareTo(key2.getIndexes()); - } -} diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java deleted file mode 100644 index 5d44903958..0000000000 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.runtime.controlprogram.parfor; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; - -public class ResultMergeRemoteSorting extends WritableComparator -{ - protected ResultMergeRemoteSorting() { - super(ResultMergeTaggedMatrixIndexes.class, true); - } - - @Override - @SuppressWarnings("rawtypes") - public int compare(WritableComparable k1, WritableComparable k2) - { - ResultMergeTaggedMatrixIndexes key1 = (ResultMergeTaggedMatrixIndexes)k1; - ResultMergeTaggedMatrixIndexes key2 = (ResultMergeTaggedMatrixIndexes)k2; - - int ret = key1.getIndexes().compareTo(key2.getIndexes()); - if( ret == 0 ) //same indexes, secondary sort - { - ret = ((key1.getTag() == key2.getTag()) ? 0 : - (key1.getTag() < key2.getTag())? -1 : 1); - } - return ret; - } -} diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java deleted file mode 100644 index c0437dce35..0000000000 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.runtime.controlprogram.parfor; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.sysds.runtime.matrix.data.MatrixIndexes; - -/** - * This class serves as composite key for the remote result merge job - * (for any data format) in order to sort on both matrix indexes and tag - * but group all blocks according to matrix indexes only. This prevents - * us from doing an 2pass out-of-core algorithm at the reducer since we - * can guarantee that the compare block (tag 0) will be the first element - * in the iterator. - * - */ -public class ResultMergeTaggedMatrixIndexes implements WritableComparable<ResultMergeTaggedMatrixIndexes> -{ - private MatrixIndexes _ix; - private byte _tag = -1; - - public ResultMergeTaggedMatrixIndexes() { - _ix = new MatrixIndexes(); - } - - public MatrixIndexes getIndexes() { - return _ix; - } - - public byte getTag() { - return _tag; - } - - public void setTag(byte tag) { - _tag = tag; - } - - @Override - public void readFields(DataInput in) throws IOException { - if( _ix == null ) - _ix = new MatrixIndexes(); - _ix.readFields(in); - _tag = in.readByte(); - } - - @Override - public void write(DataOutput out) throws IOException { - _ix.write(out); - out.writeByte(_tag); - } - - @Override - public int compareTo(ResultMergeTaggedMatrixIndexes that) { - int ret = _ix.compareTo(that._ix); - if( ret == 0 ) - ret = ((_tag == that._tag) ? 0 : - (_tag < that._tag)? -1 : 1); - return ret; - } - - @Override - public boolean equals(Object other) { - if( !(other instanceof ResultMergeTaggedMatrixIndexes) ) - return false; - ResultMergeTaggedMatrixIndexes that = (ResultMergeTaggedMatrixIndexes)other; - return (_ix.equals(that._ix) && _tag == that._tag); - } - - @Override - public int hashCode() { - throw new RuntimeException("hashCode() should never be called on instances of this class."); - } -} diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java index abd0279077..9d46947ba6 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java @@ -33,8 +33,8 @@ import org.apache.sysds.runtime.instructions.cp.IntObject; */ public abstract class TaskPartitioner { - protected long _taskSize = -1; - protected String _iterVarName = null; + protected long _taskSize = -1; + protected String _iterVarName = null; protected IntObject _fromVal = null; protected IntObject _toVal = null; protected IntObject _incrVal = null; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java new file mode 100644 index 0000000000..66d4961104 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.controlprogram.parfor; + +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner; +import org.apache.sysds.runtime.instructions.cp.IntObject; + +public abstract class TaskPartitionerFactory +{ + public static TaskPartitioner createTaskPartitioner(PTaskPartitioner type, + IntObject from, IntObject to, IntObject incr, long taskSize, int numThreads, String iterPredVar) + { + switch( type ) { + case FIXED: + return new TaskPartitionerFixedsize(taskSize, iterPredVar, from, to, incr); + case NAIVE: + return new TaskPartitionerNaive(taskSize, iterPredVar, from, to, incr); + case STATIC: + return new TaskPartitionerStatic(taskSize, numThreads, iterPredVar, from, to, incr); + case FACTORING: + return new TaskPartitionerFactoring(taskSize, numThreads, iterPredVar, from, to, incr); + case FACTORING_CMIN: + return new TaskPartitionerFactoringCmin(taskSize, + numThreads, taskSize, iterPredVar, from, to, incr); + case FACTORING_CMAX: + return new TaskPartitionerFactoringCmax(taskSize, + numThreads, taskSize, iterPredVar, from, to, incr); + default: + throw new DMLRuntimeException("Undefined task partitioner: '"+type+"'."); + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java index 7704974548..3894bcff92 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java @@ -29,12 +29,12 @@ import org.apache.sysds.runtime.instructions.cp.IntObject; */ public class TaskPartitionerNaive extends TaskPartitionerFixedsize { - - public TaskPartitionerNaive( long taskSize, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) + public TaskPartitionerNaive( long taskSize, String iterVarName, + IntObject fromVal, IntObject toVal, IntObject incrVal ) { super(taskSize, iterVarName, fromVal, toVal, incrVal); //compute the new task size _taskSize = 1; - } + } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java index 54ea8cbdc1..244cca320d 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java @@ -29,12 +29,12 @@ import org.apache.sysds.runtime.instructions.cp.IntObject; */ public class TaskPartitionerStatic extends TaskPartitionerFixedsize { - - public TaskPartitionerStatic( long taskSize, int numThreads, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) + public TaskPartitionerStatic( long taskSize, int numThreads, + String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) { super(taskSize, iterVarName, fromVal, toVal, incrVal); _taskSize = _numIter / numThreads; _firstnPlus1 = (int)_numIter % numThreads; - } + } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java deleted file mode 100644 index 2cbf1228a9..0000000000 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.runtime.controlprogram.parfor.opt; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Set; - -import org.apache.sysds.hops.FunctionOp; -import org.apache.sysds.hops.Hop; -import org.apache.sysds.parser.DMLProgram; -import org.apache.sysds.parser.ForStatement; -import org.apache.sysds.parser.ForStatementBlock; -import org.apache.sysds.parser.FunctionStatement; -import org.apache.sysds.parser.FunctionStatementBlock; -import org.apache.sysds.parser.IfStatement; -import org.apache.sysds.parser.IfStatementBlock; -import org.apache.sysds.parser.StatementBlock; -import org.apache.sysds.parser.WhileStatement; -import org.apache.sysds.parser.WhileStatementBlock; -import org.apache.sysds.runtime.DMLRuntimeException; -import org.apache.sysds.runtime.controlprogram.BasicProgramBlock; -import org.apache.sysds.runtime.controlprogram.ForProgramBlock; -import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock; -import org.apache.sysds.runtime.controlprogram.IfProgramBlock; -import org.apache.sysds.runtime.controlprogram.Program; -import org.apache.sysds.runtime.controlprogram.ProgramBlock; -import org.apache.sysds.runtime.controlprogram.WhileProgramBlock; -import org.apache.sysds.runtime.instructions.Instruction; -import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction; - -public class OptTreePlanChecker -{ - - public static void checkProgramCorrectness( ProgramBlock pb, StatementBlock sb, Set<String> fnStack ) - { - Program prog = pb.getProgram(); - DMLProgram dprog = sb.getDMLProg(); - - if (pb instanceof FunctionProgramBlock && sb instanceof FunctionStatementBlock ) { - FunctionProgramBlock fpb = (FunctionProgramBlock)pb; - FunctionStatementBlock fsb = (FunctionStatementBlock)sb; - FunctionStatement fstmt = (FunctionStatement)fsb.getStatement(0); - for( int i=0; i<fpb.getChildBlocks().size(); i++ ) { - ProgramBlock pbc = fpb.getChildBlocks().get(i); - StatementBlock sbc = fstmt.getBody().get(i); - checkProgramCorrectness(pbc, sbc, fnStack); - } - } - else if (pb instanceof WhileProgramBlock && sb instanceof WhileStatementBlock) { - WhileProgramBlock wpb = (WhileProgramBlock) pb; - WhileStatementBlock wsb = (WhileStatementBlock) sb; - WhileStatement wstmt = (WhileStatement) wsb.getStatement(0); - checkHopDagCorrectness(prog, dprog, wsb.getPredicateHops(), wpb.getPredicate(), fnStack); - for( int i=0; i<wpb.getChildBlocks().size(); i++ ) { - ProgramBlock pbc = wpb.getChildBlocks().get(i); - StatementBlock sbc = wstmt.getBody().get(i); - checkProgramCorrectness(pbc, sbc, fnStack); - } - checkLinksProgramStatementBlock(wpb, wsb); - } - else if (pb instanceof IfProgramBlock && sb instanceof IfStatementBlock) { - IfProgramBlock ipb = (IfProgramBlock) pb; - IfStatementBlock isb = (IfStatementBlock) sb; - IfStatement istmt = (IfStatement) isb.getStatement(0); - checkHopDagCorrectness(prog, dprog, isb.getPredicateHops(), ipb.getPredicate(), fnStack); - for( int i=0; i<ipb.getChildBlocksIfBody().size(); i++ ) { - ProgramBlock pbc = ipb.getChildBlocksIfBody().get(i); - StatementBlock sbc = istmt.getIfBody().get(i); - checkProgramCorrectness(pbc, sbc, fnStack); - } - for( int i=0; i<ipb.getChildBlocksElseBody().size(); i++ ) { - ProgramBlock pbc = ipb.getChildBlocksElseBody().get(i); - StatementBlock sbc = istmt.getElseBody().get(i); - checkProgramCorrectness(pbc, sbc, fnStack); - } - checkLinksProgramStatementBlock(ipb, isb); - } - else if (pb instanceof ForProgramBlock && sb instanceof ForStatementBlock) { //incl parfor - ForProgramBlock fpb = (ForProgramBlock) pb; - ForStatementBlock fsb = (ForStatementBlock) sb; - ForStatement fstmt = (ForStatement) sb.getStatement(0); - checkHopDagCorrectness(prog, dprog, fsb.getFromHops(), fpb.getFromInstructions(), fnStack); - checkHopDagCorrectness(prog, dprog, fsb.getToHops(), fpb.getToInstructions(), fnStack); - checkHopDagCorrectness(prog, dprog, fsb.getIncrementHops(), fpb.getIncrementInstructions(), fnStack); - for( int i=0; i<fpb.getChildBlocks().size(); i++ ) { - ProgramBlock pbc = fpb.getChildBlocks().get(i); - StatementBlock sbc = fstmt.getBody().get(i); - checkProgramCorrectness(pbc, sbc, fnStack); - } - checkLinksProgramStatementBlock(fpb, fsb); - } - else if( pb instanceof BasicProgramBlock ) { - BasicProgramBlock bpb = (BasicProgramBlock) pb; - checkHopDagCorrectness(prog, dprog, sb.getHops(), bpb.getInstructions(), fnStack); - } - } - - private static void checkHopDagCorrectness( Program prog, DMLProgram dprog, ArrayList<Hop> roots, ArrayList<Instruction> inst, Set<String> fnStack ) { - if( roots != null ) - for( Hop hop : roots ) - checkHopDagCorrectness(prog, dprog, hop, inst, fnStack); - } - - private static void checkHopDagCorrectness( Program prog, DMLProgram dprog, Hop root, ArrayList<Instruction> inst, Set<String> fnStack ) { - //set of checks to perform - checkFunctionNames(prog, dprog, root, inst, fnStack); - } - - private static void checkLinksProgramStatementBlock( ProgramBlock pb, StatementBlock sb ) { - if( pb.getStatementBlock() != sb ) - throw new DMLRuntimeException("Links between programblocks and statementblocks are incorrect ("+pb+")."); - } - - private static void checkFunctionNames( Program prog, DMLProgram dprog, Hop root, ArrayList<Instruction> inst, Set<String> fnStack ) { - //reset visit status of dag - root.resetVisitStatus(); - - //get all function op in this dag - HashMap<String, FunctionOp> fops = new HashMap<>(); - getAllFunctionOps(root, fops); - - for( Instruction linst : inst ) - if( linst instanceof FunctionCallCPInstruction ) - { - FunctionCallCPInstruction flinst = (FunctionCallCPInstruction) linst; - String fnamespace = flinst.getNamespace(); - String fname = flinst.getFunctionName(); - String key = DMLProgram.constructFunctionKey(fnamespace, fname); - - //check 1: instruction name equal to hop name - if( !fops.containsKey(key) ) - throw new DMLRuntimeException( "Function Check: instruction and hop names differ ("+key+", "+fops.keySet()+")" ); - - //check 2: function exists - if( !prog.getFunctionProgramBlocks().containsKey(key) ) - throw new DMLRuntimeException( "Function Check: function does not exits ("+key+")" ); - - //check 3: recursive program check - FunctionProgramBlock fpb = prog.getFunctionProgramBlock(fnamespace, fname); - FunctionStatementBlock fsb = dprog.getFunctionStatementBlock(fnamespace, fname); - if( !fnStack.contains(key) ) - { - fnStack.add(key); - checkProgramCorrectness(fpb, fsb, fnStack); - fnStack.remove(key); - } - } - } - - private static void getAllFunctionOps( Hop hop, HashMap<String, FunctionOp> memo ) - { - if( hop.isVisited() ) - return; - - //process functionop - if( hop instanceof FunctionOp ) { - FunctionOp fop = (FunctionOp) hop; - memo.put(fop.getFunctionKey(), fop); - } - - //process children - for( Hop in : hop.getInput() ) - getAllFunctionOps(in, memo); - - hop.setVisited(); - } -} diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java index 6dbc952172..4a7cec8cd2 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java @@ -20,7 +20,6 @@ package org.apache.sysds.runtime.controlprogram.parfor.opt; import java.util.ArrayList; -import java.util.HashSet; import java.util.Set; import org.apache.commons.logging.Log; @@ -75,7 +74,6 @@ public class OptimizationWrapper //internal parameters public static final double PAR_FACTOR_INFRASTRUCTURE = 1.0; - private static final boolean CHECK_PLAN_CORRECTNESS = false; /** @@ -227,17 +225,6 @@ public class OptimizationWrapper //core optimize opt.optimize(sb, pb, tree, est, numRuns, ec); LOG.debug("ParFOR Opt: Optimized plan (after optimization): \n" + tree.explain(false)); - - //assert plan correctness - if( CHECK_PLAN_CORRECTNESS && LOG.isDebugEnabled() ) { - try{ - OptTreePlanChecker.checkProgramCorrectness(pb, sb, new HashSet<String>()); - LOG.debug("ParFOR Opt: Checked plan and program correctness."); - } - catch(Exception ex) { - throw new DMLRuntimeException("Failed to check program correctness.", ex); - } - } long ltime = (long) time.stop(); LOG.trace("ParFOR Opt: Optimized plan in "+ltime+"ms."); diff --git a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java b/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java deleted file mode 100644 index d9d8fc080b..0000000000 --- a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.runtime.util; - -import java.io.IOException; - -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.data.MatrixIndexes; -import org.apache.hadoop.mapred.FileSplit; - -/** - * Custom binary block input format to return the custom record reader. - * <p> - * NOTE: Not used by default. - * <p> - * NOTE: Used for performance debugging of binary block HDFS reads. - */ -public class BinaryBlockInputFormat extends SequenceFileInputFormat<MatrixIndexes,MatrixBlock> -{ - @Override - public RecordReader<MatrixIndexes, MatrixBlock> getRecordReader(InputSplit split, JobConf job, Reporter reporter) - throws IOException - { - return new BinaryBlockRecordReader(job, (FileSplit)split); - } -} \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java b/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java deleted file mode 100644 index ddae00d721..0000000000 --- a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.runtime.util; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.SequenceFileRecordReader; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.data.MatrixIndexes; - -/** - * Custom record reader for binary block. Currently its only purpose is to allow for - * detailed profiling of overall read time (io, deserialize, decompress). - * - * NOTE: not used by default. - */ -public class BinaryBlockRecordReader extends SequenceFileRecordReader<MatrixIndexes,MatrixBlock> -{ - //private long _time = 0; - - public BinaryBlockRecordReader(Configuration conf, FileSplit split) - throws IOException - { - super(conf, split); - - } - - @Override - public synchronized boolean next(MatrixIndexes key, MatrixBlock value) - throws IOException - { - //long t0 = System.nanoTime(); - boolean ret = super.next(key, value); - //long t1 = System.nanoTime(); - - //_time+=(t1-t0); - - return ret; - } - - @Override - public synchronized void close() - throws IOException - { - //in milliseconds. - //System.out.println(_time/1000000); - super.close(); - } -} diff --git a/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java b/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java deleted file mode 100644 index 67761b414f..0000000000 --- a/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.utils; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.URL; -import java.net.URLClassLoader; -import java.io.File; -import java.io.IOException; - -public class SystemDSLoaderUtils { - - public void loadSystemDS(String filePath) - throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, IOException { - URL url = new File(filePath).toURI().toURL(); - try( URLClassLoader classLoader = (URLClassLoader)ClassLoader.getSystemClassLoader() ) { - Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); - method.setAccessible(true); - method.invoke(classLoader, url); - } - } - -} diff --git a/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java b/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java index fabff5a459..581ff0d405 100644 --- a/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java +++ b/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java @@ -19,10 +19,6 @@ package org.apache.sysds.test.component.misc; -import java.io.IOException; -import java.net.URL; -import java.util.Enumeration; - import org.apache.sysds.common.Types.OpOp1; import org.apache.sysds.common.Types.OpOp2; import org.apache.sysds.common.Types.OpOp3; diff --git a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java new file mode 100644 index 0000000000..4e3acbe4c6 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.parfor; + +import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.controlprogram.parfor.Task; +import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitioner; +import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory; +import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysds.runtime.instructions.cp.IntObject; +import org.apache.sysds.runtime.util.CommonThreadPool; +import org.junit.Assert; +import org.junit.Test; + +public class TaskPartitionerTest { + + @Test + public void testNaive() { + testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.NAIVE); + } + + @Test + public void testStatic() { + testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.STATIC); + } + + @Test + public void testFixed() { + testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FIXED); + } + + @Test + public void testFactoring() { + testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FACTORING); + } + + @Test + public void testFactoring2() { + testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FACTORING_CMIN); + } + + @Test + public void testFactoring3() { + testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FACTORING_CMAX); + } + + private void testTaskPartitioner(int numTasks, PTaskPartitioner type) { + LocalTaskQueue<Task> queue = new LocalTaskQueue<>(); + TaskPartitioner partitioner = TaskPartitionerFactory.createTaskPartitioner( + type, new IntObject(1), new IntObject(numTasks), new IntObject(1), + numTasks, InfrastructureAnalyzer.getLocalParallelism(), "i"); + //asynchronous task creation + CommonThreadPool.get().submit(()->partitioner.createTasks(queue)); + //consume tasks and check serialization + Task t = null; + try { + while((t = queue.dequeueTask())!=LocalTaskQueue.NO_MORE_TASKS) { + Task ts1 = Task.parseCompactString(t.toCompactString()); + Task ts2 = Task.parseCompactString(t.toCompactString(10)); + Assert.assertEquals(t.toString(), ts1.toString()); + Assert.assertEquals(t.toString(), ts2.toString()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java index 97b820d2b1..ebaf24135c 100644 --- a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java +++ b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java @@ -26,7 +26,6 @@ import org.apache.sysds.test.TestConfiguration; import org.apache.sysds.test.TestUtils; import org.junit.Test; -import java.util.Arrays; import java.util.HashMap; public class BuiltinRaJoinTest extends AutomatedTestBase diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java new file mode 100644 index 0000000000..d5dffb8944 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.parfor.misc; + +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.sysds.common.Types.ExecMode; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; + +public class ParForRecursiveFunctionTest extends AutomatedTestBase +{ + private final static String TEST_NAME1 = "parfor_recursive"; + private final static String TEST_DIR = "functions/parfor/"; + private final static String TEST_CLASS_DIR = TEST_DIR + ParForRecursiveFunctionTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + + private final static int rows = 20; + private final static int cols = 10; + private final static double sparsity = 1.0; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"Rout"})); + } + + @Test + public void testParForCP() { + runParforTest(TEST_NAME1, ExecMode.SINGLE_NODE); + } + + @Test + public void testParForHybrid() { + runParforTest(TEST_NAME1, ExecMode.HYBRID); + } + + private void runParforTest( String TEST_NAME, ExecMode type ) + { + TestConfiguration config = getTestConfiguration(TEST_NAME); + config.addVariable("rows", rows); + config.addVariable("cols", cols); + loadTestConfiguration(config); + ExecMode oldExec = setExecMode(type); + + try { + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + programArgs = new String[]{"-explain","-stats","-args", input("V"), output("R") }; + + double[][] V = getRandomMatrix(rows, cols, 4, 4, sparsity, 3); + writeInputMatrixWithMTD("V", V, true); + + boolean exceptionExpected = false; + runTest(true, exceptionExpected, null, -1); + + //compare matrices + HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R"); + Assert.assertEquals(5d*rows*cols, dmlfile.get(new CellIndex(1,1)), eps); + } + finally { + resetExecMode(oldExec); + } + } +} diff --git a/src/test/scripts/functions/parfor/parfor_recursive.dml b/src/test/scripts/functions/parfor/parfor_recursive.dml new file mode 100644 index 0000000000..42b6c226a2 --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_recursive.dml @@ -0,0 +1,37 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +fun = function(Matrix[Double] A) + return(Matrix[Double] B) +{ + B = matrix(0, nrow(A), ncol(A)) + parfor(i in 1:nrow(A)) { + if( as.scalar(A[1,1]) < 5 ) + B[i,] = fun(A[i,]+1); + else + B[i,] = A[i, ] + } +} + +A = read($1); +B = as.matrix(sum(fun(A))); +write(B, $2); +