Repository: flink
Updated Branches:
  refs/heads/master d9cb5b710 -> 19066b520


[FLINK-1371] [runtime] Fix KryoSerializer to not swallow EOFExceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19066b52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19066b52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19066b52

Branch: refs/heads/master
Commit: 19066b520435528e104a69ccf372f56811123ee3
Parents: d9cb5b7
Author: Stephan Ewen <[email protected]>
Authored: Thu Jan 8 12:38:51 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jan 8 12:38:51 2015 +0100

----------------------------------------------------------------------
 .../java/typeutils/runtime/KryoSerializer.java  |  25 +-
 .../typeutils/runtime/WritableSerializer.java   |   1 +
 .../runtime/KryoGenericTypeSerializerTest.java  |  47 ++-
 .../runtime/KryoVersusAvroMinibenchmark.java    | 260 +---------------
 .../runtime/TestDataOutputSerializer.java       | 301 +++++++++++++++++++
 5 files changed, 373 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 9e302bf..b2c55fb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 
 public class KryoSerializer<T> extends TypeSerializer<T> {
@@ -114,8 +115,19 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                        previousOut = target;
                }
                
-               kryo.writeClassAndObject(output, record);
-               output.flush();
+               try {
+                       kryo.writeClassAndObject(output, record);
+                       output.flush();
+               }
+               catch (KryoException ke) {
+                       Throwable cause = ke.getCause();
+                       if (cause instanceof EOFException) {
+                               throw (EOFException) cause;
+                       }
+                       else {
+                               throw ke;
+                       }
+               }
        }
 
        @SuppressWarnings("unchecked")
@@ -173,4 +185,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                        this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
                }
        }
+       
+       // --------------------------------------------------------------------------------------------
+       // for testing
+       // --------------------------------------------------------------------------------------------
+       
+       Kryo getKryo() {
+               checkKryoInitialized();
+               return this.kryo;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index e838d27..c89733e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -43,6 +43,7 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
                this.typeClass = typeClass;
        }
        
