This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c0d8456  [SYSTEMDS-2953] Fix parfor spark runtime (frame support, 
singlenode)
c0d8456 is described below

commit c0d84560067365ce144713d49559494786637c84
Author: Matthias Boehm <[email protected]>
AuthorDate: Sun May 23 19:02:11 2021 +0200

    [SYSTEMDS-2953] Fix parfor spark runtime (frame support, singlenode)
    
    This patch fixes a few issues of forced remote spark parfor loops,
    especially for basic frame input support, execution context and
    blocksizes in forced singlenode execution, and format handling.
    
    The prime use cases are data cleaning pipelines which deal with matrices
    and frames and need to run both at logical and physical pipeline level
    in parfor and parfor remote if the dataset is small enough.
---
 src/main/java/org/apache/sysds/common/Types.java   |   3 +
 .../java/org/apache/sysds/parser/DMLProgram.java   |  10 +++
 .../apache/sysds/parser/ParForStatementBlock.java  |   6 ++
 .../runtime/controlprogram/ParForProgramBlock.java |   9 +-
 .../controlprogram/caching/FrameObject.java        |   4 +-
 .../controlprogram/caching/MatrixObject.java       |   3 +-
 .../context/ExecutionContextFactory.java           |   3 +-
 .../sysds/runtime/util/ProgramConverter.java       |  34 ++++++-
 .../misc/ParForFunctionSerializationTest.java      |   5 +-
 .../parfor/misc/ParForRemoteRobustnessTest.java    | 100 +++++++++++++++++++++
 .../scripts/functions/parfor/parfor_remote1.dml    |  33 +++++++
 .../scripts/functions/parfor/parfor_remote2.dml    |  36 ++++++++
 12 files changed, 233 insertions(+), 13 deletions(-)

diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index 284ba9c..2dd5ca9 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -55,6 +55,9 @@ public class Types
                public boolean isFrame() {
                        return this == FRAME;
                }
+               public boolean isMatrixOrFrame() {
+                       return isMatrix() | isFrame();
+               }
                public boolean isScalar() {
                        return this == SCALAR;
                }
diff --git a/src/main/java/org/apache/sysds/parser/DMLProgram.java 
b/src/main/java/org/apache/sysds/parser/DMLProgram.java
index ea9f306..498a59d 100644
--- a/src/main/java/org/apache/sysds/parser/DMLProgram.java
+++ b/src/main/java/org/apache/sysds/parser/DMLProgram.java
@@ -35,10 +35,12 @@ public class DMLProgram
        
        private ArrayList<StatementBlock> _blocks;
        private Map<String, FunctionDictionary<FunctionStatementBlock>> 
_namespaces;
+       private boolean _containsRemoteParfor;
        
        public DMLProgram(){
                _blocks = new ArrayList<>();
                _namespaces = new HashMap<>();
+               _containsRemoteParfor = false;
        }
        
        public DMLProgram(String namespace) {
@@ -58,6 +60,14 @@ public class DMLProgram
                return _blocks.size();
        }
        
+       public void setContainsRemoteParfor(boolean flag) {
+               _containsRemoteParfor = flag;
+       }
+       
+       public boolean containsRemoteParfor() {
+               return _containsRemoteParfor;
+       }
+       
        public static boolean isInternalNamespace(String namespace) {
                return DEFAULT_NAMESPACE.equals(namespace)
                        || BUILTIN_NAMESPACE.equals(namespace)
diff --git a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java 
b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
index 9219ba1..4b47148 100644
--- a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
@@ -223,6 +223,12 @@ public class ParForStatementBlock extends ForStatementBlock
                                        else //default case
                                                params.put(key, 
_paramDefaults.get(key));
                                }
