davidradl commented on code in PR #28252:
URL: https://github.com/apache/flink/pull/28252#discussion_r3357356695
##########
flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java:
##########
@@ -333,26 +341,52 @@ private int getUTFBytesSize(int c) {
}
}
+ /**
+ * Computes the new buffer length for a {@link #resize(int)} call.
+ *
+ * <p>Uses {@code long} arithmetic so that doubling does not silently
overflow once the current
+ * buffer length crosses {@code Integer.MAX_VALUE / 2}. When doubling
would exceed {@link
+ * #MAX_ARRAY_SIZE}, the buffer jumps directly to the cap rather than
growing by {@code
+ * minCapacityAdd} bytes at a time — the latter would degrade every
subsequent resize into a
+ * full copy of a ~1–2 GB buffer.
+ *
+ * @throws IOException if the required size exceeds {@link
#MAX_ARRAY_SIZE}.
+ */
+ @VisibleForTesting
+ static int computeNewBufferLength(int currentLength, int minCapacityAdd)
throws IOException {
+ long requiredLen = (long) currentLength + minCapacityAdd;
+ if (requiredLen > MAX_ARRAY_SIZE) {
+ throw new IOException(
+ "Serialization failed because the record length ("
+ + requiredLen
+ + " bytes) would exceed the maximum Java array
size ("
+ + MAX_ARRAY_SIZE
+ + " bytes).");
+ }
+ long doubledLen = (long) currentLength * 2L;
+ if (doubledLen > MAX_ARRAY_SIZE) {
+ return MAX_ARRAY_SIZE;
+ }
+ return (int) Math.max(doubledLen, requiredLen);
+ }
+
private void resize(int minCapacityAdd) throws IOException {
- int newLen = Math.max(this.buffer.length * 2, this.buffer.length +
minCapacityAdd);
+ int newLen = computeNewBufferLength(this.buffer.length,
minCapacityAdd);
byte[] nb;
try {
nb = new byte[newLen];
- } catch (NegativeArraySizeException e) {
- throw new IOException(
- "Serialization failed because the record length would
exceed 2GB (max addressable array size in Java).");
} catch (OutOfMemoryError e) {
// this was too large to allocate, try the smaller size (if
possible)
- if (newLen > this.buffer.length + minCapacityAdd) {
- newLen = this.buffer.length + minCapacityAdd;
+ int minLen = this.buffer.length + minCapacityAdd;
+ if (newLen > minLen) {
try {
- nb = new byte[newLen];
+ nb = new byte[minLen];
} catch (OutOfMemoryError ee) {
// still not possible. give an informative exception
message that reports the
// size
throw new IOException(
Review Comment:
Should we pre-allocate this exception, in case we cannot create a new
IOException because there is no memory left? Or should we just let the
OutOfMemoryError percolate up?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]