+       @SuppressWarnings("unchecked")
        @Override
        public T createInstance() {
                if(typeClass == NullWritable.class) {

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index 3c22b15..d0fc6ed 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import static org.junit.Assert.*;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.junit.Test;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Random;
 
+@SuppressWarnings("unchecked")
 public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-
+       
        @Test
        public void testJavaList(){
                Collection<Integer> a = new ArrayList<Integer>();
@@ -67,4 +70,44 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
        protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
                return new KryoSerializer<T>(type);
        }
+       
+       /**
+        * Make sure that the kryo serializer forwards EOF exceptions properly
+        */
+       @Test
+       public void testForwardEOFException() {
+               try {
+                       // construct a long string
+                       String str;
+                       {
+                               char[] charData = new char[40000];
+                               Random rnd = new Random();
+                               
+                               for (int i = 0; i < charData.length; i++) {
+                                       charData[i] = (char) rnd.nextInt(10000);
+                               }
+                               
+                               str = new String(charData);
+                       }
+                       
+                       // construct a memory target that is too small for the string
+                       TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
+                       KryoSerializer<String> serializer = new KryoSerializer<String>(String.class);
+                       
+                       try {
+                               serializer.serialize(str, target);
+                               fail("should throw a java.io.EOFException");
+                       }
+                       catch (java.io.EOFException e) {
+                               // that is how we like it
+                       }
+                       catch (Exception e) {
+                               fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
index 8111dc6..4c6b39f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
@@ -29,7 +29,6 @@ import java.util.Random;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemoryUtils;
 
 public class KryoVersusAvroMinibenchmark {
@@ -61,7 +60,7 @@ public class KryoVersusAvroMinibenchmark {
                        
                        System.out.println("Avro serializer");
                        {
-                               final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+                               final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000);
                                final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class);
                                
                                long start = System.nanoTime();
@@ -84,8 +83,9 @@ public class KryoVersusAvroMinibenchmark {
                        
                        System.out.println("Kryo serializer");
                        {
-                               final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+                               final TestDataOutputSerializer outView = new TestDataOutputSerializer(100000000);
                                final KryoSerializer<MyType> serializer = new KryoSerializer<MyType>(MyType.class);
+                               serializer.getKryo().register(Tuple2.class);
                                
                                long start = System.nanoTime();
                                
@@ -181,260 +181,6 @@ public class KryoVersusAvroMinibenchmark {
        // ============================================================================================
        // ============================================================================================
        
-       public static final class DataOutputSerializer implements DataOutputView {
-               
-               private byte[] buffer;
-               
-               private int position;
-
-               private ByteBuffer wrapper;
-               
-               public DataOutputSerializer(int startSize) {
-                       if (startSize < 1) {
-                               throw new IllegalArgumentException();
-                       }
-
-                       this.buffer = new byte[startSize];
-                       this.wrapper = ByteBuffer.wrap(buffer);
-               }
-               
-               public ByteBuffer wrapAsByteBuffer() {
-                       this.wrapper.position(0);
-                       this.wrapper.limit(this.position);
-                       return this.wrapper;
-               }
-
-               public void clear() {
-                       this.position = 0;
-               }
-
-               public int length() {
-                       return this.position;
-               }
-
-               @Override
-               public String toString() {
-                       return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
-               }
-
-               // ----------------------------------------------------------------------------------------
-               //                               Data Output
-               // ----------------------------------------------------------------------------------------
-               
-               @Override
-               public void write(int b) throws IOException {
-                       if (this.position >= this.buffer.length) {
-                               resize(1);
-                       }
-                       this.buffer[this.position++] = (byte) (b & 0xff);
-               }
-
-               @Override
-               public void write(byte[] b) throws IOException {
-                       write(b, 0, b.length);
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) throws IOException {
-                       if (len < 0 || off > b.length - len) {
-                               throw new ArrayIndexOutOfBoundsException();
-                       }
-                       if (this.position > this.buffer.length - len) {
-                               resize(len);
-                       }
-                       System.arraycopy(b, off, this.buffer, this.position, len);
-                       this.position += len;
-               }
-
-               @Override
-               public void writeBoolean(boolean v) throws IOException {
-                       write(v ? 1 : 0);
-               }
-
-               @Override
-               public void writeByte(int v) throws IOException {
-                       write(v);
-               }
-
-               @Override
-               public void writeBytes(String s) throws IOException {
-                       final int sLen = s.length();
-                       if (this.position >= this.buffer.length - sLen) {
-                               resize(sLen);
-                       }
-                       
-                       for (int i = 0; i < sLen; i++) {
-                               writeByte(s.charAt(i));
-                       }
-                       this.position += sLen;
-               }
-
-               @Override
-               public void writeChar(int v) throws IOException {
-                       if (this.position >= this.buffer.length - 1) {
-                               resize(2);
-                       }
-                       this.buffer[this.position++] = (byte) (v >> 8);
-                       this.buffer[this.position++] = (byte) v;
-               }
-
-               @Override
-               public void writeChars(String s) throws IOException {
-                       final int sLen = s.length();
-                       if (this.position >= this.buffer.length - 2*sLen) {
-                               resize(2*sLen);
-                       } 
-                       for (int i = 0; i < sLen; i++) {
-                               writeChar(s.charAt(i));
-                       }
-               }
-
-               @Override
-               public void writeDouble(double v) throws IOException {
-                       writeLong(Double.doubleToLongBits(v));
-               }
-
-               @Override
-               public void writeFloat(float v) throws IOException {
-                       writeInt(Float.floatToIntBits(v));
-               }
-
-               @SuppressWarnings("restriction")
-               @Override
-               public void writeInt(int v) throws IOException {
-                       if (this.position >= this.buffer.length - 3) {
-                               resize(4);
-                       }
-                       if (LITTLE_ENDIAN) {
-                               v = Integer.reverseBytes(v);
-                       }                       
-                       UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
-                       this.position += 4;
-               }
-
-               @SuppressWarnings("restriction")
-               @Override
-               public void writeLong(long v) throws IOException {
-                       if (this.position >= this.buffer.length - 7) {
-                               resize(8);
-                       }
-                       if (LITTLE_ENDIAN) {
-                               v = Long.reverseBytes(v);
-                       }
-                       UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
-                       this.position += 8;
-               }
-
-               @Override
-               public void writeShort(int v) throws IOException {
-                       if (this.position >= this.buffer.length - 1) {
-                               resize(2);
-                       }
-                       this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
-                       this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
-               }
-
-               @Override
-               public void writeUTF(String str) throws IOException {
-                       int strlen = str.length();
-                       int utflen = 0;
-                       int c;
-
-                       /* use charAt instead of copying String to char array */
-                       for (int i = 0; i < strlen; i++) {
-                               c = str.charAt(i);
-                               if ((c >= 0x0001) && (c <= 0x007F)) {
-                                       utflen++;
-                               } else if (c > 0x07FF) {
-                                       utflen += 3;
-                               } else {
-                                       utflen += 2;
-                               }
-                       }
-
-                       if (utflen > 65535) {
-                               throw new UTFDataFormatException("Encoded string is too long: " + utflen);
-                       }
-                       else if (this.position > this.buffer.length - utflen - 2) {
-                               resize(utflen + 2);
-                       }
-                       
-                       byte[] bytearr = this.buffer;
-                       int count = this.position;
-
-                       bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-                       bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
-
-                       int i = 0;
-                       for (i = 0; i < strlen; i++) {
-                               c = str.charAt(i);
-                               if (!((c >= 0x0001) && (c <= 0x007F))) {
-                                       break;
-                               }
-                               bytearr[count++] = (byte) c;
-                       }
-
-                       for (; i < strlen; i++) {
-                               c = str.charAt(i);
-                               if ((c >= 0x0001) && (c <= 0x007F)) {
-                                       bytearr[count++] = (byte) c;
-
-                               } else if (c > 0x07FF) {
-                                       bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-                                       bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-                                       bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-                               } else {
-                                       bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-                                       bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-                               }
-                       }
-
-                       this.position = count;
-               }
-               
-               
-               private void resize(int minCapacityAdd) throws IOException {
-                       try {
-                               final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
-                               final byte[] nb = new byte[newLen];
-                               System.arraycopy(this.buffer, 0, nb, 0, 
this.position);
-                               this.buffer = nb;
-                               this.wrapper = ByteBuffer.wrap(this.buffer);
-                       }
-                       catch (NegativeArraySizeException nasex) {
-                               throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
-                       }
-               }
-               
-               @SuppressWarnings("restriction")
-               private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-               
-               @SuppressWarnings("restriction")
-               private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-               
-               private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-
-               @Override
-               public void skipBytesToWrite(int numBytes) throws IOException {
-                       if(buffer.length - this.position < numBytes){
-                               throw new EOFException("Could not skip " + numBytes + " bytes.");
-                       }
-
-                       this.position += numBytes;
-               }
-
-               @Override
-               public void write(DataInputView source, int numBytes) throws IOException {
-                       if(buffer.length - this.position < numBytes){
-                               throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
-                       }
-
-                       source.read(this.buffer, this.position, numBytes);
-                       this.position += numBytes;
-               }
-               
-       }
-       
        public static final class DataInputDeserializer implements DataInputView {
                
                private byte[] buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/19066b52/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
new file mode 100644
index 0000000..58a2aeb
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
+public final class TestDataOutputSerializer implements DataOutputView {
+       
+       private byte[] buffer;
+       
+       private int position;
+
+       private ByteBuffer wrapper;
+       
+       private final int maxSize;
+       
+
+       public TestDataOutputSerializer(int startSize) {
+               this(startSize, Integer.MAX_VALUE);
+       }
+       
+       public TestDataOutputSerializer(int startSize, int maxSize) {
+               if (startSize < 1 || startSize > maxSize) {
+                       throw new IllegalArgumentException();
+               }
+
+               this.buffer = new byte[startSize];
+               this.wrapper = ByteBuffer.wrap(buffer);
+               this.maxSize = maxSize;
+       }
+       
+       public ByteBuffer wrapAsByteBuffer() {
+               this.wrapper.position(0);
+               this.wrapper.limit(this.position);
+               return this.wrapper;
+       }
+
+       public void clear() {
+               this.position = 0;
+       }
+
+       public int length() {
+               return this.position;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
+       }
+
+       // ----------------------------------------------------------------------------------------
+       //                               Data Output
+       // ----------------------------------------------------------------------------------------
+       
+       @Override
+       public void write(int b) throws IOException {
+               if (this.position >= this.buffer.length) {
+                       resize(1);
+               }
+               this.buffer[this.position++] = (byte) (b & 0xff);
+       }
+
+       @Override
+       public void write(byte[] b) throws IOException {
+               write(b, 0, b.length);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               if (len < 0 || off > b.length - len) {
+                       throw new ArrayIndexOutOfBoundsException();
+               }
+               if (this.position > this.buffer.length - len) {
+                       resize(len);
+               }
+               System.arraycopy(b, off, this.buffer, this.position, len);
+               this.position += len;
+       }
+
+       @Override
+       public void writeBoolean(boolean v) throws IOException {
+               write(v ? 1 : 0);
+       }
+
+       @Override
+       public void writeByte(int v) throws IOException {
+               write(v);
+       }
+
+       @Override
+       public void writeBytes(String s) throws IOException {
+               final int sLen = s.length();
+               if (this.position >= this.buffer.length - sLen) {
+                       resize(sLen);
+               }
+               
+               for (int i = 0; i < sLen; i++) {
+                       writeByte(s.charAt(i));
+               }
+               this.position += sLen;
+       }
+
+       @Override
+       public void writeChar(int v) throws IOException {
+               if (this.position >= this.buffer.length - 1) {
+                       resize(2);
+               }
+               this.buffer[this.position++] = (byte) (v >> 8);
+               this.buffer[this.position++] = (byte) v;
+       }
+
+       @Override
+       public void writeChars(String s) throws IOException {
+               final int sLen = s.length();
+               if (this.position >= this.buffer.length - 2*sLen) {
+                       resize(2*sLen);
+               } 
+               for (int i = 0; i < sLen; i++) {
+                       writeChar(s.charAt(i));
+               }
+       }
+
+       @Override
+       public void writeDouble(double v) throws IOException {
+               writeLong(Double.doubleToLongBits(v));
+       }
+
+       @Override
+       public void writeFloat(float v) throws IOException {
+               writeInt(Float.floatToIntBits(v));
+       }
+
+       @SuppressWarnings("restriction")
+       @Override
+       public void writeInt(int v) throws IOException {
+               if (this.position >= this.buffer.length - 3) {
+                       resize(4);
+               }
+               if (LITTLE_ENDIAN) {
+                       v = Integer.reverseBytes(v);
+               }                       
+               UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+               this.position += 4;
+       }
+
+       @SuppressWarnings("restriction")
+       @Override
+       public void writeLong(long v) throws IOException {
+               if (this.position >= this.buffer.length - 7) {
+                       resize(8);
+               }
+               if (LITTLE_ENDIAN) {
+                       v = Long.reverseBytes(v);
+               }
+               UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+               this.position += 8;
+       }
+
+       @Override
+       public void writeShort(int v) throws IOException {
+               if (this.position >= this.buffer.length - 1) {
+                       resize(2);
+               }
+               this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+               this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
+       }
+
+       @Override
+       public void writeUTF(String str) throws IOException {
+               int strlen = str.length();
+               int utflen = 0;
+               int c;
+
+               /* use charAt instead of copying String to char array */
+               for (int i = 0; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if ((c >= 0x0001) && (c <= 0x007F)) {
+                               utflen++;
+                       } else if (c > 0x07FF) {
+                               utflen += 3;
+                       } else {
+                               utflen += 2;
+                       }
+               }
+
+               if (utflen > 65535) {
+                       throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+               }
+               else if (this.position > this.buffer.length - utflen - 2) {
+                       resize(utflen + 2);
+               }
+               
+               byte[] bytearr = this.buffer;
+               int count = this.position;
+
+               bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+               bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+               int i = 0;
+               for (i = 0; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if (!((c >= 0x0001) && (c <= 0x007F))) {
+                               break;
+                       }
+                       bytearr[count++] = (byte) c;
+               }
+
+               for (; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if ((c >= 0x0001) && (c <= 0x007F)) {
+                               bytearr[count++] = (byte) c;
+
+                       } else if (c > 0x07FF) {
+                               bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                               bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+                               bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+                       } else {
+                               bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+                               bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+                       }
+               }
+
+               this.position = count;
+       }
+       
+       
+       private void resize(int minCapacityAdd) throws IOException {
+               try {
+                       int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+                       
+                       if (newLen > maxSize) {
+                               
+                               if (this.buffer.length + minCapacityAdd > maxSize) {
+                                       throw new EOFException("Exceeded maximum capacity");
+                               }
+                               
+                               newLen = maxSize;
+                       }
+                       
+                       final byte[] nb = new byte[newLen];
+                       System.arraycopy(this.buffer, 0, nb, 0, this.position);
+                       this.buffer = nb;
+                       this.wrapper = ByteBuffer.wrap(this.buffer);
+               }
+               catch (NegativeArraySizeException nasex) {
+                       throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
+               }
+       }
+       
+       @SuppressWarnings("restriction")
+       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+       
+       @SuppressWarnings("restriction")
+       private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+       
+       private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+       @Override
+       public void skipBytesToWrite(int numBytes) throws IOException {
+               if(buffer.length - this.position < numBytes){
+                       throw new EOFException("Could not skip " + numBytes + " bytes.");
+               }
+
+               this.position += numBytes;
+       }
+
+       @Override
+       public void write(DataInputView source, int numBytes) throws IOException {
+               if(buffer.length - this.position < numBytes){
+                       throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
+               }
+
+               source.read(this.buffer, this.position, numBytes);
+               this.position += numBytes;
+       }
+       
+}
\ No newline at end of file

Reply via email to