This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit a8a21520f54d1911c2d760775ed122a7bef1be4a
Author: Matthias Boehm <[email protected]>
AuthorDate: Tue Dec 13 21:37:13 2022 +0100

    [SYSTEMDS-3477] Fix missing list export before remote_parfor job
    
    Dirty matrices and frames (i.e., in-memory objects that were not yet
    materialized on HDFS) used in the body of a remote_parfor job need to be
    exported or broadcast before spawning the Spark job. Recently, we added
    support for list inputs and outputs in parfor, but the export of
    matrices and frames passed in surrounding list objects was missing.
    This fix simply adds the basic support (with additional room for
    further performance improvements).
---
 .../runtime/controlprogram/ParForProgramBlock.java | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index cc5eb68892..6dbfc34123 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -1154,30 +1154,32 @@ public class ParForProgramBlock extends ForProgramBlock
        private void exportMatricesToHDFS(ExecutionContext ec, Set<String> 
excludeNames)  {
                ParForStatementBlock sb = 
(ParForStatementBlock)getStatementBlock();
                
-               if( LIVEVAR_AWARE_EXPORT && sb != null)
-               {
+               if( LIVEVAR_AWARE_EXPORT && sb != null ) {
                        //optimization to prevent unnecessary export of matrices
                        //export only variables that are read in the body
                        VariableSet varsRead = sb.variablesRead();
                        for (String key : ec.getVariables().keySet() ) {
-                               if( varsRead.containsVariable(key) && 
!excludeNames.contains(key) ) {
-                                       Data d = ec.getVariable(key);
-                                       if( d.getDataType().isMatrixOrFrame() )
-                                               
((CacheableData<?>)d).exportData(_replicationExport);
-                               }
+                               if( varsRead.containsVariable(key) && 
!excludeNames.contains(key) )
+                                       exportDataToHDFS(ec.getVariable(key), 
key);
                        }
                }
                else {
                        //export all matrices in symbol table
                        for (String key : ec.getVariables().keySet() ) {
-                               if( !excludeNames.contains(key) ) {
-                                       Data d = ec.getVariable(key);
-                                       if( d.getDataType().isMatrixOrFrame() )
-                                               
((CacheableData<?>)d).exportData(_replicationExport);
-                               }
+                               if( !excludeNames.contains(key) )
+                                       exportDataToHDFS(ec.getVariable(key), 
key);
                        }
                }
        }
+       
+       private void exportDataToHDFS(Data data, String key) {
+               if( data.getDataType().isMatrixOrFrame() )
+                       ((CacheableData<?>)data).exportData(_replicationExport);
+               if( data.getDataType().isList() ) {
+                       for( Data ldata : ((ListObject)data).getData() )
+                               exportDataToHDFS(ldata, key);
+               }
+       }
 
        private void cleanupSharedVariables( ExecutionContext ec, boolean[] 
varState ) {
                //TODO needs as precondition a systematic treatment of 
persistent read information.

Reply via email to