Baunsgaard commented on PR #2019:
URL: https://github.com/apache/systemds/pull/2019#issuecomment-2059085091

   In more detail:
   
   ```java
   
   public static void writeWritableToLocal(String fname, Writable mb, boolean doubleBuffering)
           throws IOException
   {
           OutputStream fos = new FileOutputStream( fname );
           if( doubleBuffering )
                   fos = new DoubleBufferingOutputStream(fos, 2, BUFFER_SIZE);
           FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE);

           try {
                   mb.write(out);
           }
           finally {
                   IOUtilFunctions.closeSilently(out); //incl double buffering
                   IOUtilFunctions.closeSilently(fos);
           }
   }
   
   ```
   
   When double buffering is enabled, the DoubleBufferingOutputStream receives its writes from a FastBufferedDataOutputStream.

   The FastBufferedDataOutputStream forwards a byte[] directly, without buffering it, if the array is larger than its buffer size of 8192.
   
   ```java
   public void write(byte[] b, int off, int len) throws IOException {
        if(len > _bufflen) {
                // If we write a byte array that is larger than the buffer
                flushBuffer(); // flush the buffer first and
                // forward the array directly
                out.write(b, off, len);
        }
        else {
                ...
        }
   }
   ```
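The same pass-through rule is documented for the JDK's `java.io.BufferedOutputStream`, so it can serve as a stand-in to observe the effect without the SystemDS classes. The sketch below is only an illustration under that assumption: `ForwardingDemo`, `RecordingStream` and `forwardedDirectly` are hypothetical names, and `BufferedOutputStream` is used in place of `FastBufferedDataOutputStream`. It checks whether the downstream stream receives the caller's own array or the buffer's internal array.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class ForwardingDemo {
    /** Hypothetical sink that remembers the last array handed to it. */
    static class RecordingStream extends OutputStream {
        byte[] lastArray;

        @Override
        public void write(int b) {
            lastArray = null; // single-byte writes carry no array
        }

        @Override
        public void write(byte[] b, int off, int len) {
            lastArray = b; // keep the reference, not a copy
        }
    }

    /** Returns true if the sink saw the caller's array itself. */
    static boolean forwardedDirectly(int size) {
        RecordingStream sink = new RecordingStream();
        byte[] data = new byte[size];
        // 8192 mirrors the buffer size mentioned in the comment above.
        try (OutputStream out = new BufferedOutputStream(sink, 8192)) {
            out.write(data, 0, size);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.lastArray == data;
    }

    public static void main(String[] args) {
        System.out.println(forwardedDirectly(100_000)); // true: larger than the buffer
        System.out.println(forwardedDirectly(100));     // false: copied into the buffer first
    }
}
```

In the oversized case the downstream stream holds a reference to the caller's array, which is why the stream behind the buffer has to cope with arrays of arbitrary length.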
   
   Therefore, the DoubleBufferingOutputStream can encounter inputs that are longer than its allocated buffer.
   
   To handle this, I added a second case:
   
   ```java
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
        try {
                synchronized(_buff) {
                        final byte[] b_pos = _buff[_pos];
                        // block until buffer is free to use
                        _locks[_pos].get();
                        if(b_pos.length > len) {
                                // copy the block into the buffer.
                                System.arraycopy(b, off, b_pos, 0, len);
                                // submit write request guaranteed to be sequential since it is using a single thread.
                                _locks[_pos] = _pool.submit(() -> writeBuffer(b_pos, 0, len));
                                // copy for asynchronous write because b is reused higher up
                        }
                        else {
                                // The given byte array is longer than the buffer.
                                // This means that the async buffer would overflow and therefore not work.
                                // To avoid this we simply write the given byte array without a buffer.
                                // This approach only works if the caller does not modify the given byte array afterwards.
                                _locks[_pos] = _pool.submit(() -> writeBuffer(b, off, len));
                        }
                        _pos = (_pos + 1) % _buff.length;
                }
        }
        catch(Exception ex) {
                throw new IOException(ex);
        }
   }
   ```
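The caveat in the `else` branch, that the caller must not modify the array, can be shown in isolation. The following is a minimal sketch and not the SystemDS code: `AliasingDemo` and `firstByteWritten` are hypothetical names, a `ByteArrayOutputStream` stands in for the file, and a `CountDownLatch` is used only to force the asynchronous write to run after the caller has changed the array.

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AliasingDemo {
    /**
     * Submits an asynchronous write of b, overwrites b[0] before the write
     * runs, and returns the first byte that actually reached the sink.
     */
    static byte firstByteWritten(boolean copyFirst) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            CountDownLatch callerDone = new CountDownLatch(1);
            byte[] b = {1, 2, 3, 4};
            // Either hand over a private copy or the caller's array itself.
            final byte[] src = copyFirst ? b.clone() : b;
            Future<?> lock = pool.submit(() -> {
                callerDone.await(); // run only after the caller's change
                sink.write(src, 0, src.length);
                return null;
            });
            b[0] = 42; // the caller reuses its array
            callerDone.countDown();
            lock.get(); // wait for the write, as the stream does before reusing a slot
            return sink.toByteArray()[0];
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstByteWritten(true));  // 1: the copy is unaffected
        System.out.println(firstByteWritten(false)); // 42: the write sees the caller's change
    }
}
```

Skipping the copy saves one pass over a large array, at the cost of requiring the caller to leave the array untouched until the write has completed.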
   

