[FLINK-1598] [runtime] Better error message when network serialization of 
records exceeds java heap space.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9528a521
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9528a521
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9528a521

Branch: refs/heads/master
Commit: 9528a521f56e0c6b0c70d43e62ad84b19c048c36
Parents: 98bc7b9
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 23 13:45:51 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 24 00:08:27 2015 +0100

----------------------------------------------------------------------
 .../runtime/util/DataOutputSerializer.java      | 31 ++++++++++++++++----
 1 file changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9528a521/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 2d06e29..7f8105d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -268,16 +268,35 @@ public class DataOutputSerializer implements DataOutputView {
        
        
        private void resize(int minCapacityAdd) throws IOException {
+               int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+               byte[] nb;
                try {
-                       final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
-                       final byte[] nb = new byte[newLen];
-                       System.arraycopy(this.buffer, 0, nb, 0, this.position);
-                       this.buffer = nb;
-                       this.wrapper = ByteBuffer.wrap(this.buffer);
+                       nb = new byte[newLen];
                }
-               catch (NegativeArraySizeException nasex) {
+               catch (NegativeArraySizeException e) {
                        throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
                }
+               catch (OutOfMemoryError e) {
+                       // this was too large to allocate, try the smaller size (if possible)
+                       if (newLen > this.buffer.length + minCapacityAdd) {
+                               newLen = this.buffer.length + minCapacityAdd;
+                               try {
+                                       nb = new byte[newLen];
+                               }
+                               catch (OutOfMemoryError ee) {
+                                       // still not possible. give an informative exception message that reports the size
+                                       throw new IOException("Failed to serialize element. Serialized size (> "
+                                                       + newLen + " bytes) exceeds JVM heap space", ee);
+                               }
+                       } else {
+                               throw new IOException("Failed to serialize element. Serialized size (> "
+                                               + newLen + " bytes) exceeds JVM heap space", e);
+                       }
+               }
+
+               System.arraycopy(this.buffer, 0, nb, 0, this.position);
+               this.buffer = nb;
+               this.wrapper = ByteBuffer.wrap(this.buffer);
        }
        
        @SuppressWarnings("restriction")

Reply via email to