[FLINK-1598] [runtime] Better error message when network serialization of records exceeds java heap space.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9528a521 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9528a521 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9528a521 Branch: refs/heads/master Commit: 9528a521f56e0c6b0c70d43e62ad84b19c048c36 Parents: 98bc7b9 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 23 13:45:51 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 24 00:08:27 2015 +0100 ---------------------------------------------------------------------- .../runtime/util/DataOutputSerializer.java | 31 ++++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9528a521/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java index 2d06e29..7f8105d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java @@ -268,16 +268,35 @@ public class DataOutputSerializer implements DataOutputView { private void resize(int minCapacityAdd) throws IOException { + int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); + byte[] nb; try { - final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); - final byte[] nb = new byte[newLen]; - System.arraycopy(this.buffer, 0, nb, 0, this.position); - this.buffer = nb; - this.wrapper = ByteBuffer.wrap(this.buffer); + nb = new byte[newLen]; } - catch (NegativeArraySizeException nasex) { + catch (NegativeArraySizeException e) { throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); } + catch (OutOfMemoryError e) { + // this was too large to allocate, try the smaller size (if possible) + if (newLen > this.buffer.length + minCapacityAdd) { + newLen = this.buffer.length + minCapacityAdd; + try { + nb = new byte[newLen]; + } + catch (OutOfMemoryError ee) { + // still not possible. give an informative exception message that reports the size + throw new IOException("Failed to serialize element. Serialized size (> " + + newLen + " bytes) exceeds JVM heap space", ee); + } + } else { + throw new IOException("Failed to serialize element. Serialized size (> " + + newLen + " bytes) exceeds JVM heap space", e); + } + } + + System.arraycopy(this.buffer, 0, nb, 0, this.position); + this.buffer = nb; + this.wrapper = ByteBuffer.wrap(this.buffer); } @SuppressWarnings("restriction")