This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new b6c076d [SYSTEMDS-3147,3160,3161] Fix binary write block-size handling
b6c076d is described below
commit b6c076db6a812ce16f323fda4f0847660910c43f
Author: Matthias Boehm <[email protected]>
AuthorDate: Tue Oct 12 23:12:32 2021 +0200
[SYSTEMDS-3147,3160,3161] Fix binary write block-size handling
This patch fixes long-standing issues when writing binary matrices with
non-default block sizes in special cases such as forced singlenode
execution (no reblocks) and direct writes of persistent reads.
For example, when reading the Amazon book reviews dataset in binary format
(block size 16K) and directly writing it as sparse text in singlenode
execution, the write took the 16K input object (not yet read) and set its
block size to -1 (for text). The binary read triggered by the persistent
write then used a default block size of 1K (for robustness), and thus
misplaced values or failed with index-out-of-bounds errors.
Instead of modifying the metadata in place before the write, or creating
a shallow copy as an in-memory reblock would, we now manage the block size
during the write process itself and leave the input metadata untouched.
In contrast to an in-memory reblock, this approach avoids unnecessary
evictions of large input matrices.
---
.../runtime/controlprogram/caching/CacheableData.java | 15 +++++++--------
.../runtime/controlprogram/caching/MatrixObject.java | 5 +++--
.../runtime/instructions/cp/VariableCPInstruction.java | 12 +++++-------
.../org/apache/sysds/runtime/io/FileFormatProperties.java | 13 +++++++++++++
.../sysds/runtime/io/FileFormatPropertiesLIBSVM.java | 3 ++-
5 files changed, 30 insertions(+), 18 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index e44d061..773c92c 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -799,10 +799,6 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
exportData(fName, outputFormat, -1, formatProperties);
}
- public synchronized void exportData (String fName, String outputFormat,
int replication, FileFormatProperties formatProperties) {
- exportData(fName, outputFormat, replication, formatProperties,
null);
- }
-
/**
* Synchronized because there might be parallel threads (parfor local)
that
* access the same object (in case it was created before the loop).
@@ -818,9 +814,8 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
* @param outputFormat format
* @param replication ?
* @param formatProperties file format properties
- * @param opcode instruction opcode if available
*/
- public synchronized void exportData (String fName, String outputFormat,
int replication, FileFormatProperties formatProperties, String opcode) {
+ public synchronized void exportData (String fName, String outputFormat,
int replication, FileFormatProperties formatProperties) {
if( LOG.isTraceEnabled() )
LOG.trace("Export data "+hashCode()+" "+fName);
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -850,11 +845,13 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
setHDFSFileExists(true);
//check for common file scheme (otherwise no copy/rename)
+ int blen = (formatProperties == null) ?
+ ConfigurationManager.getBlocksize() :
formatProperties.getBlocksize();
boolean eqScheme = IOUtilFunctions.isSameFileScheme(
new Path(_hdfsFileName), new Path(fName));
boolean eqFormat = isEqualOutputFormat(outputFormat);
- boolean eqBlksize = (outputFormat == null ||
outputFormat.equals("binary"))
- && ConfigurationManager.getBlocksize() !=
getBlocksize();
+ boolean eqBlksize = (getBlocksize() != blen)
+ && (outputFormat == null ||
outputFormat.equals("binary"));
//actual export (note: no direct transfer of local copy in
order to ensure blocking (and hence, parallelism))
if( isDirty() || !eqScheme || isFederated() ||
@@ -1094,6 +1091,8 @@ public abstract class CacheableData<T extends CacheBlock>
extends Data
if ( fmt != FileFormat.MM ) {
// Get the dimension information from the metadata
stored within MatrixObject
DataCharacteristics dc = iimd.getDataCharacteristics();
+ if( formatProperties != null &&
formatProperties.knownBlocksize() )
+
dc.setBlocksize(formatProperties.getBlocksize());
// when outputFormat is binaryblock, make sure that
matrixCharacteristics has correct blocking dimensions
// note: this is only required if singlenode (due to
binarycell default)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 3194aa8..4da0675 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -419,6 +419,7 @@ public class MatrixObject extends
CacheableData<MatrixBlock> {
long clen = dims[1];
MetaDataFormat iimd = (MetaDataFormat) _metaData;
DataCharacteristics mc = iimd.getDataCharacteristics();
+ System.out.println(mc);
long begin = 0;
if(LOG.isTraceEnabled()) {
@@ -549,8 +550,8 @@ public class MatrixObject extends
CacheableData<MatrixBlock> {
DataCharacteristics mc = iimd.getDataCharacteristics();
// Write the matrix to HDFS in requested format
FileFormat fmt = (ofmt != null ?
FileFormat.safeValueOf(ofmt) : iimd.getFileFormat());
- mc = (fmt == FileFormat.BINARY && mc.getBlocksize() >
0) ? mc : new MatrixCharacteristics(mc)
-
.setBlocksize(ConfigurationManager.getBlocksize());
+ if( fmt == FileFormat.BINARY && fprop != null )
+ mc = new
MatrixCharacteristics(mc).setBlocksize(fprop.getBlocksize());
DataConverter.writeMatrixToHDFS(_data, fname, fmt, mc,
rep, fprop, _diag);
if(LOG.isTraceEnabled())
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index fdd7bf5..3965656 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -475,10 +475,10 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
return new
VariableCPInstruction(VariableOperationCode.CreateVariable,
in1, in2, in3, iimd, updateType,
fmtProperties, schema, opcode, str);
}
-
else {
return new
VariableCPInstruction(VariableOperationCode.CreateVariable, in1, in2, in3,
iimd, updateType, schema, opcode, str);
}
+
case AssignVariable:
in1 = new CPOperand(parts[1]);
in2 = new CPOperand(parts[2]);
@@ -997,17 +997,15 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
writeMMFile(ec, fname);
else if( fmt == FileFormat.CSV )
writeCSVFile(ec, fname);
- else if(fmt == FileFormat.LIBSVM)
- writeLIBSVMFile(ec, fname);
+ else if(fmt == FileFormat.LIBSVM)
+ writeLIBSVMFile(ec, fname);
else if(fmt == FileFormat.HDF5)
writeHDF5File(ec, fname);
else {
- // Default behavior
+ // Default behavior (text, binary)
MatrixObject mo =
ec.getMatrixObject(getInput1().getName());
int blen =
Integer.parseInt(getInput4().getName());
- if( mo.getBlocksize() != blen )
-
mo.getMetaData().getDataCharacteristics().setBlocksize(blen);
- mo.exportData(fname, fmtStr, _formatProperties);
+ mo.exportData(fname, fmtStr, new
FileFormatProperties(blen));
}
// Set privacy constraint of write instruction to the
same as that of the input
setPrivacyConstraint(ec.getMatrixObject(getInput1().getName()).getPrivacyConstraint());
diff --git
a/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java
b/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java
index fe51f10..178f571 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java
@@ -22,9 +22,22 @@ package org.apache.sysds.runtime.io;
public class FileFormatProperties
{
private String description;
+ private final int _blen;
public FileFormatProperties() {
+ this(-1);
+ }
+
+ public FileFormatProperties(int blen) {
+ _blen = blen;
+ }
+
+ public int getBlocksize() {
+ return _blen;
+ }
+ public boolean knownBlocksize() {
+ return _blen != -1;
}
public String getDescription() {
diff --git
a/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java
b/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java
index 7ebb455..05afda7 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java
@@ -79,7 +79,8 @@ public class FileFormatPropertiesLIBSVM extends
FileFormatProperties implements
return sparse;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" delim " + delim);
sb.append(" indexDelim " + indexDelim);