This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new feba13d  [SYSTEMDS-3089] Fix parfor function calls in unoptimized eval scope
feba13d is described below

commit feba13de3012c6b24ed9d9ac6bf8ba33f7a9f3f5
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Aug 12 17:44:55 2021 +0200

    [SYSTEMDS-3089] Fix parfor function calls in unoptimized eval scope
    
    Local parallel for loops (parfor) use per-worker copies of functions in
    order to allow for uncontended recompilation. When a function is called
    through eval, its unoptimized variant is invoked (one that has not been
    optimized via IPA parameter propagation) to ensure result correctness.
    
    This patch fixes missing per-worker copies of unoptimized functions for
    the specific case of calling, through eval, a complex function that
    internally contains a parfor loop with regular function calls.
---
 .../apache/sysds/parser/FunctionDictionary.java    | 15 +++++
 .../sysds/runtime/util/ProgramConverter.java       | 42 ++++++-------
 .../test/functions/misc/FunctionPotpourriTest.java |  6 ++
 .../misc/FunPotpourriParforEvalBuiltin.dml         | 73 ++++++++++++++++++++++
 4 files changed, 114 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/FunctionDictionary.java 
b/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
index 7eabc00..32e2098 100644
--- a/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
+++ b/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
@@ -116,4 +116,19 @@ public class FunctionDictionary<T extends FunctionBlock> {
                                if( !_funsOrig.containsKey(e.getKey()) )
                                        _funsOrig.put(e.getKey(), e.getValue());
        }
+       
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder("Function Dictionary:");
+               sb.append("----------------------------------------");
+               int pos = 0;
+               for( Entry<String, T> e : _funs.entrySet() ) {
+                       sb.append("-- [");
+                       sb.append(pos++);
+                       sb.append("]: ");
+                       sb.append(e.getKey());
+                       sb.append("\n");
+               }
+               return sb.toString();
+       }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java 
b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index e3213f4..f1adc3a 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -372,7 +372,9 @@ public class ProgramConverter
        public static void createDeepCopyFunctionProgramBlock(String namespace, 
String oldName, long pid, int IDPrefix, Program prog, Set<String> fnStack, 
Set<String> fnCreated, boolean plain) 
        {
                //fpb guaranteed to be non-null (checked inside 
getFunctionProgramBlock)
-               FunctionProgramBlock fpb = 
prog.getFunctionProgramBlock(namespace, oldName);
+               FunctionProgramBlock fpb1 = 
prog.getFunctionProgramBlock(namespace, oldName, true);
+               FunctionProgramBlock fpb2 = 
prog.containsFunctionProgramBlock(namespace, oldName, false) ?
+                       prog.getFunctionProgramBlock(namespace, oldName, false) 
: null;
                String fnameNew = (plain)? oldName 
:(oldName+Lop.CP_CHILD_THREAD+pid); 
                String fnameNewKey = 
DMLProgram.constructFunctionKey(namespace,fnameNew);
 
@@ -380,37 +382,35 @@ public class ProgramConverter
                        return; //prevent redundant deep copy if already 
existent
                
                //create deep copy
-               FunctionProgramBlock copy = null;
-               ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
-               ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
-               if( fpb.getInputParams()!= null )
-                       tmp1.addAll(fpb.getInputParams());
-               if( fpb.getOutputParams()!= null )
-                       tmp2.addAll(fpb.getOutputParams());
-               
-               
+               FunctionProgramBlock copy1 = null;
                if( !fnStack.contains(fnameNewKey) ) {
                        fnStack.add(fnameNewKey);
-                       copy = new FunctionProgramBlock(prog, tmp1, tmp2);
-                       copy.setChildBlocks( 
rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, 
fnCreated, plain, fpb.isRecompileOnce()) );
-                       copy.setRecompileOnce( fpb.isRecompileOnce() );
-                       copy.setThreadID(pid);
+                       copy1 = createDeepCopyFunctionProgramBlock(fpb1, 
fnStack, fnCreated, pid, IDPrefix, plain);
                        fnStack.remove(fnameNewKey);
                }
                else //stop deep copy for recursive function calls
-                       copy = fpb;
+                       copy1 = fpb1;
                
                //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); 
//implicit cloning
                //note: instructions not used by function program block
                
                //put if not existing (recursive processing might have added it)
                if( !prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) 
{
-                       prog.addFunctionProgramBlock(namespace, fnameNew, copy);
+                       prog.addFunctionProgramBlock(namespace, fnameNew, 
copy1, true);
+                       if( fpb2 != null ) {
+                               FunctionProgramBlock copy2 = 
createDeepCopyFunctionProgramBlock(
+                                       fpb2, fnStack, fnCreated, pid, 
IDPrefix, plain);
+                               prog.addFunctionProgramBlock(namespace, 
fnameNew, copy2, false);
+                       }
                        
fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
                }
        }
 
