This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit a8a21520f54d1911c2d760775ed122a7bef1be4a Author: Matthias Boehm <[email protected]> AuthorDate: Tue Dec 13 21:37:13 2022 +0100 [SYSTEMDS-3477] Fix missing list export before remote_parfor job Dirty matrices and frames (i.e., in-memory objects that were not yet materialized on hdfs) used in a body of a remote_parfor job need to be exported or broadcast before spawning the spark job. Recently, we added support for list inputs and outputs in parfor, but the export of matrices and frames passed in surrounding list objects was missing. This fix simply adds the basic support (with additional room for further performance improvements). --- .../runtime/controlprogram/ParForProgramBlock.java | 26 ++++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java index cc5eb68892..6dbfc34123 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java @@ -1154,30 +1154,32 @@ public class ParForProgramBlock extends ForProgramBlock private void exportMatricesToHDFS(ExecutionContext ec, Set<String> excludeNames) { ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock(); - if( LIVEVAR_AWARE_EXPORT && sb != null) - { + if( LIVEVAR_AWARE_EXPORT && sb != null ) { //optimization to prevent unnecessary export of matrices //export only variables that are read in the body VariableSet varsRead = sb.variablesRead(); for (String key : ec.getVariables().keySet() ) { - if( varsRead.containsVariable(key) && !excludeNames.contains(key) ) { - Data d = ec.getVariable(key); - if( d.getDataType().isMatrixOrFrame() ) - ((CacheableData<?>)d).exportData(_replicationExport); - } + if( varsRead.containsVariable(key) && !excludeNames.contains(key) ) + exportDataToHDFS(ec.getVariable(key), key); } } else { //export all matrices in symbol table for (String key : ec.getVariables().keySet() ) { - if( !excludeNames.contains(key) ) { - Data d = ec.getVariable(key); - if( d.getDataType().isMatrixOrFrame() ) - ((CacheableData<?>)d).exportData(_replicationExport); - } + if( !excludeNames.contains(key) ) + exportDataToHDFS(ec.getVariable(key), key); } } } + + private void exportDataToHDFS(Data data, String key) { + if( data.getDataType().isMatrixOrFrame() ) + ((CacheableData<?>)data).exportData(_replicationExport); + if( data.getDataType().isList() ) { + for( Data ldata : ((ListObject)data).getData() ) + exportDataToHDFS(ldata, key); + } + } private void cleanupSharedVariables( ExecutionContext ec, boolean[] varState ) { //TODO needs as precondition a systematic treatment of persistent read information.
