This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 78b340fe9e [MINOR] Additional parfor tests, and minor cleanups of 
unused code
78b340fe9e is described below

commit 78b340fe9eb84adbc1e236f543366b5d53653faf
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Sat Aug 17 13:42:11 2024 +0200

    [MINOR] Additional parfor tests, and minor cleanups of unused code
---
 src/main/java/org/apache/sysds/hops/LiteralOp.java |   2 +-
 .../sysds/parser/BuiltinFunctionExpression.java    |   1 +
 .../runtime/controlprogram/ParForProgramBlock.java |  47 +-----
 .../parfor/ResultMergeRemoteGrouping.java          |  42 -----
 .../parfor/ResultMergeRemoteSorting.java           |  46 -----
 .../parfor/ResultMergeTaggedMatrixIndexes.java     |  94 -----------
 .../controlprogram/parfor/TaskPartitioner.java     |   4 +-
 .../parfor/TaskPartitionerFactory.java             |  50 ++++++
 .../parfor/TaskPartitionerNaive.java               |   6 +-
 .../parfor/TaskPartitionerStatic.java              |   6 +-
 .../parfor/opt/OptTreePlanChecker.java             | 185 ---------------------
 .../parfor/opt/OptimizationWrapper.java            |  13 --
 .../sysds/runtime/util/BinaryBlockInputFormat.java |  48 ------
 .../runtime/util/BinaryBlockRecordReader.java      |  68 --------
 .../apache/sysds/utils/SystemDSLoaderUtils.java    |  41 -----
 .../sysds/test/component/misc/OpTypeTest.java      |   4 -
 .../test/component/parfor/TaskPartitionerTest.java |  85 ++++++++++
 .../functions/builtin/part2/BuiltinRaJoinTest.java |   1 -
 .../parfor/misc/ParForRecursiveFunctionTest.java   |  84 ++++++++++
 .../scripts/functions/parfor/parfor_recursive.dml  |  37 +++++
 20 files changed, 270 insertions(+), 594 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/LiteralOp.java 
b/src/main/java/org/apache/sysds/hops/LiteralOp.java
index 9c44ef1187..1d2911f1fa 100644
--- a/src/main/java/org/apache/sysds/hops/LiteralOp.java
+++ b/src/main/java/org/apache/sysds/hops/LiteralOp.java
@@ -254,8 +254,8 @@ public class LiteralOp extends Hop
                        case FP64:
                                return String.valueOf(value_double);
                        case STRING:
+                       case HASH32:
                        case HASH64:
-                               return value_string;
                        case CHARACTER:
                                return value_string;
                        case UNKNOWN:
