Repository: flink Updated Branches: refs/heads/master d9cb5b710 -> 19066b520
[FLINK-1371] [runtime] Fix KryoSerializer to not swallow EOFExceptions Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19066b52 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19066b52 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19066b52 Branch: refs/heads/master Commit: 19066b520435528e104a69ccf372f56811123ee3 Parents: d9cb5b7 Author: Stephan Ewen <[email protected]> Authored: Thu Jan 8 12:38:51 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Jan 8 12:38:51 2015 +0100 ---------------------------------------------------------------------- .../java/typeutils/runtime/KryoSerializer.java | 25 +- .../typeutils/runtime/WritableSerializer.java | 1 + .../runtime/KryoGenericTypeSerializerTest.java | 47 ++- .../runtime/KryoVersusAvroMinibenchmark.java | 260 +--------------- .../runtime/TestDataOutputSerializer.java | 301 +++++++++++++++++++ 5 files changed, 373 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 9e302bf..b2c55fb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataOutputView; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.EOFException; import java.io.IOException; public class KryoSerializer<T> extends TypeSerializer<T> { @@ -114,8 +115,19 @@ public class KryoSerializer<T> extends TypeSerializer<T> { previousOut = target; } - kryo.writeClassAndObject(output, record); - output.flush(); + try { + kryo.writeClassAndObject(output, record); + output.flush(); + } + catch (KryoException ke) { + Throwable cause = ke.getCause(); + if (cause instanceof EOFException) { + throw (EOFException) cause; + } + else { + throw ke; + } + } } @SuppressWarnings("unchecked") @@ -173,4 +185,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> { this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } } + + // -------------------------------------------------------------------------------------------- + // for testing + // -------------------------------------------------------------------------------------------- + + Kryo getKryo() { + checkKryoInitialized(); + return this.kryo; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java index e838d27..c89733e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -43,6 +43,7 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> { this.typeClass = typeClass; } + @SuppressWarnings("unchecked") @Override public T createInstance() { if(typeClass == NullWritable.class) { http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java index 3c22b15..d0fc6ed 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java @@ -18,17 +18,20 @@ package org.apache.flink.api.java.typeutils.runtime; +import static org.junit.Assert.*; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.junit.Test; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; +import java.util.Random; +@SuppressWarnings("unchecked") public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - + @Test public void testJavaList(){ Collection<Integer> a = new ArrayList<Integer>(); @@ -67,4 +70,44 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer protected <T> TypeSerializer<T> createSerializer(Class<T> type) { return new KryoSerializer<T>(type); } + + /** + * Make sure that the kryo serializer forwards EOF exceptions properly + */ + @Test + public void testForwardEOFException() { + try { + // construct a long string + String str; + { + char[] charData = new char[40000]; + Random rnd = new Random(); + + for (int i = 0; i < charData.length; i++) { + charData[i] = (char) rnd.nextInt(10000); + } + + str = new String(charData); + } + + // construct a memory target that is too small for the string + TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000); + KryoSerializer<String> serializer = new KryoSerializer<String>(String.class); + + try { + serializer.serialize(str, target); + fail("should throw a java.io.EOFException"); + } + catch (java.io.EOFException e) { + // that is how we like it + } + catch (Exception e) { + fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java index 8111dc6..4c6b39f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java @@ -29,7 +29,6 @@ import java.util.Random; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemoryUtils; public class KryoVersusAvroMinibenchmark { @@ -61,7 +60,7 @@ public class KryoVersusAvroMinibenchmark { System.out.println("Avro serializer"); { - final DataOutputSerializer outView = new DataOutputSerializer(100000000); + final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000); final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class); long start = System.nanoTime(); @@ -84,8 +83,9 @@ public class KryoVersusAvroMinibenchmark { System.out.println("Kryo serializer"); { - final DataOutputSerializer outView = new DataOutputSerializer(100000000); + final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000); final KryoSerializer<MyType> serializer = new KryoSerializer<MyType>(MyType.class); + serializer.getKryo().register(Tuple2.class); long start = System.nanoTime(); @@ -181,260 +181,6 @@ public class KryoVersusAvroMinibenchmark { // ============================================================================================ // ============================================================================================ - public static final class DataOutputSerializer implements DataOutputView { - - private byte[] buffer; - - private int position; - - private ByteBuffer wrapper; - - public DataOutputSerializer(int startSize) { - if (startSize < 1) { - throw new IllegalArgumentException(); - } - - this.buffer = new byte[startSize]; - this.wrapper = ByteBuffer.wrap(buffer); - } - - public ByteBuffer wrapAsByteBuffer() { - this.wrapper.position(0); - this.wrapper.limit(this.position); - return this.wrapper; - } - - public void clear() { - this.position = 0; - } - - public int length() { - return this.position; - } - - @Override - public String toString() { - return String.format("[pos=%d cap=%d]", this.position, this.buffer.length); - } - - // ---------------------------------------------------------------------------------------- - // Data Output - // ---------------------------------------------------------------------------------------- - - @Override - public void write(int b) throws IOException { - if (this.position >= this.buffer.length) { - resize(1); - } - this.buffer[this.position++] = (byte) (b & 0xff); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - if (this.position > this.buffer.length - len) { - resize(len); - } - System.arraycopy(b, off, this.buffer, this.position, len); - this.position += len; - } - - @Override - public void writeBoolean(boolean v) throws IOException { - write(v ? 1 : 0); - } - - @Override - public void writeByte(int v) throws IOException { - write(v); - } - - @Override - public void writeBytes(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - sLen) { - resize(sLen); - } - - for (int i = 0; i < sLen; i++) { - writeByte(s.charAt(i)); - } - this.position += sLen; - } - - @Override - public void writeChar(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) (v >> 8); - this.buffer[this.position++] = (byte) v; - } - - @Override - public void writeChars(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - 2*sLen) { - resize(2*sLen); - } - for (int i = 0; i < sLen; i++) { - writeChar(s.charAt(i)); - } - } - - @Override - public void writeDouble(double v) throws IOException { - writeLong(Double.doubleToLongBits(v)); - } - - @Override - public void writeFloat(float v) throws IOException { - writeInt(Float.floatToIntBits(v)); - } - - @SuppressWarnings("restriction") - @Override - public void writeInt(int v) throws IOException { - if (this.position >= this.buffer.length - 3) { - resize(4); - } - if (LITTLE_ENDIAN) { - v = Integer.reverseBytes(v); - } - UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v); - this.position += 4; - } - - @SuppressWarnings("restriction") - @Override - public void writeLong(long v) throws IOException { - if (this.position >= this.buffer.length - 7) { - resize(8); - } - if (LITTLE_ENDIAN) { - v = Long.reverseBytes(v); - } - UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v); - this.position += 8; - } - - @Override - public void writeShort(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff); - this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff); - } - - @Override - public void writeUTF(String str) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("Encoded string is too long: " + utflen); - } - else if (this.position > this.buffer.length - utflen - 2) { - resize(utflen + 2); - } - - byte[] bytearr = this.buffer; - int count = this.position; - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } - } - - this.position = count; - } - - - private void resize(int minCapacityAdd) throws IOException { - try { - final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); - final byte[] nb = new byte[newLen]; - System.arraycopy(this.buffer, 0, nb, 0, this.position); - this.buffer = nb; - this.wrapper = ByteBuffer.wrap(this.buffer); - } - catch (NegativeArraySizeException nasex) { - throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); - } - } - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - if(buffer.length - this.position < numBytes){ - throw new EOFException("Could not skip " + numBytes + " bytes."); - } - - this.position += numBytes; - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - if(buffer.length - this.position < numBytes){ - throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); - } - - source.read(this.buffer, this.position, numBytes); - this.position += numBytes; - } - - } - public static final class DataInputDeserializer implements DataInputView { private byte[] buffer; http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java new file mode 100644 index 0000000..58a2aeb --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemoryUtils; + +public final class TestDataOutputSerializer implements DataOutputView { + + private byte[] buffer; + + private int position; + + private ByteBuffer wrapper; + + private final int maxSize; + + + public TestDataOutputSerializer(int startSize) { + this(startSize, Integer.MAX_VALUE); + } + + public TestDataOutputSerializer(int startSize, int maxSize) { + if (startSize < 1 || startSize > maxSize) { + throw new IllegalArgumentException(); + } + + this.buffer = new byte[startSize]; + this.wrapper = ByteBuffer.wrap(buffer); + this.maxSize = maxSize; + } + + public ByteBuffer wrapAsByteBuffer() { + this.wrapper.position(0); + this.wrapper.limit(this.position); + return this.wrapper; + } + + public void clear() { + this.position = 0; + } + + public int length() { + return this.position; + } + + @Override + public String toString() { + return String.format("[pos=%d cap=%d]", this.position, this.buffer.length); + } + + // ---------------------------------------------------------------------------------------- + // Data Output + // ---------------------------------------------------------------------------------------- + + @Override + public void write(int b) throws IOException { + if (this.position >= this.buffer.length) { + resize(1); + } + this.buffer[this.position++] = (byte) (b & 0xff); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + if (this.position > this.buffer.length - len) { + resize(len); + } + System.arraycopy(b, off, this.buffer, this.position, len); + this.position += len; + } + + @Override + public void writeBoolean(boolean v) throws IOException { + write(v ? 1 : 0); + } + + @Override + public void writeByte(int v) throws IOException { + write(v); + } + + @Override + public void writeBytes(String s) throws IOException { + final int sLen = s.length(); + if (this.position >= this.buffer.length - sLen) { + resize(sLen); + } + + for (int i = 0; i < sLen; i++) { + writeByte(s.charAt(i)); + } + this.position += sLen; + } + + @Override + public void writeChar(int v) throws IOException { + if (this.position >= this.buffer.length - 1) { + resize(2); + } + this.buffer[this.position++] = (byte) (v >> 8); + this.buffer[this.position++] = (byte) v; + } + + @Override + public void writeChars(String s) throws IOException { + final int sLen = s.length(); + if (this.position >= this.buffer.length - 2*sLen) { + resize(2*sLen); + } + for (int i = 0; i < sLen; i++) { + writeChar(s.charAt(i)); + } + } + + @Override + public void writeDouble(double v) throws IOException { + writeLong(Double.doubleToLongBits(v)); + } + + @Override + public void writeFloat(float v) throws IOException { + writeInt(Float.floatToIntBits(v)); + } + + @SuppressWarnings("restriction") + @Override + public void writeInt(int v) throws IOException { + if (this.position >= this.buffer.length - 3) { + resize(4); + } + if (LITTLE_ENDIAN) { + v = Integer.reverseBytes(v); + } + UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v); + this.position += 4; + } + + @SuppressWarnings("restriction") + @Override + public void writeLong(long v) throws IOException { + if (this.position >= this.buffer.length - 7) { + resize(8); + } + if (LITTLE_ENDIAN) { + v = Long.reverseBytes(v); + } + UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v); + this.position += 8; + } + + @Override + public void writeShort(int v) throws IOException { + if (this.position >= this.buffer.length - 1) { + resize(2); + } + this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff); + this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff); + } + + @Override + public void writeUTF(String str) throws IOException { + int strlen = str.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + if (utflen > 65535) { + throw new UTFDataFormatException("Encoded string is too long: " + utflen); + } + else if (this.position > this.buffer.length - utflen - 2) { + resize(utflen + 2); + } + + byte[] bytearr = this.buffer; + int count = this.position; + + bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); + bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); + + int i = 0; + for (i = 0; i < strlen; i++) { + c = str.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + bytearr[count++] = (byte) c; + } + + for (; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (byte) c; + + } else if (c > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } + } + + this.position = count; + } + + + private void resize(int minCapacityAdd) throws IOException { + try { + int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); + + if (newLen > maxSize) { + + if (this.buffer.length + minCapacityAdd > maxSize) { + throw new EOFException("Exceeded maximum capacity"); + } + + newLen = maxSize; + } + + final byte[] nb = new byte[newLen]; + System.arraycopy(this.buffer, 0, nb, 0, this.position); + this.buffer = nb; + this.wrapper = ByteBuffer.wrap(this.buffer); + } + catch (NegativeArraySizeException nasex) { + throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); + } + } + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); + + @Override + public void skipBytesToWrite(int numBytes) throws IOException { + if(buffer.length - this.position < numBytes){ + throw new EOFException("Could not skip " + numBytes + " bytes."); + } + + this.position += numBytes; + } + + @Override + public void write(DataInputView source, int numBytes) throws IOException { + if(buffer.length - this.position < numBytes){ + throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); + } + + source.read(this.buffer, this.position, numBytes); + this.position += numBytes; + } + +} \ No newline at end of file
