This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 1fa2ebc7ba [MINOR] Frame Shallow Update
1fa2ebc7ba is described below
commit 1fa2ebc7bad9e6bb8006f70c8ae01a00cde74d5d
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Fri Apr 5 17:01:47 2024 +0200
[MINOR] Frame Shallow Update
This commit makes minor modifications to the
shallow handling of Frames.
One example is a fast abort (early return) in isShallowSerialize.
Closes #2013
---
.../sysds/runtime/frame/data/FrameBlock.java | 61 ++++++++++++----------
1 file changed, 32 insertions(+), 29 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
index 3efafbb30b..312f88ca7d 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -106,6 +106,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
/** Locks on the columns not tied to the columns objects. */
private SoftReference<Object[]> _columnLocks = null;
+ /** Materialized number of rows in this FrameBlock */
private int _nRow = 0;
/** Cached size in memory to avoid repeated scans of string columns */
@@ -756,7 +757,8 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
public void write(DataOutput out) throws IOException {
final boolean isDefaultMeta = isColNamesDefault() &&
isColumnMetadataDefault();
// write header (rows, cols, default)
- out.writeInt(getNumRows());
+ final int nRow = getNumRows();
+ out.writeInt(nRow);
out.writeInt(getNumColumns());
out.writeBoolean(isDefaultMeta);
// write columns (value type, data)
@@ -767,7 +769,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
out.writeUTF(getColumnName(j));
_colmeta[j].write(out);
}
- if(type >= 0) // if allocated write column data
+ if(type >= 0 && nRow > 0) // if allocated write column
data
_coldata[j].write(out);
}
}
@@ -796,6 +798,8 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
isDefaultMeta ? null : new String[numCols]; // if meta
is default allocate on demand
_colmeta = (_colmeta != null && _colmeta.length == numCols) ?
_colmeta : new ColumnMetadata[numCols];
_coldata = (_coldata != null && _coldata.length == numCols) ?
_coldata : new Array[numCols];
+ if(_nRow == 0)
+ _coldata = null;
// read columns (value type, meta, data)
for(int j = 0; j < numCols; j++) {
byte type = in.readByte();
@@ -807,7 +811,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
else
_colmeta[j] = new ColumnMetadata(); // must be
allocated.
- if(type >= 0) // if in allocated column data then read
it
+ if(type >= 0 && _nRow > 0) // if in allocated column
data then read it
_coldata[j] = ArrayFactory.read(in, _nRow);
}
_msize = -1;
@@ -815,30 +819,12 @@ public class FrameBlock implements
CacheBlock<FrameBlock>, Externalizable {
@Override
public void writeExternal(ObjectOutput out) throws IOException {
-
- // if((out instanceof ObjectOutputStream)){
- // ObjectOutputStream oos = (ObjectOutputStream)out;
- // FastBufferedDataOutputStream fos = new
FastBufferedDataOutputStream(oos);
- // write(fos); //note: cannot close fos as this would
close oos
- // fos.flush();
- // }
- // else{
- write(out);
- // }
+ write(out);
}
@Override
public void readExternal(ObjectInput in) throws IOException {
- // if(in instanceof ObjectInputStream) {
- // // fast deserialize of dense/sparse blocks
- // ObjectInputStream ois = (ObjectInputStream) in;
- // FastBufferedDataInputStream fis = new
FastBufferedDataInputStream(ois);
- // readFields(fis); // note: cannot close fos as this
would close oos
- // }
- // else {
- // redirect deserialization to writable impl
- readFields(in);
- // }
+ readFields(in);
}
@Override
@@ -878,7 +864,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
for(int j = 0; j < clen; j++)
size +=
ArrayFactory.getInMemorySize(_schema[j], rlen, true);
else {// allocated
- if(rlen > 1000 && clen > 10 &&
ConfigurationManager.isParallelIOEnabled()) {
+ if((rlen > 1000 || clen > 10 )&&
ConfigurationManager.isParallelIOEnabled()) {
final ExecutorService pool =
CommonThreadPool.get();
try {
List<Future<Long>> f = new
ArrayList<>(clen);
@@ -893,6 +879,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>,
Externalizable {
}
catch(InterruptedException | ExecutionException
e) {
LOG.error(e);
+ size = 0;
for(Array<?> aa : _coldata)
size += aa.getInMemorySize();
}
@@ -937,10 +924,10 @@ public class FrameBlock implements
CacheBlock<FrameBlock>, Externalizable {
public boolean isShallowSerialize(boolean inclConvert) {
// shallow serialize if non-string schema because a frame block
// is always dense but strings have large array overhead per
cell
- boolean ret = true;
- for(int j = 0; j < _schema.length && ret; j++)
- ret &= _coldata[j].isShallowSerialize();
- return ret;
+ for(int j = 0; j < _schema.length; j++)
+ if(!_coldata[j].isShallowSerialize())
+ return false;
+ return true;
}
@Override
@@ -1217,6 +1204,22 @@ public class FrameBlock implements
CacheBlock<FrameBlock>, Externalizable {
_msize = -1;
}
+ public FrameBlock copyShallow(){
+ FrameBlock ret = new FrameBlock();
+ ret._nRow = _nRow;
+ ret._msize = _msize;
+ final int nCol = getNumColumns();
+ if(_coldata != null)
+ ret._coldata = Arrays.copyOf(_coldata, nCol);
+ if(_colnames != null)
+ ret._colnames = Arrays.copyOf(_colnames, nCol);
+ if(_colmeta != null)
+ ret._colmeta = Arrays.copyOf(_colmeta, nCol);
+ if(_schema != null)
+ ret._schema = Arrays.copyOf(_schema, nCol);
+ return ret;
+ }
+
/**
* Copy src matrix into the index range of the existing current matrix.
*
@@ -1358,7 +1361,7 @@ public class FrameBlock implements
CacheBlock<FrameBlock>, Externalizable {
}
public final FrameBlock detectSchema(int k) {
- return FrameLibDetectSchema.detectSchema(this, k);
+ return FrameLibDetectSchema.detectSchema(this, 0.01, k);
}
public final FrameBlock detectSchema(double sampleFraction, int k) {