Baunsgaard commented on PR #2019: URL: https://github.com/apache/systemds/pull/2019#issuecomment-2059085091
In more detail:

```java
public static void writeWritableToLocal(String fname, Writable mb, boolean doubleBuffering)
	throws IOException
{
	OutputStream fos = new FileOutputStream( fname );
	if( doubleBuffering )
		fos = new DoubleBufferingOutputStream(fos, 2, BUFFER_SIZE);
	FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE);
	try {
		mb.write(out);
	}
	finally {
		IOUtilFunctions.closeSilently(out); //incl double buffering
		IOUtilFunctions.closeSilently(fos);
	}
}
```

When double buffering is enabled, the `DoubleBufferingOutputStream` receives its writes from a `FastBufferedDataOutputStream`. The `FastBufferedDataOutputStream` forwards a `byte[]` directly if it is larger than its buffer size of 8192 bytes:

```java
public void write(byte[] b, int off, int len) throws IOException {
	if(len > _bufflen) {
		// If we write a byte array that is larger than the buffer,
		// flush the buffer first and forward the array directly.
		flushBuffer();
		out.write(b, off, len);
	}
	else {
		// ...
	}
}
```

Therefore the `DoubleBufferingOutputStream` can receive inputs that are longer than its allocated buffers. To handle this, I added a second case:

```java
@Override
public void write(byte[] b, int off, int len) throws IOException {
	try {
		synchronized(_buff) {
			final byte[] b_pos = _buff[_pos];
			// block until the buffer is free to use
			_locks[_pos].get();
			if(b_pos.length > len) {
				// copy the block into the buffer, because b is reused higher up
				System.arraycopy(b, off, b_pos, 0, len);
				// submit the asynchronous write request; writes stay sequential
				// because the pool uses a single thread
				_locks[_pos] = _pool.submit(() -> writeBuffer(b_pos, 0, len));
			}
			else {
				// The given byte array is longer than the buffer, so copying it
				// would overflow the buffer. To avoid this, we submit the given
				// byte array directly, without copying it into a buffer.
				// This only works if the caller does not modify the given byte
				// array until the asynchronous write has completed.
				_locks[_pos] = _pool.submit(() -> writeBuffer(b, off, len));
			}
			_pos = (_pos + 1) % _buff.length;
		}
	}
	catch(Exception ex) {
		throw new IOException(ex);
	}
}
```
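To see the two write paths in isolation, here is a minimal, self-contained sketch that mirrors the approach described above. `SketchDoubleBufferingOutputStream` and its helper `writeChunk` are hypothetical stand-ins, not the SystemDS `DoubleBufferingOutputStream`; the sketch assumes a single-thread executor to keep asynchronous writes in submission order, and it treats an input of exactly the buffer length as fitting (`len <= buf.length`), whereas the snippet above sends that case down the direct path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical, simplified double-buffering stream (not the SystemDS class). */
class SketchDoubleBufferingOutputStream extends OutputStream {
	private final OutputStream out;
	private final byte[][] buffers;
	private final Future<?>[] locks;
	// A single writer thread executes submitted writes in FIFO order.
	private final ExecutorService pool = Executors.newSingleThreadExecutor();
	private int pos = 0;

	SketchDoubleBufferingOutputStream(OutputStream out, int numBuffers, int bufferSize) {
		this.out = out;
		this.buffers = new byte[numBuffers][bufferSize];
		this.locks = new Future<?>[numBuffers];
		for(int i = 0; i < numBuffers; i++)
			locks[i] = CompletableFuture.completedFuture(null); // all slots start free
	}

	@Override
	public void write(int b) throws IOException {
		write(new byte[] {(byte) b}, 0, 1);
	}

	@Override
	public synchronized void write(byte[] b, int off, int len) throws IOException {
		try {
			final byte[] buf = buffers[pos];
			locks[pos].get(); // wait until the previous write from this slot finished
			if(len <= buf.length) {
				// Fits: copy, so the caller may reuse b immediately.
				System.arraycopy(b, off, buf, 0, len);
				locks[pos] = pool.submit(() -> writeChunk(buf, 0, len));
			}
			else {
				// Oversized: forward the caller's array without copying.
				// Safe only if the caller leaves b unmodified until the write completes.
				locks[pos] = pool.submit(() -> writeChunk(b, off, len));
			}
			pos = (pos + 1) % buffers.length;
		}
		catch(Exception ex) {
			throw new IOException(ex);
		}
	}

	// Runs on the writer thread; wraps IOException so the lambda is a Runnable.
	private void writeChunk(byte[] b, int off, int len) {
		try {
			out.write(b, off, len);
		}
		catch(IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	@Override
	public synchronized void flush() throws IOException {
		try {
			for(Future<?> f : locks)
				f.get(); // wait for all pending asynchronous writes
		}
		catch(Exception ex) {
			throw new IOException(ex);
		}
		out.flush();
	}

	@Override
	public synchronized void close() throws IOException {
		try {
			flush();
		}
		finally {
			pool.shutdown();
			out.close();
		}
	}

	public static void main(String[] args) throws IOException {
		ByteArrayOutputStream sink = new ByteArrayOutputStream();
		byte[] large = new byte[20]; // larger than the 8-byte buffers below
		try(SketchDoubleBufferingOutputStream s = new SketchDoubleBufferingOutputStream(sink, 2, 8)) {
			s.write(new byte[] {1, 2, 3}, 0, 3); // fits: copied into a buffer
			s.write(large, 0, large.length);     // oversized: forwarded directly
		}
		System.out.println(sink.size()); // prints 23
	}
}
```

If the caller cannot guarantee that oversized arrays stay untouched, a defensive copy in the oversized branch (for example `Arrays.copyOfRange(b, off, off + len)`) removes that requirement at the cost of one allocation per large write.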