ctubbsii commented on code in PR #3219:
URL: https://github.com/apache/accumulo/pull/3219#discussion_r1126693126
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -78,29 +94,34 @@ public int read(byte[] b, int offset, int length) {
length = avail;
}
- System.arraycopy(buffer, cur, b, offset, length);
- cur += length;
+ System.arraycopy(buffer, currentValue, b, offset, length);
return length;
}
@Override
public long skip(long requestedSkip) {
- int actualSkip = max - cur;
- if (requestedSkip < actualSkip) {
- if (requestedSkip < 0) {
- actualSkip = 0;
- } else {
- actualSkip = (int) requestedSkip;
+
+ BiFunction<Integer,Integer,Integer> skipValue = (current, skip) -> {
+ int actual = max - current;
+ if (skip < actual) {
+ actual = Math.max(skip, 0);
}
- }
+ return actual;
+ };
+
+ IntBinaryOperator add = (cur1, skip) -> {
+ int actual = skipValue.apply(cur1, skip);
+ return cur1 + actual;
+ };
Review Comment:
```suggestion
// compute how much to advance, based on actual amount skipped
IntBinaryOperator add = (cur1, skip) -> cur1 + skipValue.apply(cur1,
skip);
```
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -78,29 +94,34 @@ public int read(byte[] b, int offset, int length) {
length = avail;
}
- System.arraycopy(buffer, cur, b, offset, length);
- cur += length;
+ System.arraycopy(buffer, currentValue, b, offset, length);
return length;
}
@Override
public long skip(long requestedSkip) {
- int actualSkip = max - cur;
- if (requestedSkip < actualSkip) {
- if (requestedSkip < 0) {
- actualSkip = 0;
- } else {
- actualSkip = (int) requestedSkip;
+
+ BiFunction<Integer,Integer,Integer> skipValue = (current, skip) -> {
+ int actual = max - current;
+ if (skip < actual) {
+ actual = Math.max(skip, 0);
}
- }
+ return actual;
+ };
+
+ IntBinaryOperator add = (cur1, skip) -> {
+ int actual = skipValue.apply(cur1, skip);
+ return cur1 + actual;
+ };
+
+ int currentValue = cur.getAndAccumulate((int) requestedSkip, add);
Review Comment:
```suggestion
// advance the pointer and return the actual amount skipped
int currentValue = cur.getAndAccumulate((int) requestedSkip, add);
```
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -78,29 +94,34 @@ public int read(byte[] b, int offset, int length) {
length = avail;
}
- System.arraycopy(buffer, cur, b, offset, length);
- cur += length;
+ System.arraycopy(buffer, currentValue, b, offset, length);
return length;
}
@Override
public long skip(long requestedSkip) {
- int actualSkip = max - cur;
- if (requestedSkip < actualSkip) {
- if (requestedSkip < 0) {
- actualSkip = 0;
- } else {
- actualSkip = (int) requestedSkip;
+
Review Comment:
```suggestion
// actual skip is at least 0, but no more than what's available
BiFunction<Integer,Integer,Integer> skipValue = (current, skip) ->
Math.max(0, Math.min(max - current, skip));
```
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -41,14 +44,15 @@ public class SeekableByteArrayInputStream extends InputStream {
// thread 2 sees all of thread 1 changes before setting the volatile.
@SuppressFBWarnings(value = "VO_VOLATILE_REFERENCE_TO_ARRAY",
justification = "see explanation above")
- private volatile byte[] buffer;
- private int cur;
- private int max;
+ private final byte[] buffer;
+ private final AtomicInteger cur = new AtomicInteger(0);
+ private final int max;
@Override
public int read() {
- if (cur < max) {
- return buffer[cur++] & 0xff;
+ final int currentValue = cur.getAndAccumulate(1, (v, x) -> v < max ? v + x : v);
Review Comment:
```suggestion
// advance the pointer by 1 if we haven't reached the end
final int currentValue = cur.getAndAccumulate(1, (v, x) -> v < max ? v +
x : v);
```
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -124,26 +145,24 @@ public void close() throws IOException {}
public SeekableByteArrayInputStream(byte[] buf) {
requireNonNull(buf, "bug argument was null");
this.buffer = buf;
- this.cur = 0;
this.max = buf.length;
}
public SeekableByteArrayInputStream(byte[] buf, int maxOffset) {
requireNonNull(buf, "bug argument was null");
this.buffer = buf;
- this.cur = 0;
this.max = maxOffset;
}
public void seek(int position) {
if (position < 0 || position >= max) {
throw new IllegalArgumentException("position = " + position + " maxOffset = " + max);
}
- this.cur = position;
+ this.cur.set(position);
}
public int getPosition() {
- return this.cur;
+ return this.cur.get();
}
byte[] getBuffer() {
Review Comment:
This method looks dangerous. We can definitely get into trouble if another
thread alters the buffer it returns. However, I don't think marking the field
`volatile` would have protected us from that: `volatile` applies only to the
array reference, not to the array's elements. If we don't need this method,
it should be removed.
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -78,29 +94,34 @@ public int read(byte[] b, int offset, int length) {
length = avail;
}
- System.arraycopy(buffer, cur, b, offset, length);
- cur += length;
+ System.arraycopy(buffer, currentValue, b, offset, length);
return length;
}
@Override
public long skip(long requestedSkip) {
- int actualSkip = max - cur;
- if (requestedSkip < actualSkip) {
- if (requestedSkip < 0) {
- actualSkip = 0;
- } else {
- actualSkip = (int) requestedSkip;
+
+ BiFunction<Integer,Integer,Integer> skipValue = (current, skip) -> {
+ int actual = max - current;
+ if (skip < actual) {
+ actual = Math.max(skip, 0);
}
- }
+ return actual;
+ };
Review Comment:
```suggestion
```
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -78,29 +94,34 @@ public int read(byte[] b, int offset, int length) {
length = avail;
}
- System.arraycopy(buffer, cur, b, offset, length);
- cur += length;
+ System.arraycopy(buffer, currentValue, b, offset, length);
return length;
}
@Override
public long skip(long requestedSkip) {
- int actualSkip = max - cur;
- if (requestedSkip < actualSkip) {
- if (requestedSkip < 0) {
- actualSkip = 0;
- } else {
- actualSkip = (int) requestedSkip;
+
+ BiFunction<Integer,Integer,Integer> skipValue = (current, skip) -> {
+ int actual = max - current;
+ if (skip < actual) {
+ actual = Math.max(skip, 0);
Review Comment:
```suggestion
```
##########
core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java:
##########
@@ -68,7 +72,19 @@ public int read(byte[] b, int offset, int length) {
return 0;
}
- int avail = max - cur;
+ IntBinaryOperator add = (cur1, length1) -> {
Review Comment:
```suggestion
// compute how much to read, based on what's left available
IntBinaryOperator add = (cur1, length1) -> {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]