This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new c38aa4e25f [SYSTEMDS-3622] Fix conditions for local in-memory reblocks
c38aa4e25f is described below

commit c38aa4e25f673db737a794b2d76937e5650d513b
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Sep 22 17:43:04 2023 +0200

    [SYSTEMDS-3622] Fix conditions for local in-memory reblocks
    
    This patch improves the local reblock logic for large text files, which
    are normally reblocked via distributed Spark instructions. The "bug" of
    the Jira task showed up because single-threaded I/O also led to a
    reduced reblock threshold and thus a Spark reblock on reading matrix
    market. Again, due to missing log configuration, this was not shown
    explicitly, and it was very slow for ultra-sparse matrices. We now
    prefer local reblocks for ultra-sparse matrices in local mode if the
    data at least fits into memory.
---
 src/main/java/org/apache/sysds/hops/recompile/Recompiler.java | 10 +++++++++-
 src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java | 11 +++++------
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index ea707be1ca..811b7593e9 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -86,6 +86,7 @@ import 
org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.OptTreeConverter;
 import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
@@ -1598,6 +1599,7 @@ public class Recompiler {
                //check valid dimensions and memory requirements
                double sp = OptimizerUtils.getSparsity(rows, cols, nnz);
                double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp);
+               
                if(    !OptimizerUtils.isValidCPDimensions(rows, cols)
                        || !OptimizerUtils.isValidCPMatrixSize(rows, cols, sp)
                        || mem >= OptimizerUtils.getLocalMemBudget() ) 
@@ -1609,8 +1611,14 @@ public class Recompiler {
                long estFilesize = (long)(3.5 * mem); //conservative estimate
                long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE * 
                        OptimizerUtils.getParallelTextReadParallelism();
-               return (iimd.getFileFormat() == FileFormat.BINARY
+               boolean ret = (iimd.getFileFormat() == FileFormat.BINARY
                        || estFilesize < cpThreshold); //for text conservative
+               
+               // for reading ultra-sparse in local mode (1 executor) always 
avoid spark reblock
+               if( !ret && sp < MatrixBlock.ULTRA_SPARSITY_TURN_POINT ) // but 
qualifies
+                       ret = (SparkExecutionContext.getNumExecutors() == 1);
+               
+               return ret;
        }
        
        public static boolean checkCPCheckpoint(DataCharacteristics dc) {
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java 
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java
index 7ce872e595..e8cce53d77 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java
@@ -135,10 +135,10 @@ public class ReaderTextCell extends MatrixReader
                                        if( sparse ) { //SPARSE<-value
                                                while( reader.next(key, value) 
) {
                                                        cell = 
parseCell(value.toString(), st, cell, _mmProps);
-                                                       appendCell(cell, dest, 
_mmProps);
+                                                       nnz += appendCell(cell, 
dest, _mmProps);
                                                }
                                                dest.sortSparseRows();
-                                       } 
+                                       }
                                        else { //DENSE<-value
                                                DenseBlock a = 
dest.getDenseBlock();
                                                while( reader.next(key, value) 
) {
@@ -152,8 +152,7 @@ public class ReaderTextCell extends MatrixReader
                                }
                        }
                        
-                       if( !dest.isInSparseFormat() )
-                               dest.setNonZeros(nnz);
+                       dest.setNonZeros(nnz);
                }
                catch(Exception ex) {
                        //post-mortem error handling and bounds checking
@@ -244,7 +243,7 @@ public class ReaderTextCell extends MatrixReader
                        if( sparse ) { //SPARSE<-value
                                while( (value=br.readLine())!=null ) {
                                        cell = parseCell(value.toString(), st, 
cell, mmProps);
-                                       appendCell(cell, dest, mmProps);
+                                       nnz += appendCell(cell, dest, mmProps);
                                }
                                dest.sortSparseRows();
                        } 
@@ -254,8 +253,8 @@ public class ReaderTextCell extends MatrixReader
                                        cell = parseCell(value.toString(), st, 
cell, mmProps);
                                        nnz += appendCell(cell, a, mmProps);
                                }
-                               dest.setNonZeros(nnz);
                        }
+                       dest.setNonZeros(nnz);
                }
                catch(Exception ex) {
                        //post-mortem error handling and bounds checking

Reply via email to