This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cc90b2798 [SYSTEMDS-3597] Fix race conditions in double buffering output streams
0cc90b2798 is described below

commit 0cc90b2798e01874c16fd5b6bd7e05f73422ff59
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jul 21 16:51:50 2023 +0200

    [SYSTEMDS-3597] Fix race conditions in double buffering output streams
    
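    This patch fixes race conditions in the new double buffering output
    stream. The write lambda is not executed immediately on submission,
    so it could read the buffer position after that position had already
    been advanced, leading to incorrect orderings of array references.
    We now pass dedicated write tasks to the thread pool, which capture
    the buffer reference and length at submission time.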
---
 .../runtime/util/DoubleBufferingOutputStream.java  | 23 +++++++++++++++++++---
 .../runtime/util/FastBufferedDataOutputStream.java |  6 +++---
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java 
b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
index 0a2ff6bfa4..f1d2fadd93 100644
--- 
a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
+++ 
b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.util;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -29,8 +30,8 @@ import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 
 public class DoubleBufferingOutputStream extends FilterOutputStream
 {
-       ExecutorService _pool = CommonThreadPool.get(1); //no outrun
-       Future<?>[] _locks;
+       protected ExecutorService _pool = CommonThreadPool.get(1);
+       protected Future<?>[] _locks;
        protected byte[][] _buff;
        private int _pos;
        
@@ -68,7 +69,7 @@ public class DoubleBufferingOutputStream extends 
FilterOutputStream
                                System.arraycopy(b, off, _buff[_pos], 0, len);
                                
                                //submit write request 
-                               _locks[_pos] = _pool.submit(() -> 
writeBuffer(_buff[_pos], 0, len));
+                               _locks[_pos] = _pool.submit(new 
WriteTask(_buff[_pos], len));
                                _pos = (_pos+1) % _buff.length;
                        }
                }
@@ -105,4 +106,20 @@ public class DoubleBufferingOutputStream extends 
FilterOutputStream
                _pool.shutdown();
                super.close();
        }
+       
+       private class WriteTask implements Callable<Object> {
+               private final byte[] _b;
+               private final int _len;
+               
+               protected WriteTask(byte[] buff, int len) {
+                       _b = buff;
+                       _len = len;
+               }
+               
+               @Override
+               public Object call() {
+                       writeBuffer(_b, 0, _len);
+                       return null;
+               }
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java 
b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
index 9c84ac213f..adf9f0abd5 100644
--- 
a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
+++ 
b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
@@ -229,7 +229,7 @@ public class FastBufferedDataOutputStream extends 
FilterOutputStream implements
                                long tmp = 
Double.doubleToRawLongBits(varr[i+j]);
                                longToBa(tmp, _buff, _count);
                                _count += 8;
-                       }       
+                       }
                        
                        //flush buffer for current block
                        flushBuffer(); //based on count
@@ -258,7 +258,7 @@ public class FastBufferedDataOutputStream extends 
FilterOutputStream implements
                                if( alen2 < _bufflen )
                                {
                                        if (_count+alen2 > _bufflen) 
-                                           flushBuffer();
+                                               flushBuffer();
                                        
                                        for( int j=apos; j<apos+alen; j++ )
                                        {
@@ -274,7 +274,7 @@ public class FastBufferedDataOutputStream extends 
FilterOutputStream implements
                                        for( int j=apos; j<apos+alen; j++ )
                                        {
                                                if (_count+12 > _bufflen) 
-                                                   flushBuffer();
+                                                       flushBuffer();
                                                
                                                long tmp2 = 
Double.doubleToRawLongBits(avals[j]);
                                                intToBa(aix[j], _buff, _count);

Reply via email to