diff --git 
a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java 
b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index ec9b4a4bbd..de10e090f5 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -965,6 +965,7 @@ public class BuiltinFunctionExpression extends 
DataIdentifier {
                                case CHARACTER:
                                case FP64:
                                case FP32:
+                               case HASH32:
                                case HASH64: //default
                                        output.setValueType(ValueType.FP64);
                                        break;
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index c4c75c35e7..01896cb73e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -75,12 +75,7 @@ import 
org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalMemory;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeRemoteSpark;
 import org.apache.sysds.runtime.controlprogram.parfor.Task;
 import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitioner;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoring;
-import 
org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoringCmax;
-import 
org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoringCmin;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFixedsize;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerNaive;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerStatic;
+import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.OptTreeConverter;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.OptimizationWrapper;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.OptimizerRuleBased;
@@ -1279,43 +1274,9 @@ public class ParForProgramBlock extends ForProgramBlock {
         * @param incr ?
         * @return task partitioner
         */
-       private TaskPartitioner createTaskPartitioner( IntObject from, 
IntObject to, IntObject incr ) 
-       {
-               TaskPartitioner tp;
-               
-               switch( _taskPartitioner ) {
-                       case FIXED:
-                               tp = new TaskPartitionerFixedsize(
-                                       _taskSize, _iterPredVar, from, to, 
incr);
-                               break;
-                       case NAIVE:
-                               tp = new TaskPartitionerNaive(
-                                       _taskSize, _iterPredVar, from, to, 
incr);
-                               break;
-                       case STATIC:
-                               tp = new TaskPartitionerStatic(
-                                       _taskSize, _numThreads, _iterPredVar, 
from, to, incr);
-                               break;
-                       case FACTORING:
-                               tp = new TaskPartitionerFactoring(
-                                       _taskSize,_numThreads, _iterPredVar, 
from, to, incr);
-                               break;
-                       case FACTORING_CMIN:
-                               //for constrained factoring the tasksize is 
used as the minimum constraint
-                               tp = new 
TaskPartitionerFactoringCmin(_taskSize,_numThreads, 
-                                       _taskSize, _iterPredVar, from, to, 
incr);
-                               break;
-
-                       case FACTORING_CMAX:
-                               //for constrained factoring the tasksize is 
used as the minimum constraint
-                               tp = new 
TaskPartitionerFactoringCmax(_taskSize,_numThreads, 
-                                       _taskSize, _iterPredVar, from, to, 
incr);
-                               break;  
-                       default:
-                               throw new DMLRuntimeException("Undefined task 
partitioner: '"+_taskPartitioner+"'.");
-               }
-               
-               return tp;
+       private TaskPartitioner createTaskPartitioner( IntObject from, 
IntObject to, IntObject incr ) {
+               return TaskPartitionerFactory.createTaskPartitioner(
+                       _taskPartitioner, from, to, incr, _taskSize, 
_numThreads, _iterPredVar);
        }
        
        /**
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
deleted file mode 100644
index ccf27c9a8f..0000000000
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor;
-
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-public class ResultMergeRemoteGrouping extends WritableComparator
-{
-       protected ResultMergeRemoteGrouping() {
-               super(ResultMergeTaggedMatrixIndexes.class,true);
-       }
-       
-       @Override
-       @SuppressWarnings("rawtypes")
-       public int compare(WritableComparable k1, WritableComparable k2) 
-       {
-               ResultMergeTaggedMatrixIndexes key1 = 
(ResultMergeTaggedMatrixIndexes)k1;
-               ResultMergeTaggedMatrixIndexes key2 = 
(ResultMergeTaggedMatrixIndexes)k2;
-               
-               //group by matrix indexes only (including all tags)
-               return key1.getIndexes().compareTo(key2.getIndexes());
-       }
-}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
deleted file mode 100644
index 5d44903958..0000000000
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-public class ResultMergeRemoteSorting extends WritableComparator
-{
-       protected ResultMergeRemoteSorting() {
-               super(ResultMergeTaggedMatrixIndexes.class, true);
-       }
-       
-       @Override
-       @SuppressWarnings("rawtypes")
-       public int compare(WritableComparable k1, WritableComparable k2) 
-       {
-               ResultMergeTaggedMatrixIndexes key1 = 
(ResultMergeTaggedMatrixIndexes)k1;
-               ResultMergeTaggedMatrixIndexes key2 = 
(ResultMergeTaggedMatrixIndexes)k2;
-
-               int ret = key1.getIndexes().compareTo(key2.getIndexes());
-               if( ret == 0 ) //same indexes, secondary sort
-               {
-                       ret = ((key1.getTag() == key2.getTag()) ? 0 : 
-                               (key1.getTag() < key2.getTag())? -1 : 1);
-               }       
-               return ret;
-       }
-}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
deleted file mode 100644
index c0437dce35..0000000000
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-
-/**
- * This class serves as composite key for the remote result merge job
- * (for any data format) in order to sort on both matrix indexes and tag
- * but group all blocks according to matrix indexes only. This prevents
- * us from doing an 2pass out-of-core algorithm at the reducer since we
- * can guarantee that the compare block (tag 0) will be the first element
- * in the iterator.
- * 
- */
-public class ResultMergeTaggedMatrixIndexes implements 
WritableComparable<ResultMergeTaggedMatrixIndexes>
-{
-       private MatrixIndexes _ix;
-       private byte _tag = -1;
-       
-       public ResultMergeTaggedMatrixIndexes() {
-               _ix = new MatrixIndexes();
-       }
-
-       public MatrixIndexes getIndexes() {
-               return _ix;
-       }
-       
-       public byte getTag() {
-               return _tag;
-       }
-       
-       public void setTag(byte tag) {
-               _tag = tag;
-       }
-
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               if( _ix == null )
-                       _ix = new MatrixIndexes();
-               _ix.readFields(in);
-               _tag = in.readByte();
-       }
-
-       @Override
-       public void write(DataOutput out) throws IOException {
-               _ix.write(out);
-               out.writeByte(_tag);
-       }
-
-       @Override
-       public int compareTo(ResultMergeTaggedMatrixIndexes that) {
-               int ret = _ix.compareTo(that._ix);
-               if( ret == 0 )
-                       ret = ((_tag == that._tag) ? 0 : 
-                               (_tag < that._tag)? -1 : 1);
-               return ret;
-       }
-       
-       @Override
-       public boolean equals(Object other) {
-               if( !(other instanceof ResultMergeTaggedMatrixIndexes) )
-                       return false;
-               ResultMergeTaggedMatrixIndexes that = 
(ResultMergeTaggedMatrixIndexes)other;
-               return (_ix.equals(that._ix) && _tag == that._tag);
-       }
-       
-       @Override
-       public int hashCode() {
-               throw new RuntimeException("hashCode() should never be called 
on instances of this class.");
-       }
-}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
index abd0279077..9d46947ba6 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
@@ -33,8 +33,8 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
  */
 public abstract class TaskPartitioner 
 {      
-       protected long           _taskSize     = -1;    
-       protected String                 _iterVarName  = null;
+       protected long           _taskSize     = -1;
+       protected String         _iterVarName  = null;
        protected IntObject      _fromVal      = null;
        protected IntObject      _toVal        = null;
        protected IntObject      _incrVal      = null;
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
new file mode 100644
index 0000000000..66d4961104
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.parfor;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import 
org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
+import org.apache.sysds.runtime.instructions.cp.IntObject;
+
+public abstract class TaskPartitionerFactory 
+{
+       public static TaskPartitioner createTaskPartitioner(PTaskPartitioner 
type, 
+               IntObject from, IntObject to, IntObject incr, long taskSize, 
int numThreads, String iterPredVar) 
+       {
+               switch( type ) {
+                       case FIXED:
+                               return new TaskPartitionerFixedsize(taskSize, 
iterPredVar, from, to, incr);
+                       case NAIVE:
+                               return new TaskPartitionerNaive(taskSize, 
iterPredVar, from, to, incr);
+                       case STATIC:
+                               return new TaskPartitionerStatic(taskSize, 
numThreads, iterPredVar, from, to, incr);
+                       case FACTORING:
+                               return new TaskPartitionerFactoring(taskSize, 
numThreads, iterPredVar, from, to, incr);
+                       case FACTORING_CMIN:
+                               return new 
TaskPartitionerFactoringCmin(taskSize,
+                                       numThreads, taskSize, iterPredVar, 
from, to, incr);
+                       case FACTORING_CMAX:
+                               return new 
TaskPartitionerFactoringCmax(taskSize,
+                                       numThreads, taskSize, iterPredVar, 
from, to, incr);
+                       default:
+                               throw new DMLRuntimeException("Undefined task 
partitioner: '"+type+"'.");
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
index 7704974548..3894bcff92 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
@@ -29,12 +29,12 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
  */
 public class TaskPartitionerNaive extends TaskPartitionerFixedsize
 {
-       
-       public TaskPartitionerNaive( long taskSize, String iterVarName, 
IntObject fromVal, IntObject toVal, IntObject incrVal ) 
+       public TaskPartitionerNaive( long taskSize, String iterVarName,
+               IntObject fromVal, IntObject toVal, IntObject incrVal ) 
        {
                super(taskSize, iterVarName, fromVal, toVal, incrVal);
        
                //compute the new task size
                _taskSize = 1;
-       }       
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
index 54ea8cbdc1..244cca320d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
@@ -29,12 +29,12 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
  */
 public class TaskPartitionerStatic extends TaskPartitionerFixedsize
 {
-       
-       public TaskPartitionerStatic( long taskSize, int numThreads, String 
iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
+       public TaskPartitionerStatic( long taskSize, int numThreads,
+               String iterVarName, IntObject fromVal, IntObject toVal, 
IntObject incrVal ) 
        {
                super(taskSize, iterVarName, fromVal, toVal, incrVal);
        
                _taskSize = _numIter / numThreads;
                _firstnPlus1 = (int)_numIter % numThreads;
-       }       
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java
deleted file mode 100644
index 2cbf1228a9..0000000000
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor.opt;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-
-import org.apache.sysds.hops.FunctionOp;
-import org.apache.sysds.hops.Hop;
-import org.apache.sysds.parser.DMLProgram;
-import org.apache.sysds.parser.ForStatement;
-import org.apache.sysds.parser.ForStatementBlock;
-import org.apache.sysds.parser.FunctionStatement;
-import org.apache.sysds.parser.FunctionStatementBlock;
-import org.apache.sysds.parser.IfStatement;
-import org.apache.sysds.parser.IfStatementBlock;
-import org.apache.sysds.parser.StatementBlock;
-import org.apache.sysds.parser.WhileStatement;
-import org.apache.sysds.parser.WhileStatementBlock;
-import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
-import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
-import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
-import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
-import org.apache.sysds.runtime.controlprogram.Program;
-import org.apache.sysds.runtime.controlprogram.ProgramBlock;
-import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
-import org.apache.sysds.runtime.instructions.Instruction;
-import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
-
-public class OptTreePlanChecker 
-{
-
-       public static void checkProgramCorrectness( ProgramBlock pb, 
StatementBlock sb, Set<String> fnStack ) 
-       {
-               Program prog = pb.getProgram();
-               DMLProgram dprog = sb.getDMLProg();
-               
-               if (pb instanceof FunctionProgramBlock && sb instanceof 
FunctionStatementBlock ) {
-                       FunctionProgramBlock fpb = (FunctionProgramBlock)pb;
-                       FunctionStatementBlock fsb = (FunctionStatementBlock)sb;
-                       FunctionStatement fstmt = 
(FunctionStatement)fsb.getStatement(0);
-                       for( int i=0; i<fpb.getChildBlocks().size(); i++ ) {
-                               ProgramBlock pbc = fpb.getChildBlocks().get(i);
-                               StatementBlock sbc = fstmt.getBody().get(i);
-                               checkProgramCorrectness(pbc, sbc, fnStack);
-                       }
-               }
-               else if (pb instanceof WhileProgramBlock && sb instanceof 
WhileStatementBlock) {
-                       WhileProgramBlock wpb = (WhileProgramBlock) pb;
-                       WhileStatementBlock wsb = (WhileStatementBlock) sb;
-                       WhileStatement wstmt = (WhileStatement) 
wsb.getStatement(0);
-                       checkHopDagCorrectness(prog, dprog, 
wsb.getPredicateHops(), wpb.getPredicate(), fnStack);
-                       for( int i=0; i<wpb.getChildBlocks().size(); i++ ) {
-                               ProgramBlock pbc = wpb.getChildBlocks().get(i);
-                               StatementBlock sbc = wstmt.getBody().get(i);
-                               checkProgramCorrectness(pbc, sbc, fnStack);
-                       }
-                       checkLinksProgramStatementBlock(wpb, wsb);
-               }
-               else if (pb instanceof IfProgramBlock && sb instanceof 
IfStatementBlock) {
-                       IfProgramBlock ipb = (IfProgramBlock) pb;
-                       IfStatementBlock isb = (IfStatementBlock) sb;
-                       IfStatement istmt = (IfStatement) isb.getStatement(0);
-                       checkHopDagCorrectness(prog, dprog, 
isb.getPredicateHops(), ipb.getPredicate(), fnStack);
-                       for( int i=0; i<ipb.getChildBlocksIfBody().size(); i++ 
) {
-                               ProgramBlock pbc = 
ipb.getChildBlocksIfBody().get(i);
-                               StatementBlock sbc = istmt.getIfBody().get(i);
-                               checkProgramCorrectness(pbc, sbc, fnStack);
-                       }
-                       for( int i=0; i<ipb.getChildBlocksElseBody().size(); 
i++ ) {
-                               ProgramBlock pbc = 
ipb.getChildBlocksElseBody().get(i);
-                               StatementBlock sbc = istmt.getElseBody().get(i);
-                               checkProgramCorrectness(pbc, sbc, fnStack);
-                       }
-                       checkLinksProgramStatementBlock(ipb, isb);
-               }
-               else if (pb instanceof ForProgramBlock && sb instanceof 
ForStatementBlock) { //incl parfor
-                       ForProgramBlock fpb = (ForProgramBlock) pb;
-                       ForStatementBlock fsb = (ForStatementBlock) sb;
-                       ForStatement fstmt = (ForStatement) sb.getStatement(0);
-                       checkHopDagCorrectness(prog, dprog, fsb.getFromHops(), 
fpb.getFromInstructions(), fnStack);
-                       checkHopDagCorrectness(prog, dprog, fsb.getToHops(), 
fpb.getToInstructions(), fnStack);
-                       checkHopDagCorrectness(prog, dprog, 
fsb.getIncrementHops(), fpb.getIncrementInstructions(), fnStack);
-                       for( int i=0; i<fpb.getChildBlocks().size(); i++ ) {
-                               ProgramBlock pbc = fpb.getChildBlocks().get(i);
-                               StatementBlock sbc = fstmt.getBody().get(i);
-                               checkProgramCorrectness(pbc, sbc, fnStack);
-                       }
-                       checkLinksProgramStatementBlock(fpb, fsb);
-               }
-               else if( pb instanceof BasicProgramBlock ) {
-                       BasicProgramBlock bpb = (BasicProgramBlock) pb;
-                       checkHopDagCorrectness(prog, dprog, sb.getHops(), 
bpb.getInstructions(), fnStack);
-               }
-       }
-
-       private static void checkHopDagCorrectness( Program prog, DMLProgram 
dprog, ArrayList<Hop> roots, ArrayList<Instruction> inst, Set<String> fnStack ) 
{
-               if( roots != null )
-                       for( Hop hop : roots )
-                               checkHopDagCorrectness(prog, dprog, hop, inst, 
fnStack);
-       }
-
-       private static void checkHopDagCorrectness( Program prog, DMLProgram 
dprog, Hop root, ArrayList<Instruction> inst, Set<String> fnStack ) {
-               //set of checks to perform
-               checkFunctionNames(prog, dprog, root, inst, fnStack);
-       }
-
-       private static void checkLinksProgramStatementBlock( ProgramBlock pb, 
StatementBlock sb ) {
-               if( pb.getStatementBlock() != sb )
-                       throw new DMLRuntimeException("Links between 
programblocks and statementblocks are incorrect ("+pb+").");
-       }
-
-       private static void checkFunctionNames( Program prog, DMLProgram dprog, 
Hop root, ArrayList<Instruction> inst, Set<String> fnStack ) {
-               //reset visit status of dag
-               root.resetVisitStatus();
-               
-               //get all function op in this dag
-               HashMap<String, FunctionOp> fops = new HashMap<>();
-               getAllFunctionOps(root, fops);
-               
-               for( Instruction linst : inst )
-                       if( linst instanceof FunctionCallCPInstruction )
-                       {
-                               FunctionCallCPInstruction flinst = 
(FunctionCallCPInstruction) linst;
-                               String fnamespace = flinst.getNamespace();
-                               String fname = flinst.getFunctionName();
-                               String key = 
DMLProgram.constructFunctionKey(fnamespace, fname);
-                               
-                               //check 1: instruction name equal to hop name
-                               if( !fops.containsKey(key) )
-                                       throw new DMLRuntimeException( 
"Function Check: instruction and hop names differ ("+key+", "+fops.keySet()+")" 
);
-                               
-                               //check 2: function exists
-                               if( 
!prog.getFunctionProgramBlocks().containsKey(key) )
-                                       throw new DMLRuntimeException( 
"Function Check: function does not exits ("+key+")" );
-       
-                               //check 3: recursive program check
-                               FunctionProgramBlock fpb = 
prog.getFunctionProgramBlock(fnamespace, fname);
-                               FunctionStatementBlock fsb = 
dprog.getFunctionStatementBlock(fnamespace, fname);
-                               if( !fnStack.contains(key) )
-                               {
-                                       fnStack.add(key);
-                                       checkProgramCorrectness(fpb, fsb, 
fnStack);
-                                       fnStack.remove(key);
-                               }
-                       }
-       }
-
-       private static void getAllFunctionOps( Hop hop, HashMap<String, 
FunctionOp> memo )
-       {
-               if( hop.isVisited() )
-                       return;
-               
-               //process functionop
-               if( hop instanceof FunctionOp ) {
-                       FunctionOp fop = (FunctionOp) hop;
-                       memo.put(fop.getFunctionKey(), fop);
-               }
-               
-               //process children
-               for( Hop in : hop.getInput() )
-                       getAllFunctionOps(in, memo);
-               
-               hop.setVisited();
-       }
-}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
index 6dbc952172..4a7cec8cd2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
@@ -20,7 +20,6 @@
 package org.apache.sysds.runtime.controlprogram.parfor.opt;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -75,7 +74,6 @@ public class OptimizationWrapper
        
        //internal parameters
        public static final double PAR_FACTOR_INFRASTRUCTURE = 1.0;
-       private static final boolean CHECK_PLAN_CORRECTNESS = false;
 
 
        /**
@@ -227,17 +225,6 @@ public class OptimizationWrapper
                //core optimize
                opt.optimize(sb, pb, tree, est, numRuns, ec);
                LOG.debug("ParFOR Opt: Optimized plan (after optimization): \n" 
+ tree.explain(false));
-               
-               //assert plan correctness
-               if( CHECK_PLAN_CORRECTNESS && LOG.isDebugEnabled() ) {
-                       try{
-                               OptTreePlanChecker.checkProgramCorrectness(pb, 
sb, new HashSet<String>());
-                               LOG.debug("ParFOR Opt: Checked plan and program 
correctness.");
-                       }
-                       catch(Exception ex) {
-                               throw new DMLRuntimeException("Failed to check 
program correctness.", ex);
-                       }
-               }
 
                long ltime = (long) time.stop();
                LOG.trace("ParFOR Opt: Optimized plan in "+ltime+"ms.");
diff --git 
a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java 
b/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java
deleted file mode 100644
index d9d8fc080b..0000000000
--- a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.util;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.hadoop.mapred.FileSplit;
-
-/**
- * Custom binary block input format to return the custom record reader.
- * <p>
- * NOTE: Not used by default.
- * <p>
- * NOTE: Used for performance debugging of binary block HDFS reads.
- */
-public class BinaryBlockInputFormat extends 
SequenceFileInputFormat<MatrixIndexes,MatrixBlock>
-{
-       @Override
-       public RecordReader<MatrixIndexes, MatrixBlock> 
getRecordReader(InputSplit split, JobConf job, Reporter reporter)
-               throws IOException 
-       {
-               return new BinaryBlockRecordReader(job, (FileSplit)split);
-       }
-}
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java 
b/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java
deleted file mode 100644
index ddae00d721..0000000000
--- a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.util;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-
-/**
- * Custom record reader for binary block. Currently its only purpose is to 
allow for
- * detailed profiling of overall read time (io, deserialize, decompress).
- * 
- * NOTE: not used by default.
- */
-public class BinaryBlockRecordReader extends 
SequenceFileRecordReader<MatrixIndexes,MatrixBlock>
-{
-       //private long _time = 0;
-       
-       public BinaryBlockRecordReader(Configuration conf, FileSplit split)
-               throws IOException 
-       {
-               super(conf, split);
-               
-       }
-       
-       @Override
-       public synchronized boolean next(MatrixIndexes key, MatrixBlock value)
-               throws IOException 
-       {
-               //long t0 = System.nanoTime();          
-               boolean ret = super.next(key, value);           
-               //long t1 = System.nanoTime();
-               
-               //_time+=(t1-t0);
-               
-               return ret;
-       }
-
-       @Override
-       public synchronized void close() 
-               throws IOException 
-       {               
-               //in milliseconds.
-               //System.out.println(_time/1000000);
-               super.close();
-       }
-}
diff --git a/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java 
b/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java
deleted file mode 100644
index 67761b414f..0000000000
--- a/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.utils;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL; 
-import java.net.URLClassLoader;
-import java.io.File;
-import java.io.IOException;
-
-public class SystemDSLoaderUtils  {
-       
-       public void loadSystemDS(String filePath) 
-               throws NoSuchMethodException, SecurityException, 
IllegalAccessException, IllegalArgumentException, InvocationTargetException, 
IOException {
-               URL url = new File(filePath).toURI().toURL();
-               try( URLClassLoader classLoader = 
(URLClassLoader)ClassLoader.getSystemClassLoader() ) {
-                       Method method = 
URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
-                       method.setAccessible(true);
-                       method.invoke(classLoader, url);
-               }
-       }
-
-}
diff --git a/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java 
b/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java
index fabff5a459..581ff0d405 100644
--- a/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java
+++ b/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java
@@ -19,10 +19,6 @@
 
 package org.apache.sysds.test.component.misc;
 
-import java.io.IOException;
-import java.net.URL;
-import java.util.Enumeration;
-
 import org.apache.sysds.common.Types.OpOp1;
 import org.apache.sysds.common.Types.OpOp2;
 import org.apache.sysds.common.Types.OpOp3;
diff --git 
a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java 
b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
new file mode 100644
index 0000000000..4e3acbe4c6
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.parfor;
+
+import 
org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.controlprogram.parfor.Task;
+import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitioner;
+import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory;
+import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysds.runtime.instructions.cp.IntObject;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskPartitionerTest {
+
+       @Test
+       public void testNaive() {
+               testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, 
PTaskPartitioner.NAIVE);
+       }
+       
+       @Test
+       public void testStatic() {
+               testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, 
PTaskPartitioner.STATIC);
+       }
+       
+       @Test
+       public void testFixed() {
+               testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, 
PTaskPartitioner.FIXED);
+       }
+       
+       @Test
+       public void testFactoring() {
+               testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, 
PTaskPartitioner.FACTORING);
+       }
+       
+       @Test
+       public void testFactoring2() {
+               testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, 
PTaskPartitioner.FACTORING_CMIN);
+       }
+       
+       @Test
+       public void testFactoring3() {
+               testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, 
PTaskPartitioner.FACTORING_CMAX);
+       }
+       
+       private void testTaskPartitioner(int numTasks, PTaskPartitioner type) {
+               LocalTaskQueue<Task> queue = new LocalTaskQueue<>();
+               TaskPartitioner partitioner = 
TaskPartitionerFactory.createTaskPartitioner(
+                       type, new IntObject(1), new IntObject(numTasks), new 
IntObject(1),
+                       numTasks, InfrastructureAnalyzer.getLocalParallelism(), 
"i");
+               //asynchronous task creation
+               
CommonThreadPool.get().submit(()->partitioner.createTasks(queue));
+               //consume tasks and check serialization
+               Task t = null;
+               try {
+                       while((t = 
queue.dequeueTask())!=LocalTaskQueue.NO_MORE_TASKS) {
+                               Task ts1 = 
Task.parseCompactString(t.toCompactString());
+                               Task ts2 = 
Task.parseCompactString(t.toCompactString(10));
+                               Assert.assertEquals(t.toString(), 
ts1.toString());
+                               Assert.assertEquals(t.toString(), 
ts2.toString());
+                       }
+               } catch (InterruptedException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
 
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
index 97b820d2b1..ebaf24135c 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
@@ -26,7 +26,6 @@ import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 
 public class BuiltinRaJoinTest extends AutomatedTestBase
diff --git 
a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java
 
b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java
new file mode 100644
index 0000000000..d5dffb8944
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.parfor.misc;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+public class ParForRecursiveFunctionTest extends AutomatedTestBase
+{
+       private final static String TEST_NAME1 = "parfor_recursive";
+       private final static String TEST_DIR = "functions/parfor/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
ParForRecursiveFunctionTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-10;
+       
+       private final static int rows = 20;
+       private final static int cols = 10;
+       private final static double sparsity = 1.0;
+       
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"Rout"}));
+       }
+
+       @Test
+       public void testParForCP() {
+               runParforTest(TEST_NAME1, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testParForHybrid() {
+               runParforTest(TEST_NAME1, ExecMode.HYBRID);
+       }
+       
+       private void runParforTest( String TEST_NAME, ExecMode type )
+       {
+               TestConfiguration config = getTestConfiguration(TEST_NAME);
+               config.addVariable("rows", rows);
+               config.addVariable("cols", cols);
+               loadTestConfiguration(config);
+               ExecMode oldExec = setExecMode(type);
+               
+               try {
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-explain","-stats","-args", 
input("V"), output("R") };
+                       
+                       double[][] V = getRandomMatrix(rows, cols, 4, 4, 
sparsity, 3);
+                       writeInputMatrixWithMTD("V", V, true);
+       
+                       boolean exceptionExpected = false;
+                       runTest(true, exceptionExpected, null, -1);
+                       
+                       //compare matrices
+                       HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromOutputDir("R");
+                       Assert.assertEquals(5d*rows*cols, dmlfile.get(new 
CellIndex(1,1)), eps);
+               }
+               finally {
+                       resetExecMode(oldExec);
+               }
+       }
+}
diff --git a/src/test/scripts/functions/parfor/parfor_recursive.dml 
b/src/test/scripts/functions/parfor/parfor_recursive.dml
new file mode 100644
index 0000000000..42b6c226a2
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_recursive.dml
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+fun = function(Matrix[Double] A)
+  return(Matrix[Double] B)
+{
+    B = matrix(0, nrow(A), ncol(A))
+    parfor(i in 1:nrow(A)) {
+        if( as.scalar(A[1,1]) < 5  )
+           B[i,] = fun(A[i,]+1);
+        else
+           B[i,] = A[i, ]
+    }
+}
+
+A = read($1);
+B = as.matrix(sum(fun(A)));
+write(B, $2);
+


Reply via email to