This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new ef0cc71 [SYSTEMDS-2814] Fix frame RDD caching logic (avoid excessive
GC)
ef0cc71 is described below
commit ef0cc71a75d32c977889bde5191c61617dbb0927
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jan 29 23:10:02 2021 +0100
[SYSTEMDS-2814] Fix frame RDD caching logic (avoid excessive GC)
This patch fixes the partially incorrect caching logic for distributed
frame RDDs. So far, we always injected checkpoint instructions (caching
directives) after persistent reads or whenever a reblock is required.
However, for example, a CSV frame reblock after a persistent read but
before transformencode does not cause shuffle, but caching it can cause
severe garbage collection overhead due to many string objects.
On the 1TB Criteo dataset, this problem even led to aborted jobs
because GC pauses were so long that the heartbeat interval was exceeded
and thus, executors got restarted.
---
src/main/java/org/apache/sysds/common/Types.java | 2 +-
src/main/java/org/apache/sysds/hops/DataOp.java | 20 ++++++++++----------
src/main/java/org/apache/sysds/hops/Hop.java | 2 +-
.../java/org/apache/sysds/hops/OptimizerUtils.java | 6 +++---
.../org/apache/sysds/hops/recompile/Recompiler.java | 2 +-
.../apache/sysds/hops/rewrite/HopRewriteUtils.java | 2 +-
.../hops/rewrite/RewriteBlockSizeAndReblock.java | 4 ++--
.../RewriteInjectSparkPReadCheckpointing.java | 18 +++++++++++-------
.../hops/rewrite/RewriteSplitDagUnknownCSVRead.java | 2 +-
.../java/org/apache/sysds/parser/DMLTranslator.java | 8 ++++----
...MultiReturnParameterizedBuiltinSPInstruction.java | 3 ++-
11 files changed, 37 insertions(+), 32 deletions(-)
diff --git a/src/main/java/org/apache/sysds/common/Types.java
b/src/main/java/org/apache/sysds/common/Types.java
index ed651b9..6c179cb 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -500,7 +500,7 @@ public class Types
FEDERATED, // A federated matrix
PROTO; // protocol buffer representation
- public boolean isIJVFormat() {
+ public boolean isIJV() {
return this == TEXT || this == MM;
}
diff --git a/src/main/java/org/apache/sysds/hops/DataOp.java
b/src/main/java/org/apache/sysds/hops/DataOp.java
index 114006f..e3467c5 100644
--- a/src/main/java/org/apache/sysds/hops/DataOp.java
+++ b/src/main/java/org/apache/sysds/hops/DataOp.java
@@ -98,7 +98,7 @@ public class DataOp extends Hop {
setNnz(nnz);
if( dop == OpOpData.TRANSIENTREAD )
- setInputFormatType(FileFormat.BINARY);
+ setFileFormat(FileFormat.BINARY);
}
public DataOp(String l, DataType dt, ValueType vt, OpOpData dop,
@@ -136,7 +136,7 @@ public class DataOp extends Hop {
index++;
}
if (dop == OpOpData.TRANSIENTREAD ){
- setInputFormatType(FileFormat.BINARY);
+ setFileFormat(FileFormat.BINARY);
}
if( params.containsKey(DataExpression.READROWPARAM) )
@@ -158,7 +158,7 @@ public class DataOp extends Hop {
_fileName = fname;
if (dop == OpOpData.TRANSIENTWRITE || dop ==
OpOpData.FUNCTIONOUTPUT )
- setInputFormatType(FileFormat.BINARY);
+ setFileFormat(FileFormat.BINARY);
}
/**
@@ -196,7 +196,7 @@ public class DataOp extends Hop {
}
if (dop == OpOpData.TRANSIENTWRITE)
- setInputFormatType(FileFormat.BINARY);
+ setFileFormat(FileFormat.BINARY);
}
/** Check for N (READ) or N+1 (WRITE) inputs. */
@@ -280,27 +280,27 @@ public class DataOp extends Hop {
{
case TRANSIENTREAD:
l = new Data(_op, null, inputLops, getName(),
null,
- getDataType(), getValueType(),
getInputFormatType());
+ getDataType(), getValueType(),
getFileFormat());
setOutputDimensions(l);
break;
case PERSISTENTREAD:
l = new Data(_op, null, inputLops, getName(),
null,
- getDataType(), getValueType(),
getInputFormatType());
+ getDataType(), getValueType(),
getFileFormat());
l.getOutputParameters().setDimensions(getDim1(), getDim2(), _inBlocksize,
getNnz(), getUpdateType());
break;
case PERSISTENTWRITE:
case FUNCTIONOUTPUT:
l = new Data(_op,
getInput().get(0).constructLops(), inputLops, getName(), null,
- getDataType(), getValueType(),
getInputFormatType());
+ getDataType(), getValueType(),
getFileFormat());
((Data)l).setExecType(et);
setOutputDimensions(l);
break;
case TRANSIENTWRITE:
l = new Data(_op,
getInput().get(0).constructLops(), inputLops, getName(), null,
- getDataType(), getValueType(),
getInputFormatType());
+ getDataType(), getValueType(),
getFileFormat());
setOutputDimensions(l);
break;
@@ -327,11 +327,11 @@ public class DataOp extends Hop {
}
- public void setInputFormatType(FileFormat ft) {
+ public void setFileFormat(FileFormat ft) {
_inFormat = ft;
}
- public FileFormat getInputFormatType() {
+ public FileFormat getFileFormat() {
return _inFormat;
}
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java
b/src/main/java/org/apache/sysds/hops/Hop.java
index 1eb319f..5be1a63 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -311,7 +311,7 @@ public abstract class Hop implements ParseInfo {
{
if( this instanceof DataOp // CSV
&& ((DataOp)this).getOp() ==
OpOpData.PERSISTENTREAD
- && ((DataOp)this).getInputFormatType()
== FileFormat.CSV )
+ && ((DataOp)this).getFileFormat() ==
FileFormat.CSV )
{
reblock = new CSVReBlock( input,
getBlocksize(),
getDataType(), getValueType(),
et);
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index c05eaa3..dce33f2 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -775,7 +775,7 @@ public class OptimizerUtils
public static long estimateSizeTextOutput(long rows, long cols, long
nnz, FileFormat fmt) {
long bsize = MatrixBlock.estimateSizeOnDisk(rows, cols, nnz);
- if( fmt.isIJVFormat() )
+ if( fmt.isIJV() )
return bsize * 3;
else if( fmt == FileFormat.LIBSVM )
return Math.round(bsize * 2.5);
@@ -941,8 +941,8 @@ public class OptimizerUtils
ret &= ( p.getExecType()==ExecType.CP
||(p instanceof AggBinaryOp &&
allowsToFilterEmptyBlockOutputs(p) )
||(HopRewriteUtils.isReorg(p, ReOrgOp.RESHAPE,
ReOrgOp.TRANS) && allowsToFilterEmptyBlockOutputs(p) )
- ||(HopRewriteUtils.isData(p,
OpOpData.PERSISTENTWRITE) && ((DataOp)p).getInputFormatType()==FileFormat.TEXT))
- && !(p instanceof FunctionOp || (p instanceof
DataOp && ((DataOp)p).getInputFormatType()!=FileFormat.TEXT) ); //no function
call or transient write
+ ||(HopRewriteUtils.isData(p,
OpOpData.PERSISTENTWRITE) && ((DataOp)p).getFileFormat()==FileFormat.TEXT))
+ && !(p instanceof FunctionOp || (p instanceof
DataOp && ((DataOp)p).getFileFormat()!=FileFormat.TEXT) ); //no function call
or transient write
}
return ret;
}
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index c57fbc8..3b15b44 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -1344,7 +1344,7 @@ public class Recompiler
}
//special case for persistent reads with unknown size
(read-after-write)
else if( HopRewriteUtils.isData(hop, OpOpData.PERSISTENTREAD)
- && !hop.dimsKnown() &&
((DataOp)hop).getInputFormatType()!=FileFormat.CSV
+ && !hop.dimsKnown() &&
((DataOp)hop).getFileFormat()!=FileFormat.CSV
&&
!ConfigurationManager.getCompilerConfigFlag(ConfigType.IGNORE_READ_WRITE_METADATA)
)
{
//update hop with read meta data
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
index fc7a818..d21b16c 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
@@ -1377,7 +1377,7 @@ public class HopRewriteUtils
public static boolean alwaysRequiresReblock(Hop hop) {
return (hop instanceof DataOp
&& ((DataOp)hop).getOp()==OpOpData.PERSISTENTREAD
- &&
((DataOp)hop).getInputFormatType()!=FileFormat.BINARY);
+ && ((DataOp)hop).getFileFormat()!=FileFormat.BINARY);
}
public static boolean containsOp(ArrayList<Hop> candidates, Class<?
extends Hop> clazz) {
diff --git
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
index cc24919..c93e6df 100644
---
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
+++
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteBlockSizeAndReblock.java
@@ -92,8 +92,8 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule
// if block size does not match
if( canReblock &&
( (dop.getDataType() == DataType.MATRIX &&
(dop.getBlocksize() != blocksize))
- ||(dop.getDataType() == DataType.FRAME &&
OptimizerUtils.isSparkExecutionMode() &&
(dop.getInputFormatType()==FileFormat.TEXT
- ||
dop.getInputFormatType()==FileFormat.CSV))) )
+ ||(dop.getDataType() == DataType.FRAME &&
OptimizerUtils.isSparkExecutionMode() && (dop.getFileFormat()==FileFormat.TEXT
+ ||
dop.getFileFormat()==FileFormat.CSV))) )
{
if( dop.getOp() == OpOpData.PERSISTENTREAD)
{
diff --git
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
index 6b3c041..6ba8ae1 100644
---
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
+++
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
@@ -60,17 +60,21 @@ public class RewriteInjectSparkPReadCheckpointing extends
HopRewriteRule
if(hop.isVisited())
return;
- // The reblocking is performed after transform, and hence
checkpoint only non-transformed reads.
- if( (hop instanceof DataOp &&
((DataOp)hop).getOp()==OpOpData.PERSISTENTREAD)
- || hop.requiresReblock() )
- {
+ // Inject checkpoints after persistent reads (for binary
matrices only), or
+ // after reblocks that cause expensive shuffling. However,
carefully avoid
+ // unnecessary frame checkpoints (e.g., binary data or csv that
do not cause
+ // shuffle) in order to prevent excessive garbage collection
due to possibly
+ // many small string objects. An alternative would be
serialized caching.
+ boolean isMatrix = hop.getDataType().isMatrix();
+ boolean isPRead = hop instanceof DataOp &&
((DataOp)hop).getOp()==OpOpData.PERSISTENTREAD;
+ boolean isFrameException = hop.getDataType().isFrame() &&
isPRead && !((DataOp)hop).getFileFormat().isIJV();
+
+ if( (isMatrix && isPRead) || (hop.requiresReblock() &&
!isFrameException) ) {
//make given hop for checkpointing (w/ default storage
level)
//note: we do not recursively process childs here in
order to prevent unnecessary checkpoints
hop.setRequiresCheckpoint(true);
}
- else
- {
- //process childs
+ else {
if( hop.getInput() != null ) {
//process all childs (prevent concurrent
modification by index access)
for( int i=0; i<hop.getInput().size(); i++ )
diff --git
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
index 8f2d9b9..a6cf158 100644
---
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
+++
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
@@ -152,7 +152,7 @@ public class RewriteSplitDagUnknownCSVRead extends
StatementBlockRewriteRule
{
DataOp dop = (DataOp) hop;
if( dop.getOp() == OpOpData.PERSISTENTREAD
- && dop.getInputFormatType() == FileFormat.CSV
+ && dop.getFileFormat() == FileFormat.CSV
&& !dop.dimsKnown()
&& !HopRewriteUtils.hasOnlyWriteParents(dop,
true, false) )
{
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index dafc993..0f994d6 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -1036,13 +1036,13 @@ public class DMLTranslator
DataOp ae = (DataOp)processExpression(source,
target, ids);
String formatName =
os.getExprParam(DataExpression.FORMAT_TYPE).toString();
-
ae.setInputFormatType(Expression.convertFormatType(formatName));
+
ae.setFileFormat(Expression.convertFormatType(formatName));
if (ae.getDataType() == DataType.SCALAR ) {
ae.setOutputParams(ae.getDim1(),
ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
}
else {
- switch(ae.getInputFormatType()) {
+ switch(ae.getFileFormat()) {
case TEXT:
case MM:
case CSV:
@@ -1059,7 +1059,7 @@ public class DMLTranslator
ae.setOutputParams(ae.getDim1(), ae.getDim2(), -1, ae.getUpdateType(), -1);
break;
default:
- throw new
LanguageException("Unrecognized file format: " + ae.getInputFormatType());
+ throw new
LanguageException("Unrecognized file format: " + ae.getFileFormat());
}
}
@@ -1553,7 +1553,7 @@ public class DMLTranslator
if (ae instanceof DataOp && ((DataOp)
ae).getOp() != OpOpData.SQLREAD &&
((DataOp) ae).getOp() !=
OpOpData.FEDERATED) {
String formatName =
((DataExpression)source).getVarParam(DataExpression.FORMAT_TYPE).toString();
-
((DataOp)ae).setInputFormatType(Expression.convertFormatType(formatName));
+
((DataOp)ae).setFileFormat(Expression.convertFormatType(formatName));
}
return ae;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 9dd4e6d..bb290fc 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -158,7 +158,8 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
JavaPairRDD<Long,FrameBlock> tmp = in
.mapToPair(new RDDTransformApplyFunction(bmeta,
bomap));
JavaPairRDD<MatrixIndexes,MatrixBlock> out =
FrameRDDConverterUtils
- .binaryBlockToMatrixBlock(tmp, mcOut, mcOut);
+ .binaryBlockToMatrixBlock(tmp, mcOut, mcOut)
+ .cache(); //best effort cache as reblock not at
hop level
//set output and maintain lineage/output characteristics
sec.setRDDHandleForVariable(_outputs.get(0).getName(),
out);