This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new ef0cc71  [SYSTEMDS-2814] Fix frame RDD caching logic (avoid excessive 
GC)
ef0cc71 is described below

commit ef0cc71a75d32c977889bde5191c61617dbb0927
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jan 29 23:10:02 2021 +0100

    [SYSTEMDS-2814] Fix frame RDD caching logic (avoid excessive GC)
    
    This patch fixes the partially incorrect caching logic for distributed
    frame RDDs. So far, we always injected checkpoint instructions (caching
    directives) after persistent reads or whenever a reblock is required.
    However, a csv frame reblock after a persistent read but before
    transformencode, for example, does not cause a shuffle, yet caching it can
    cause severe garbage collection overhead due to many string objects.
    
    On the 1TB criteo dataset, this problem even led to aborted jobs
    because GC pauses were so long that the heartbeat interval was exceeded
    and thus, executors got restarted.
---
 src/main/java/org/apache/sysds/common/Types.java     |  2 +-
 src/main/java/org/apache/sysds/hops/DataOp.java      | 20 ++++++++++----------
 src/main/java/org/apache/sysds/hops/Hop.java         |  2 +-
 .../java/org/apache/sysds/hops/OptimizerUtils.java   |  6 +++---
 .../org/apache/sysds/hops/recompile/Recompiler.java  |  2 +-
 .../apache/sysds/hops/rewrite/HopRewriteUtils.java   |  2 +-
 .../hops/rewrite/RewriteBlockSizeAndReblock.java     |  4 ++--
 .../RewriteInjectSparkPReadCheckpointing.java        | 18 +++++++++++-------
 .../hops/rewrite/RewriteSplitDagUnknownCSVRead.java  |  2 +-
 .../java/org/apache/sysds/parser/DMLTranslator.java  |  8 ++++----
 ...MultiReturnParameterizedBuiltinSPInstruction.java |  3 ++-
 11 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index ed651b9..6c179cb 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -500,7 +500,7 @@ public class Types
                FEDERATED, // A federated matrix
                PROTO;  // protocol buffer representation
                
-               public boolean isIJVFormat() {
+               public boolean isIJV() {
                        return this == TEXT || this == MM;
                }
                
diff --git a/src/main/java/org/apache/sysds/hops/DataOp.java 
b/src/main/java/org/apache/sysds/hops/DataOp.java
index 114006f..e3467c5 100644
--- a/src/main/java/org/apache/sysds/hops/DataOp.java
+++ b/src/main/java/org/apache/sysds/hops/DataOp.java
@@ -98,7 +98,7 @@ public class DataOp extends Hop {
                setNnz(nnz);
                
                if( dop == OpOpData.TRANSIENTREAD )
-                       setInputFormatType(FileFormat.BINARY);
+                       setFileFormat(FileFormat.BINARY);
        }
 
        public DataOp(String l, DataType dt, ValueType vt, OpOpData dop,
@@ -136,7 +136,7 @@ public class DataOp extends Hop {
                        index++;
                }
                if (dop == OpOpData.TRANSIENTREAD ){
-                       setInputFormatType(FileFormat.BINARY);
+                       setFileFormat(FileFormat.BINARY);
                }
                
                if( params.containsKey(DataExpression.READROWPARAM) )
@@ -158,7 +158,7 @@ public class DataOp extends Hop {
                _fileName = fname;
 
                if (dop == OpOpData.TRANSIENTWRITE || dop == 
OpOpData.FUNCTIONOUTPUT )
-                       setInputFormatType(FileFormat.BINARY);
+                       setFileFormat(FileFormat.BINARY);
        }
        
        /**
@@ -196,7 +196,7 @@ public class DataOp extends Hop {
                }
 
                if (dop == OpOpData.TRANSIENTWRITE)
-                       setInputFormatType(FileFormat.BINARY);
+                       setFileFormat(FileFormat.BINARY);
        }
 
        /** Check for N (READ) or N+1 (WRITE) inputs. */
@@ -280,27 +280,27 @@ public class DataOp extends Hop {
                {
                        case TRANSIENTREAD:
                                l = new Data(_op, null, inputLops, getName(), 
null, 
-                                               getDataType(), getValueType(), 
getInputFormatType());
+                                               getDataType(), getValueType(), 
getFileFormat());
                                setOutputDimensions(l);
                                break;
                                
                        case PERSISTENTREAD:
                                l = new Data(_op, null, inputLops, getName(), 
null, 
-                                               getDataType(), getValueType(), 
getInputFormatType());
+                                               getDataType(), getValueType(), 
getFileFormat());
                                
l.getOutputParameters().setDimensions(getDim1(), getDim2(), _inBlocksize, 
getNnz(), getUpdateType());
                                break;
                                
                        case PERSISTENTWRITE:
                        case FUNCTIONOUTPUT:
                                l = new Data(_op, 
getInput().get(0).constructLops(), inputLops, getName(), null, 
-                                       getDataType(), getValueType(), 
getInputFormatType());
+                                       getDataType(), getValueType(), 
getFileFormat());
                                ((Data)l).setExecType(et);
                                setOutputDimensions(l);
                                break;
                                
                        case TRANSIENTWRITE:
                                l = new Data(_op, 
getInput().get(0).constructLops(), inputLops, getName(), null,
-                                               getDataType(), getValueType(), 
getInputFormatType());
+                                               getDataType(), getValueType(), 
getFileFormat());
                                setOutputDimensions(l);
                                break;
                                
