Baunsgaard commented on PR #2019:
URL: https://github.com/apache/systemds/pull/2019#issuecomment-2059085091
In more detail:
```java
public static void writeWritableToLocal(String fname, Writable mb, boolean doubleBuffering)
    throws IOException
{
    OutputStream fos = new FileOutputStream( fname );
    if( doubleBuffering )
        fos = new DoubleBufferingOutputStream(fos, 2, BUFFER_SIZE);
    FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE);
    try {
        mb.write(out);
    }
    finally {
        IOUtilFunctions.closeSilently(out); //incl double buffering
        IOUtilFunctions.closeSilently(fos);
    }
}
```
When double buffering is enabled, the DoubleBufferingOutputStream receives its
writes from a FastBufferedDataOutputStream.
The FastBufferedDataOutputStream forwards a byte[] directly if it is larger
than its buffer size of 8192.
```java
public void write(byte[] b, int off, int len) throws IOException {
if(len > _bufflen) {
// If we write a byte array that is larger than the buffer
flushBuffer(); // flush the buffer first and
// forward the array directly
out.write(b, off, len);
}
else{
...
}
}
```
Therefore, the DoubleBufferingOutputStream can encounter inputs that are
longer than its allocated buffer.
To handle this, I added a second case:
```java
@Override
public void write(byte[] b, int off, int len) throws IOException {
    try {
        synchronized(_buff) {
            final byte[] b_pos = _buff[_pos];
            // block until buffer is free to use
            _locks[_pos].get();
            if(b_pos.length > len) {
                // copy the block into the buffer.
                System.arraycopy(b, off, b_pos, 0, len);
                // submit write request, guaranteed to be sequential
                // since it is using a single thread.
                _locks[_pos] = _pool.submit(() -> writeBuffer(b_pos, 0, len));
                // copy for asynchronous write because b is reused higher up
            }
            else {
                // The given byte array is longer than the buffer.
                // This means that the async buffer would overflow
                // and therefore not work.
                // To avoid this we simply write the given byte array
                // without a buffer.
                // This approach only works if the caller adheres to
                // not modifying the given byte array.
                _locks[_pos] = _pool.submit(() -> writeBuffer(b, off, len));
            }
            _pos = (_pos + 1) % _buff.length;
        }
    }
    catch(Exception ex) {
        throw new IOException(ex);
    }
}
```
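To make the two paths easier to try out in isolation, here is a minimal, self-contained sketch of the same idea. It is not the SystemDS class: `SketchDoubleBufferingStream`, `DoubleBufferingSketch`, and `roundTrip` are hypothetical names for illustration, and the sketch assumes a single-threaded executor so that submitted writes reach the underlying stream in order. It also accepts arrays that exactly fill a slot on the copy path, which differs slightly from the strict `>` check above.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical, simplified stand-in for a double-buffering stream. */
class SketchDoubleBufferingStream extends FilterOutputStream {
    // A single worker thread keeps the asynchronous writes sequential.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private final byte[][] buff;
    private final Future<?>[] locks;
    private int pos = 0;

    SketchDoubleBufferingStream(OutputStream out, int num, int size) {
        super(out);
        buff = new byte[num][size];
        locks = new Future<?>[num];
    }

    @Override
    public void write(int b) throws IOException {
        // Route single bytes through the array path to preserve ordering.
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) throws IOException {
        try {
            if (locks[pos] != null)
                locks[pos].get(); // block until this slot's previous write is done
            final byte[] src;
            final int srcOff;
            if (len <= buff[pos].length) {
                // Fits: copy into the slot so the caller may reuse b right away.
                src = buff[pos];
                srcOff = 0;
                System.arraycopy(b, off, src, 0, len);
            } else {
                // Too large for a slot: write the caller's array directly.
                // Assumption: the caller does not modify b until the write completes.
                src = b;
                srcOff = off;
            }
            Callable<Void> task = () -> {
                out.write(src, srcOff, len);
                return null;
            };
            locks[pos] = pool.submit(task);
            pos = (pos + 1) % buff.length;
        } catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    @Override
    public synchronized void flush() throws IOException {
        try {
            for (Future<?> f : locks)
                if (f != null)
                    f.get(); // wait for all pending asynchronous writes
            out.flush();
        } catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    @Override
    public void close() throws IOException {
        flush();
        pool.shutdown();
        out.close();
    }
}

class DoubleBufferingSketch {
    /** Writes all chunks through the sketch stream and returns what reached the sink. */
    static byte[] roundTrip(byte[][] chunks, int bufSize) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (SketchDoubleBufferingStream s = new SketchDoubleBufferingStream(sink, 2, bufSize)) {
            for (byte[] c : chunks)
                s.write(c, 0, c.length);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] large = new byte[16]; // larger than the 8-byte slots used below
        byte[] got = roundTrip(new byte[][] {{1, 2, 3}, large, {9}}, 8);
        System.out.println("bytes written: " + got.length);
    }
}
```

In this sketch the oversized array takes the direct path while the small arrays are copied into a slot first, and the sink still receives the bytes in submission order. The same caveat as in the PR applies: the direct path is only safe if the caller leaves the array untouched until the asynchronous write has finished.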