+                       
+                       //keep info if forced into remote exec
+                       if( constrained && params.containsKey(EXEC_MODE) )
+                               dmlProg.setContainsRemoteParfor(
+                                       
params.get(EXEC_MODE).equals(PExecMode.REMOTE_SPARK.name()) ||
+                                       
params.get(EXEC_MODE).equals(PExecMode.REMOTE_SPARK_DP.name()));
                }
                else {
                        //set all defaults
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 7e318ae..cb94c00 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -37,6 +37,7 @@ import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysds.parser.StatementBlock;
 import org.apache.sysds.parser.VariableSet;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -1143,8 +1144,8 @@ public class ParForProgramBlock extends ForProgramBlock
                        for (String key : ec.getVariables().keySet() ) {
                                if( varsRead.containsVariable(key) && 
!excludeNames.contains(key) ) {
                                        Data d = ec.getVariable(key);
-                                       if( d.getDataType() == DataType.MATRIX )
-                                               
((MatrixObject)d).exportData(_replicationExport);
+                                       if( d.getDataType().isMatrixOrFrame() )
+                                               
((CacheableData<?>)d).exportData(_replicationExport);
                                }
                        }
                }
@@ -1153,8 +1154,8 @@ public class ParForProgramBlock extends ForProgramBlock
                        for (String key : ec.getVariables().keySet() ) {
                                if( !excludeNames.contains(key) ) {
                                        Data d = ec.getVariable(key);
-                                       if( d.getDataType() == DataType.MATRIX )
-                                               
((MatrixObject)d).exportData(_replicationExport);
+                                       if( d.getDataType().isMatrixOrFrame() )
+                                               
((CacheableData<?>)d).exportData(_replicationExport);
                                }
                        }
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 36cd0f0..388238c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -272,7 +272,9 @@ public class FrameObject extends CacheableData<FrameBlock>
        protected void writeBlobToHDFS(String fname, String ofmt, int rep, 
FileFormatProperties fprop)
                throws IOException, DMLRuntimeException 
        {
-               FileFormat fmt = FileFormat.safeValueOf(ofmt);
+               MetaDataFormat iimd = (MetaDataFormat) _metaData;
+               FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) : 
iimd.getFileFormat());
+               
                FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, 
fprop);
                writer.writeFrameToHDFS(_data, fname,  getNumRows(), 
getNumColumns());
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 3001e44..428fe9a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -462,9 +462,10 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
 
                // Read matrix and maintain meta data, 
                // if the MatrixObject is federated there is nothing extra to 
read, and therefore only acquire read and release
+               int blen = mc.getBlocksize() <= 0 ? 
ConfigurationManager.getBlocksize() : mc.getBlocksize();
                MatrixBlock newData = isFederated() ? acquireReadAndRelease() :
                        DataConverter.readMatrixFromHDFS(fname, 
iimd.getFileFormat(), rlen,
-                       clen, mc.getBlocksize(), mc.getNonZeros(), 
getFileFormatProperties());
+                       clen, blen, mc.getNonZeros(), 
getFileFormatProperties());
                
                if(iimd.getFileFormat() == FileFormat.CSV){
                        _metaData = _metaData instanceof MetaDataFormat ?
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
index 66ff510..6f408b2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
@@ -54,7 +54,8 @@ public class ExecutionContextFactory
                        case SINGLE_NODE:
                                //NOTE: even in case of forced singlenode 
operations, users might still 
                                //want to run remote parfor which requires the 
correct execution context
-                               if( 
OptimizerUtils.getDefaultExecutionMode()==ExecMode.HYBRID)
+                               if( 
OptimizerUtils.getDefaultExecutionMode()==ExecMode.HYBRID
+                                       && !(prog.getDMLProg()!=null && 
prog.getDMLProg().containsRemoteParfor()))
                                        ec = new ExecutionContext(allocateVars, 
allocateLineage, prog);
                                else
                                        ec = new 
SparkExecutionContext(allocateVars, allocateLineage, prog);
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java 
b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index 16472c7..f595137 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -64,6 +64,7 @@ import 
org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionForma
 import org.apache.sysds.runtime.controlprogram.Program;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -160,8 +161,6 @@ public class ProgramConverter
        public static final String PSBODY_END = LEVELOUT + CDATA_END;
        
        //exception msgs
-       public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not 
supported: ExternalFunctionProgramBlock contains MR instructions. " +
-                                                                               
                                                        
"(ExternalFunctionPRogramBlockCP can be used)";
        public static final String NOT_SUPPORTED_SPARK_INSTRUCTION   = "Not 
supported: Instructions of type other than CP instructions";
        public static final String NOT_SUPPORTED_SPARK_PARFOR        = "Not 
supported: Nested ParFOR REMOTE_SPARK due to possible deadlocks." +
                                                                                
                                                        "(LOCAL can be used for 
innner ParFOR)";
@@ -920,7 +919,7 @@ public class ProgramConverter
                                //name = so.getName();
                                value = so.getStringValue();
                                break;