@@ -327,11 +327,11 @@ public class DataOp extends Hop {
 
        }
 
-       public void setInputFormatType(FileFormat ft) {
+       public void setFileFormat(FileFormat ft) {
                _inFormat = ft;
        }
 
-       public FileFormat getInputFormatType() {
+       public FileFormat getFileFormat() {
                return _inFormat;
        }
        
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java 
b/src/main/java/org/apache/sysds/hops/Hop.java
index 1eb319f..5be1a63 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -311,7 +311,7 @@ public abstract class Hop implements ParseInfo {
                        {
                                if( this instanceof DataOp  // CSV
                                        && ((DataOp)this).getOp() == 
OpOpData.PERSISTENTREAD
-                                       && ((DataOp)this).getInputFormatType() 
== FileFormat.CSV  )
+                                       && ((DataOp)this).getFileFormat() == 
FileFormat.CSV  )
                                {
                                        reblock = new CSVReBlock( input, 
getBlocksize(), 
                                                getDataType(), getValueType(), 
et);
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index c05eaa3..dce33f2 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -775,7 +775,7 @@ public class OptimizerUtils
 
        public static long estimateSizeTextOutput(long rows, long cols, long 
nnz, FileFormat fmt) {
                long bsize = MatrixBlock.estimateSizeOnDisk(rows, cols, nnz);
-               if( fmt.isIJVFormat() )
+               if( fmt.isIJV() )
                        return bsize * 3;
                else if( fmt == FileFormat.LIBSVM )
                        return Math.round(bsize * 2.5);
@@ -941,8 +941,8 @@ public class OptimizerUtils
                        ret &= ( p.getExecType()==ExecType.CP 
                                ||(p instanceof AggBinaryOp && 
allowsToFilterEmptyBlockOutputs(p) )
                                ||(HopRewriteUtils.isReorg(p, ReOrgOp.RESHAPE, 
ReOrgOp.TRANS) && allowsToFilterEmptyBlockOutputs(p) )
-                               ||(HopRewriteUtils.isData(p, 
OpOpData.PERSISTENTWRITE) && ((DataOp)p).getInputFormatType()==FileFormat.TEXT))
-                               && !(p instanceof FunctionOp || (p instanceof 
DataOp && ((DataOp)p).getInputFormatType()!=FileFormat.TEXT) ); //no function 
call or transient write
+                               ||(HopRewriteUtils.isData(p, 
OpOpData.PERSISTENTWRITE) && ((DataOp)p).getFileFormat()==FileFormat.TEXT))
+                               && !(p instanceof FunctionOp || (p instanceof 
DataOp && ((DataOp)p).getFileFormat()!=FileFormat.TEXT) ); //no function call 
or transient write
                }
                return ret;
        }
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index c57fbc8..3b15b44 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -1344,7 +1344,7 @@ public class Recompiler
                }
                //special case for persistent reads with unknown size 
(read-after-write)
                else if( HopRewriteUtils.isData(hop, OpOpData.PERSISTENTREAD)
-                       && !hop.dimsKnown() && 
((DataOp)hop).getInputFormatType()!=FileFormat.CSV
+                       && !hop.dimsKnown() && 
((DataOp)hop).getFileFormat()!=FileFormat.CSV
                        && 
!ConfigurationManager.getCompilerConfigFlag(ConfigType.IGNORE_READ_WRITE_METADATA)
 )
                {
                        //update hop with read meta data
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java 
b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
index fc7a818..d21b16c 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
@@ -1377,7 +1377,7 @@ public class HopRewriteUtils
        public static boolean alwaysRequiresReblock(Hop hop) {
                return (hop instanceof DataOp
                        && ((DataOp)hop).getOp()==OpOpData.PERSISTENTREAD
-                        && 
((DataOp)hop).getInputFormatType()!=FileFormat.BINARY);
+                        && ((DataOp)hop).getFileFormat()!=FileFormat.BINARY);
        }
        
        public static boolean containsOp(ArrayList<Hop> candidates, Class<? 
extends Hop> clazz) {
diff --git 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
index cc24919..c93e6df 100644
--- 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
+++ 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
@@ -92,8 +92,8 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule
                        // if block size does not match
                        if( canReblock && 
                                ( (dop.getDataType() == DataType.MATRIX && 
(dop.getBlocksize() != blocksize))
-                               ||(dop.getDataType() == DataType.FRAME && 
OptimizerUtils.isSparkExecutionMode() && 
(dop.getInputFormatType()==FileFormat.TEXT
-                                                 || 
dop.getInputFormatType()==FileFormat.CSV))) )
+                               ||(dop.getDataType() == DataType.FRAME && 
OptimizerUtils.isSparkExecutionMode() && (dop.getFileFormat()==FileFormat.TEXT
+                                                 || 
dop.getFileFormat()==FileFormat.CSV))) )
                        {
                                if( dop.getOp() == OpOpData.PERSISTENTREAD)
                                {
diff --git 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
index 6b3c041..6ba8ae1 100644
--- 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
+++ 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
@@ -60,17 +60,21 @@ public class RewriteInjectSparkPReadCheckpointing extends 
HopRewriteRule
                if(hop.isVisited())
                        return;
                
-               // The reblocking is performed after transform, and hence 
checkpoint only non-transformed reads.
-               if( (hop instanceof DataOp && 
((DataOp)hop).getOp()==OpOpData.PERSISTENTREAD)
-                       || hop.requiresReblock() )
-               {
+               // Inject checkpoints after persistent reads (for binary 
matrices only), or
+               // after reblocks that cause expensive shuffling. However, 
carefully avoid
+               // unnecessary frame checkpoints (e.g., binary data or csv that 
do not cause 
+               // shuffle) in order to prevent excessive garbage collection 
due to possibly
+               // many small string objects. An alternative would be 
serialized caching.
+               boolean isMatrix = hop.getDataType().isMatrix();
+               boolean isPRead = hop instanceof DataOp  && 
((DataOp)hop).getOp()==OpOpData.PERSISTENTREAD;
+               boolean isFrameException = hop.getDataType().isFrame() && 
isPRead && !((DataOp)hop).getFileFormat().isIJV();
+               
+               if( (isMatrix && isPRead) || (hop.requiresReblock() && 
!isFrameException) ) {
                        //make given hop for checkpointing (w/ default storage 
level)
                        //note: we do not recursively process childs here in 
order to prevent unnecessary checkpoints
                        hop.setRequiresCheckpoint(true);
                }
-               else
-               {
-                       //process childs
+               else {
                        if( hop.getInput() != null ) {
                                //process all childs (prevent concurrent 
modification by index access)
                                for( int i=0; i<hop.getInput().size(); i++ )
diff --git 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
index 8f2d9b9..a6cf158 100644
--- 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
+++ 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
@@ -152,7 +152,7 @@ public class RewriteSplitDagUnknownCSVRead extends 
StatementBlockRewriteRule
                {
                        DataOp dop = (DataOp) hop;
                        if(    dop.getOp() == OpOpData.PERSISTENTREAD
-                               && dop.getInputFormatType() == FileFormat.CSV
+                               && dop.getFileFormat() == FileFormat.CSV
                                && !dop.dimsKnown()
                                && !HopRewriteUtils.hasOnlyWriteParents(dop, 
true, false) )
                        {
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index dafc993..0f994d6 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -1036,13 +1036,13 @@ public class DMLTranslator
                                
                                DataOp ae = (DataOp)processExpression(source, 
target, ids);
                                String formatName = 
os.getExprParam(DataExpression.FORMAT_TYPE).toString();
-                               
ae.setInputFormatType(Expression.convertFormatType(formatName));
+                               
ae.setFileFormat(Expression.convertFormatType(formatName));
 
                                if (ae.getDataType() == DataType.SCALAR ) {
                                        ae.setOutputParams(ae.getDim1(), 
ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
                                }
                                else {
-                                       switch(ae.getInputFormatType()) {
+                                       switch(ae.getFileFormat()) {
                                        case TEXT:
                                        case MM:
                                        case CSV:
@@ -1059,7 +1059,7 @@ public class DMLTranslator
                                                
ae.setOutputParams(ae.getDim1(), ae.getDim2(), -1, ae.getUpdateType(), -1);
                                                break;
                                                default:
-                                                       throw new 
LanguageException("Unrecognized file format: " + ae.getInputFormatType());
+                                                       throw new 
LanguageException("Unrecognized file format: " + ae.getFileFormat());
                                        }
                                }
                                
@@ -1553,7 +1553,7 @@ public class DMLTranslator
                                if (ae instanceof DataOp && ((DataOp) 
ae).getOp() != OpOpData.SQLREAD &&
                                                ((DataOp) ae).getOp() != 
OpOpData.FEDERATED) {
                                        String formatName = 
((DataExpression)source).getVarParam(DataExpression.FORMAT_TYPE).toString();
-                                       
((DataOp)ae).setInputFormatType(Expression.convertFormatType(formatName));
+                                       
((DataOp)ae).setFileFormat(Expression.convertFormatType(formatName));
                                }
                                return ae;
                        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 9dd4e6d..bb290fc 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -158,7 +158,8 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                        JavaPairRDD<Long,FrameBlock> tmp = in
                                .mapToPair(new RDDTransformApplyFunction(bmeta, 
bomap));
                        JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
FrameRDDConverterUtils
-                               .binaryBlockToMatrixBlock(tmp, mcOut, mcOut);
+                               .binaryBlockToMatrixBlock(tmp, mcOut, mcOut)
+                               .cache(); //best effort cache as reblock not at 
hop level
                        
                        //set output and maintain lineage/output characteristics
                        sec.setRDDHandleForVariable(_outputs.get(0).getName(), 
out);

Reply via email to