This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new c38aa4e25f [SYSTEMDS-3622] Fix conditions for local in-memory reblocks
c38aa4e25f is described below
commit c38aa4e25f673db737a794b2d76937e5650d513b
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Sep 22 17:43:04 2023 +0200
[SYSTEMDS-3622] Fix conditions for local in-memory reblocks
This patch improves the local reblock logic for large text files, which
are normally reblocked via distributed spark instructions. The "bug" of
the jira task showed up because single-threaded I/O also led to a
reduced reblock threshold and thus a spark reblock on reading matrix
market files. Again, due to the missing log configuration, this spark
reblock was not shown explicitly, even though it was very slow for
ultra-sparse matrices. We now prefer local reblocks for ultra-sparse
matrices in local mode if the data at least fits into memory.
---
src/main/java/org/apache/sysds/hops/recompile/Recompiler.java | 10 +++++++++-
src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java | 11 +++++------
2 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index ea707be1ca..811b7593e9 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -86,6 +86,7 @@ import
org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.opt.OptTreeConverter;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
@@ -1598,6 +1599,7 @@ public class Recompiler {
//check valid dimensions and memory requirements
double sp = OptimizerUtils.getSparsity(rows, cols, nnz);
double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp);
+
if( !OptimizerUtils.isValidCPDimensions(rows, cols)
|| !OptimizerUtils.isValidCPMatrixSize(rows, cols, sp)
|| mem >= OptimizerUtils.getLocalMemBudget() )
@@ -1609,8 +1611,14 @@ public class Recompiler {
long estFilesize = (long)(3.5 * mem); //conservative estimate
long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE *
OptimizerUtils.getParallelTextReadParallelism();
- return (iimd.getFileFormat() == FileFormat.BINARY
+ boolean ret = (iimd.getFileFormat() == FileFormat.BINARY
|| estFilesize < cpThreshold); //for text conservative
+
+ // for reading ultra-sparse in local mode (1 executor) always
avoid spark reblock
+ if( !ret && sp < MatrixBlock.ULTRA_SPARSITY_TURN_POINT ) // but
qualifies
+ ret = (SparkExecutionContext.getNumExecutors() == 1);
+
+ return ret;
}
public static boolean checkCPCheckpoint(DataCharacteristics dc) {
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java
index 7ce872e595..e8cce53d77 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCell.java
@@ -135,10 +135,10 @@ public class ReaderTextCell extends MatrixReader
if( sparse ) { //SPARSE<-value
while( reader.next(key, value)
) {
cell =
parseCell(value.toString(), st, cell, _mmProps);
- appendCell(cell, dest,
_mmProps);
+ nnz += appendCell(cell,
dest, _mmProps);
}
dest.sortSparseRows();
- }
+ }
else { //DENSE<-value
DenseBlock a =
dest.getDenseBlock();
while( reader.next(key, value)
) {
@@ -152,8 +152,7 @@ public class ReaderTextCell extends MatrixReader
}
}
- if( !dest.isInSparseFormat() )
- dest.setNonZeros(nnz);
+ dest.setNonZeros(nnz);
}
catch(Exception ex) {
//post-mortem error handling and bounds checking
@@ -244,7 +243,7 @@ public class ReaderTextCell extends MatrixReader
if( sparse ) { //SPARSE<-value
while( (value=br.readLine())!=null ) {
cell = parseCell(value.toString(), st,
cell, mmProps);
- appendCell(cell, dest, mmProps);
+ nnz += appendCell(cell, dest, mmProps);
}
dest.sortSparseRows();
}
@@ -254,8 +253,8 @@ public class ReaderTextCell extends MatrixReader
cell = parseCell(value.toString(), st,
cell, mmProps);
nnz += appendCell(cell, a, mmProps);
}
- dest.setNonZeros(nnz);
}
+ dest.setNonZeros(nnz);
}
catch(Exception ex) {
//post-mortem error handling and bounds checking