-                       case MATRIX:
+                       case MATRIX: {
                                MatrixObject mo = (MatrixObject) dat;
                                MetaDataFormat md = (MetaDataFormat) 
dat.getMetaData();
                                DataCharacteristics dc = 
md.getDataCharacteristics();
@@ -938,6 +937,21 @@ public class ProgramConverter
                                metaData[7] = 
String.valueOf(mo.isHDFSFileExists());
                                metaData[8] = 
String.valueOf(mo.isCleanupEnabled());
                                break;
+                       }
+                       case FRAME: {
+                               FrameObject fo = (FrameObject) dat;
+                               MetaDataFormat md = (MetaDataFormat) 
dat.getMetaData();
+                               DataCharacteristics dc = 
md.getDataCharacteristics();
+                               value = fo.getFileName();
+                               metaData = new String[6];
+                               metaData[0] = String.valueOf(dc.getRows());
+                               metaData[1] = String.valueOf(dc.getCols());
+                               metaData[2] = String.valueOf(dc.getBlocksize());
+                               metaData[3] = md.getFileFormat().toString();
+                               metaData[4] = 
String.valueOf(fo.isHDFSFileExists());
+                               metaData[5] = 
String.valueOf(fo.isCleanupEnabled());
+                               break;
+                       }
                        case LIST:
                                // SCHEMA: 
<name>|<datatype>|<valuetype>|value|<metadata>|<tab>element1<tab>element2<tab>element3
 (this is the list)
                                //         (for the element1) 
<listName-index>|<datatype>|<valuetype>|value
@@ -1639,6 +1653,20 @@ public class ProgramConverter
                                dat = mo;
                                break;
                        }
+                       case FRAME: {
+                               FrameObject mo = new FrameObject(valString);
+                               long rows = Long.parseLong(st.nextToken());
+                               long cols = Long.parseLong(st.nextToken());
+                               int blen = Integer.parseInt(st.nextToken());
+                               FileFormat fmt = 
FileFormat.safeValueOf(st.nextToken());
+                               MatrixCharacteristics mc = new 
MatrixCharacteristics(rows, cols, blen, -1);
+                               MetaDataFormat md = new MetaDataFormat(mc, fmt);
+                               mo.setMetaData( md );
+                               
mo.setHDFSFileExists(Boolean.valueOf(st.nextToken()));
+                               
mo.enableCleanup(Boolean.valueOf(st.nextToken()));
+                               dat = mo;
+                               break;
+                       }
                        case LIST:
                                int size = Integer.parseInt(st.nextToken());
                                String namesStr = st.nextToken();
diff --git 
a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java
 
b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java
index 6833b33..f75a2b0 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java
@@ -69,9 +69,8 @@ public class ParForFunctionSerializationTest extends 
AutomatedTestBase
                
                fullRScriptName = HOME + TEST_NAME1 + ".R";
                rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " 
" + expectedDir();
-
-               long seed = System.nanoTime();
-               double[][] V = getRandomMatrix(rows, cols, 0, 1, sparsity, 
seed);
+               
+               double[][] V = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
                writeInputMatrix("V", V, true);
 
                boolean exceptionExpected = false;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
 
