This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 08ce6bc1f5 [MINOR] Double Buffering longer than buffer arrays 08ce6bc1f5 is described below commit 08ce6bc1f5da755b7c0d1bb6dce347ba28711263 Author: Sebastian Baunsgaard <baunsga...@apache.org> AuthorDate: Tue Apr 16 10:51:51 2024 +0200 [MINOR] Double Buffering longer than buffer arrays This commit fixes the double buffering of byte arrays to handle cases where the given byte arrays are larger than the buffer. Before this commit, such arrays made the buffer crash; with this commit, they are written through to the underlying stream without buffering. Also included is a bit of documentation in FastBufferedDataOutputStream. Closes 2019 --- .../runtime/util/DoubleBufferingOutputStream.java | 67 ++++++++++------------ .../runtime/util/FastBufferedDataOutputStream.java | 32 +++++++---- .../apache/sysds/runtime/util/LocalFileUtils.java | 21 +++---- 3 files changed, 62 insertions(+), 58 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java index 16504e64ee..8d3dd7e994 100644 --- a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java +++ b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java @@ -16,13 +16,12 @@ * specific language governing permissions and limitations * under the License. 
*/ - + package org.apache.sysds.runtime.util; import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -34,7 +33,7 @@ public class DoubleBufferingOutputStream extends FilterOutputStream { protected Future<?>[] _locks; protected byte[][] _buff; private int _pos; - + public DoubleBufferingOutputStream(OutputStream out) { this(out, 2, 8192); } @@ -43,42 +42,52 @@ public class DoubleBufferingOutputStream extends FilterOutputStream { super(out); if(size <= 0) throw new IllegalArgumentException("Buffer size <= 0."); - if( size%8 != 0 ) + if(size % 8 != 0) throw new IllegalArgumentException("Buffer size not a multiple of 8."); _buff = new byte[num][size]; _locks = new Future<?>[num]; - for(int i=0; i<num; i++) + for(int i = 0; i < num; i++) // fill the futures to avoid null pointers. _locks[i] = ConcurrentUtils.constantFuture(null); } @Override public void write(int b) throws IOException { - throw new IOException("Not supported"); + throw new IOException("Not supported"); } @Override - public void write(byte[] b, int off, int len) - throws IOException - { + public void write(byte[] b, int off, int len) throws IOException { try { synchronized(_buff) { - //block until buffer is free to use + final byte[] b_pos = _buff[_pos]; + // block until buffer is free to use _locks[_pos].get(); - - //copy for asynchronous write because b is reused higher up - System.arraycopy(b, off, _buff[_pos], 0, len); - - //submit write request - _locks[_pos] = _pool.submit(new WriteTask(_buff[_pos], len)); - _pos = (_pos+1) % _buff.length; + if(b_pos.length >= len) { + // copy the block into the buffer. + System.arraycopy(b, off, b_pos, 0, len); + // submit write request guaranteed to be sequential since it is using a single thread. 
+ _locks[_pos] = _pool.submit(() -> writeBuffer(b_pos, 0, len)); + // copy for asynchronous write because b is reused higher up + } + else { + // The given byte array is longer than the buffer. + // This means that the async buffer would overflow and therefore not work. + // To avoid this we simply write the given byte array without a buffer. + // This approach only works if the caller adhere to not modify the byte array given + _locks[_pos] = _pool.submit(() -> writeBuffer(b, off, len)); + // get the task to reduce the risk ( and at least block the current thread) + // to avoid race conditions from callers. + _locks[_pos].get(); + } + _pos = (_pos + 1) % _buff.length; } } catch(Exception ex) { throw new IOException(ex); } } - - public void writeBuffer(byte[] b, int off, int len) { + + private void writeBuffer(byte[] b, int off, int len) { try { out.write(b, off, len); } @@ -91,14 +100,14 @@ public class DoubleBufferingOutputStream extends FilterOutputStream { public void flush() throws IOException { try { synchronized(_buff) { - for(int i=0; i<_buff.length; i++) + for(int i = 0; i < _buff.length; i++) _locks[i].get(); } + out.flush(); } catch(Exception ex) { throw new IOException(ex); } - out.flush(); } @Override @@ -106,20 +115,4 @@ public class DoubleBufferingOutputStream extends FilterOutputStream { _pool.shutdown(); super.close(); } - - private class WriteTask implements Callable<Object> { - private final byte[] _b; - private final int _len; - - protected WriteTask(byte[] buff, int len) { - _b = buff; - _len = len; - } - - @Override - public Object call() { - writeBuffer(_b, 0, _len); - return null; - } - } } diff --git a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java index 1804bc78e0..da9d380aa7 100644 --- a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java +++ 
b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java @@ -42,8 +42,11 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlockDataOutput; */ public class FastBufferedDataOutputStream extends FilterOutputStream implements DataOutput, MatrixBlockDataOutput { + /** The buffer to copy bytes into before writing out */ protected byte[] _buff; + /** The maximum size of the buffer */ protected int _bufflen; + /** The current fill amount of the buffer */ protected int _count; public FastBufferedDataOutputStream(OutputStream out) { @@ -54,7 +57,7 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements super(out); if(size <= 0) throw new IllegalArgumentException("Buffer size <= 0."); - if( size%8 != 0 ) + if(size % 8 != 0) throw new IllegalArgumentException("Buffer size not a multiple of 8."); _buff = new byte[size]; _bufflen = size; @@ -68,19 +71,21 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements } @Override - public void write(byte[] b, int off, int len) - throws IOException - { - if (len >= _bufflen) { - flushBuffer(); + public void write(byte[] b, int off, int len) throws IOException { + if(len > _bufflen) { + // If we write a byte array that is larger than the buffer + flushBuffer(); // flush the buffer first and + // forward the array directly out.write(b, off, len); - return; } - if (len > _bufflen - _count) { - flushBuffer(); + else{ + if (len > _bufflen - _count) + // if the write is larger than what is left in the buffer. + flushBuffer(); + + System.arraycopy(b, off, _buff, _count, len); + _count += len; } - System.arraycopy(b, off, _buff, _count, len); - _count += len; } @Override @@ -89,6 +94,11 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements out.flush(); } + /** + * Flush the buffer to empty the current content and reset the counting pointer to 0 + * + * @throws IOException Throws an IOException on errors. 
+ */ private void flushBuffer() throws IOException { if(_count > 0) { out.write(_buff, 0, _count); diff --git a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java index 5ab5a76e22..2b4f56f3e9 100644 --- a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java +++ b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java @@ -233,23 +233,24 @@ public class LocalFileUtils * @param doubleBuffering overlay serialization and I/O * @throws IOException if IOException occurs */ - public static void writeWritableToLocal(String fname, Writable mb, boolean doubleBuffering) - throws IOException - { - OutputStream fos = new FileOutputStream( fname ); - if( doubleBuffering ) - fos = new DoubleBufferingOutputStream(fos, 2, BUFFER_SIZE); - FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE); - + public static void writeWritableToLocal(String fname, Writable mb, boolean doubleBuffering) throws IOException { + final FastBufferedDataOutputStream out = // + new FastBufferedDataOutputStream(getOut(fname, doubleBuffering), BUFFER_SIZE); try { mb.write(out); } finally { - IOUtilFunctions.closeSilently(out); //incl double buffering - IOUtilFunctions.closeSilently(fos); + IOUtilFunctions.closeSilently(out); } } + private static OutputStream getOut(String fname, boolean doubleBuffering) throws IOException{ + if(doubleBuffering) + return new DoubleBufferingOutputStream(new FileOutputStream(fname), 2, BUFFER_SIZE); + else + return new FileOutputStream(fname); + } + public static void writeByteArrayToLocal( String fname, byte[] data ) throws IOException {