This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 0cc90b2798 [SYSTEMDS-3597] Fix race conditions in double buffering
output streams
0cc90b2798 is described below
commit 0cc90b2798e01874c16fd5b6bd7e05f73422ff59
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jul 21 16:51:50 2023 +0200
[SYSTEMDS-3597] Fix race conditions in double buffering output streams
This patch fixes race conditions in the new double buffering, where
the lambda function is not immediately executed leading to false
orderings of array references. We now pass dedicated write tasks to
the thread pool.
---
.../runtime/util/DoubleBufferingOutputStream.java | 23 +++++++++++++++++++---
.../runtime/util/FastBufferedDataOutputStream.java | 6 +++---
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
index 0a2ff6bfa4..f1d2fadd93 100644
---
a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
+++
b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.util;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -29,8 +30,8 @@ import org.apache.commons.lang3.concurrent.ConcurrentUtils;
public class DoubleBufferingOutputStream extends FilterOutputStream
{
- ExecutorService _pool = CommonThreadPool.get(1); //no outrun
- Future<?>[] _locks;
+ protected ExecutorService _pool = CommonThreadPool.get(1);
+ protected Future<?>[] _locks;
protected byte[][] _buff;
private int _pos;
@@ -68,7 +69,7 @@ public class DoubleBufferingOutputStream extends
FilterOutputStream
System.arraycopy(b, off, _buff[_pos], 0, len);
//submit write request
- _locks[_pos] = _pool.submit(() ->
writeBuffer(_buff[_pos], 0, len));
+ _locks[_pos] = _pool.submit(new
WriteTask(_buff[_pos], len));
_pos = (_pos+1) % _buff.length;
}
}
@@ -105,4 +106,20 @@ public class DoubleBufferingOutputStream extends
FilterOutputStream
_pool.shutdown();
super.close();
}
+
+ private class WriteTask implements Callable<Object> {
+ private final byte[] _b;
+ private final int _len;
+
+ protected WriteTask(byte[] buff, int len) {
+ _b = buff;
+ _len = len;
+ }
+
+ @Override
+ public Object call() {
+ writeBuffer(_b, 0, _len);
+ return null;
+ }
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
index 9c84ac213f..adf9f0abd5 100644
---
a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
+++
b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
@@ -229,7 +229,7 @@ public class FastBufferedDataOutputStream extends
FilterOutputStream implements
long tmp =
Double.doubleToRawLongBits(varr[i+j]);
longToBa(tmp, _buff, _count);
_count += 8;
- }
+ }
//flush buffer for current block
flushBuffer(); //based on count
@@ -258,7 +258,7 @@ public class FastBufferedDataOutputStream extends
FilterOutputStream implements
if( alen2 < _bufflen )
{
if (_count+alen2 > _bufflen)
- flushBuffer();
+ flushBuffer();
for( int j=apos; j<apos+alen; j++ )
{
@@ -274,7 +274,7 @@ public class FastBufferedDataOutputStream extends
FilterOutputStream implements
for( int j=apos; j<apos+alen; j++ )
{
if (_count+12 > _bufflen)
- flushBuffer();
+ flushBuffer();
long tmp2 =
Double.doubleToRawLongBits(avals[j]);
intToBa(aix[j], _buff, _count);