-       public static FunctionProgramBlock 
createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> 
fnStack, Set<String> fnCreated) 
+       public static FunctionProgramBlock 
createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> 
fnStack, Set<String> fnCreated) {
+               return createDeepCopyFunctionProgramBlock(fpb, fnStack, 
fnCreated, 0, -1, true);
+       }
+       
+       public static FunctionProgramBlock 
createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> 
fnStack, Set<String> fnCreated, long pid, int IDPrefix, boolean plain) 
        {
                if( fpb == null )
                        throw new DMLRuntimeException("Unable to create a deep 
copy of a non-existing FunctionProgramBlock.");
@@ -425,15 +425,13 @@ public class ProgramConverter
                        tmp2.addAll(fpb.getOutputParams());
                
                copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
-               copy.setChildBlocks( 
rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, 
true, fpb.isRecompileOnce()) );
+               copy.setChildBlocks( 
rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, 
fnCreated, plain, fpb.isRecompileOnce()) );
                copy.setStatementBlock( fpb.getStatementBlock() );
                copy.setRecompileOnce(fpb.isRecompileOnce());
-               //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); 
//implicit cloning
-               //note: instructions not used by function program block
-       
+               copy.setThreadID(pid);
+               
                return copy;
        }
-
        
        /**
         * Creates a deep copy of an array of instructions and replaces the 
placeholders of parworker
diff --git 
a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java 
b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
index f0e230f..214730b 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
@@ -57,6 +57,7 @@ public class FunctionPotpourriTest extends AutomatedTestBase
                "FunPotpourriEvalList2Arg",
                "FunPotpourriEvalNamespace",
                "FunPotpourriBuiltinPrecedence",
+               "FunPotpourriParforEvalBuiltin",
        };
        
        private final static String TEST_DIR = "functions/misc/";
@@ -199,6 +200,11 @@ public class FunctionPotpourriTest extends 
AutomatedTestBase
                runFunctionTest( TEST_NAMES[25], null );
        }
        
+       @Test
+       public void testFunctionParforEvalBuiltin() {
+               runFunctionTest( TEST_NAMES[26], null );
+       }
+       
        private void runFunctionTest(String testName, Class<?> error) {
                TestConfiguration config = getTestConfiguration(testName);
                loadTestConfiguration(config);
diff --git a/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml 
b/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml
new file mode 100644
index 0000000..97be37e
--- /dev/null
+++ b/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml
@@ -0,0 +1,73 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+crossV = function(Matrix[double] X, Matrix[double] y, Integer k, 
Matrix[Double] MLhp, Boolean isWeighted) 
+  return (Matrix[Double] accuracyMatrix)
+{
+  accuracyMatrix = matrix(0, k, 1)
+  dataList = list()
+  testL = list()
+  data = order(target = cbind(y, X),  by = 1, decreasing=FALSE, 
index.return=FALSE)
+  classes = table(data[, 1], 1)
+  ins_per_fold = classes/k
+  start_fold = matrix(1, rows=nrow(ins_per_fold), cols=1)
+  fold_idxes = cbind(start_fold, ins_per_fold)
+
+  start_i = 0; end_i = 0; idx_fold = 1;
+  for(i in 1:k) {
+    fold_i = matrix(0, 0, ncol(data))
+    start=0; end=0; 
+    for(j in 1:nrow(classes)) {
+      idx = as.scalar(classes[j, 1])
+      start = end + 1;
+      end = end + idx
+      class_j =  data[start:end, ]
+      start_i = as.scalar(fold_idxes[j, 1]);
+      end_i = as.scalar(fold_idxes[j, 2])
+      fold_i = rbind(fold_i, class_j[start_i:end_i, ])
+    }
+    dataList = append(dataList, fold_i)
+    fold_idxes[, 1] = fold_idxes[, 2] + 1
+    fold_idxes[, 2] += ins_per_fold
+  }
+
+  parfor(i in seq(1,k)) {
+    [trainList, hold_out] = remove(dataList, i)
+    trainset = rbind(trainList)
+    testset = as.matrix(hold_out)
+    trainX = trainset[, 2:ncol(trainset)]
+    trainy = trainset[, 1]
+    testX = testset[, 2:ncol(testset)]
+    testy = testset[, 1]
+    beta = multiLogReg(X=trainX, Y=trainy, icpt=as.scalar(MLhp[1,1]), 
reg=as.scalar(MLhp[1,2]), tol=as.scalar(MLhp[1,3]), 
+    maxi=as.scalar(MLhp[1,4]), maxii=50, verbose=FALSE);
+    [prob, yhat, acc] = multiLogRegPredict(testX, beta, testy, FALSE)
+    accuracy = getAccuracy(testy, yhat, isWeighted)
+    accuracyMatrix[i] = accuracy
+  }
+}
+
+X = rand(rows=100, cols=100)
+Y = sample(2, 100, TRUE)
+hp = matrix("1 1e-4 1e-6 100", rows=1, cols=4)
+
+acc = eval("crossV", list(X=X, y=Y, k=3, MLhp=hp, isWeighted=FALSE))
+print("CV accuracy: "+mean(acc))

Reply via email to