b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
new file mode 100644
index 0000000..a1599c6
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.parfor.misc;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+public class ParForRemoteRobustnessTest extends AutomatedTestBase 
+{
+       private final static String TEST_NAME1 = "parfor_remote1";
+       private final static String TEST_NAME2 = "parfor_remote2";
+       private final static String TEST_DIR = "functions/parfor/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
ParForRemoteRobustnessTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-10;
+       
+       private final static int rows = 20;
+       private final static int cols = 10;
+       private final static double sparsity = 1.0;
+       
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"Rout"}));
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[]{"Rout"}));
+       }
+
+       @Test
+       public void testParForRemoteMatrixCP() {
+               runParforRemoteTest(TEST_NAME1, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testParForRemoteMatrixHybrid() {
+               runParforRemoteTest(TEST_NAME1, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testParForRemoteFrameCP() {
+               runParforRemoteTest(TEST_NAME2, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testParForRemoteFrameHybrid() {
+               runParforRemoteTest(TEST_NAME2, ExecMode.HYBRID);
+       }
+       
+       private void runParforRemoteTest( String TEST_NAME, ExecMode type )
+       {
+               TestConfiguration config = getTestConfiguration(TEST_NAME);
+               config.addVariable("rows", rows);
+               config.addVariable("cols", cols);
+               loadTestConfiguration(config);
+               ExecMode oldExec = setExecMode(type);
+               if( type == ExecMode.SINGLE_NODE )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
+               try {
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-explain","-args", 
input("V"), 
+                               Integer.toString(rows), Integer.toString(cols), 
output("R") };
+                       
+                       double[][] V = getRandomMatrix(rows, cols, 5, 5, 
sparsity, 3);
+                       writeInputMatrix("V", V, true);
+       
+                       boolean exceptionExpected = false;
+                       runTest(true, exceptionExpected, null, -1);
+                       
+                       //compare matrices
+                       HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromOutputDir("R");
+                       Assert.assertEquals(5d*rows*cols, dmlfile.get(new 
CellIndex(1,1)), eps);
+               }
+               finally {
+                       resetExecMode(oldExec);
+               }
+       }
+}
diff --git a/src/test/scripts/functions/parfor/parfor_remote1.dml 
b/src/test/scripts/functions/parfor/parfor_remote1.dml
new file mode 100644
index 0000000..6dd1af3
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_remote1.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1, rows=$2, cols=$3, format="text");
+
+R = matrix(0, nrow(A), 1)
+parfor(i in 1:nrow(A), mode=REMOTE_SPARK, opt=CONSTRAINED) {
+  Ai = A[i, ];
+  if( sum(Ai) < 0 ) # compile spark
+    Ai = rand(rows=1e10, cols=1e4); 
+  R[i,1] = as.matrix(sum(Ai));
+}
+
+R2 = as.matrix(sum(R))
+write(R2, $4);
diff --git a/src/test/scripts/functions/parfor/parfor_remote2.dml 
b/src/test/scripts/functions/parfor/parfor_remote2.dml
new file mode 100644
index 0000000..c4f376a
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_remote2.dml
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1, rows=$2, cols=$3, data_type="frame", format="text");
+
+R = matrix(0, nrow(A), 1)
+parfor(i in 1:nrow(A), mode=REMOTE_SPARK, opt=CONSTRAINED) {
+  Ai = A[i, ];
+  Ai3 = as.matrix(Ai);
+  if( sum(Ai3) < 0 ) { # compile spark
+    [Ai2,M] = transformencode(target=Ai, spec="{recode:[1]}");
+    Ai3 = rand(rows=1e10, cols=1e4) + sum(Ai2); 
+  }
+  R[i,1] = as.matrix(sum(Ai3));
+}
+
+R2 = as.matrix(sum(R))
+write(R2, $4);

Reply via email to