Author: cutting
Date: Fri Apr 10 23:31:37 2009
New Revision: 764104
URL: http://svn.apache.org/viewvc?rev=764104&view=rev
Log:
AVRO-3. Fix ValueReader to throw an exception at EOF. Contributed by Pat Hunt.
Added:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/test/java/org/apache/avro/io/
hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValueReader.java
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java
Added: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=764104&view=auto
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (added)
+++ hadoop/avro/trunk/CHANGES.txt Fri Apr 10 23:31:37 2009
@@ -0,0 +1,17 @@
+Avro Change Log
+
+Trunk (unreleased changes)
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
+ AVRO-3. Fix ValueReader to throw an exception at EOF.
+ (Pat Hunt via cutting)
+
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java?rev=764104&r1=764103&r2=764104&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValueReader.java Fri Apr 10 23:31:37 2009
@@ -17,8 +17,12 @@
*/
package org.apache.avro.io;
-import java.io.*;
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
+
import org.apache.avro.util.Utf8;
/** Read leaf values.
@@ -30,73 +34,93 @@
public ValueReader(InputStream in) {
super(in);
}
- /** Read a string written by {@link ValueWriter#writeUtf8(Utf8)}. */
+
+ /** Same contract as {@link InputStream#read()}, except that EOFException is
+ * thrown when EOF is reached rather than returning -1.
+ * @throws EOFException if at EOF. */
+ public int read() throws IOException {
+ int value = in.read();
+ if (value < 0) throw new EOFException();
+ return value;
+ }
+
+ /** Read a string written by {@link ValueWriter#writeUtf8(Utf8)}.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public Utf8 readUtf8(Object old) throws IOException {
Utf8 utf8 = old instanceof Utf8 ? (Utf8)old : new Utf8();
utf8.setLength((int)readLong());
readBytes(utf8.getBytes(), 0, utf8.getLength());
return utf8;
}
- /** Read buffer written by {@link ValueWriter#writeBuffer(ByteBuffer)}. */
+ /** Read buffer written by {@link ValueWriter#writeBuffer(ByteBuffer)}.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public ByteBuffer readBuffer(Object old) throws IOException {
int length = (int)readLong();
ByteBuffer bytes;
if ((old instanceof ByteBuffer) && ((ByteBuffer)old).capacity() >= length)
{
bytes = (ByteBuffer)old;
bytes.clear();
- } else
+ } else {
bytes = ByteBuffer.allocate(length);
+ }
readBytes(bytes.array(), 0, length);
bytes.limit(length);
return bytes;
}
- /** Read an int written by {@link ValueWriter#writeInt(int)}. */
+ /** Read an int written by {@link ValueWriter#writeInt(int)}.
+ * @throws EOFException if EOF is reached before reading all the bytes.*/
public int readInt() throws IOException {
return (int)readLong();
}
- /** Read a long written by {@link ValueWriter#writeLong(long)}. */
+ /** Read a long written by {@link ValueWriter#writeLong(long)}.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public long readLong() throws IOException {
- long b = in.read();
+ long b = read();
long n = b & 0x7F;
for (int shift = 7; (b & 0x80) != 0; shift += 7) {
- b = in.read();
+ b = read();
n |= (b & 0x7F) << shift;
}
return (n >>> 1) ^ -(n & 1); // back to two's-complement
}
- /** Read a float written by {@link ValueWriter#writeFloat(float)}. */
+ /** Read a float written by {@link ValueWriter#writeFloat(float)}.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public float readFloat() throws IOException {
- return Float.intBitsToFloat(((in.read() & 0xff) ) |
- ((in.read() & 0xff) << 8) |
- ((in.read() & 0xff) << 16) |
- ((in.read() & 0xff) << 24));
+ return Float.intBitsToFloat(((read() & 0xff) ) |
+ ((read() & 0xff) << 8) |
+ ((read() & 0xff) << 16) |
+ ((read() & 0xff) << 24));
}
- /** Read a double written by {@link ValueWriter#writeDouble(double)}. */
+ /** Read a double written by {@link ValueWriter#writeDouble(double)}.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public double readDouble() throws IOException {
- return Double.longBitsToDouble(((in.read() & 0xffL) ) |
- ((in.read() & 0xffL) << 8) |
- ((in.read() & 0xffL) << 16) |
- ((in.read() & 0xffL) << 24) |
- ((in.read() & 0xffL) << 32) |
- ((in.read() & 0xffL) << 40) |
- ((in.read() & 0xffL) << 48) |
- ((in.read() & 0xffL) << 56));
+ return Double.longBitsToDouble(((read() & 0xffL) ) |
+ ((read() & 0xffL) << 8) |
+ ((read() & 0xffL) << 16) |
+ ((read() & 0xffL) << 24) |
+ ((read() & 0xffL) << 32) |
+ ((read() & 0xffL) << 40) |
+ ((read() & 0xffL) << 48) |
+ ((read() & 0xffL) << 56));
}
- /** Read a boolean written by {@link ValueWriter#writeBoolean(boolean)}. */
+ /** Read a boolean written by {@link ValueWriter#writeBoolean(boolean)}.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public boolean readBoolean() throws IOException {
- return in.read() == 1;
+ return read() == 1;
}
- /** Read bytes into an array. */
+ /** Read bytes into an array.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public void readBytes(byte[] buffer) throws IOException {
readBytes(buffer, 0, buffer.length);
}
- /** Read bytes into an array. */
+ /** Read bytes into an array.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public void readBytes(byte[] buffer, int offset, int length)
throws IOException {
int total = 0;
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java?rev=764104&r1=764103&r2=764104&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/ByteBufferInputStream.java Fri Apr 10 23:31:37 2009
@@ -18,14 +18,14 @@
package org.apache.avro.ipc;
-import java.io.*;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.List;
/** Utility to present {@link ByteBuffer} data as an {@link InputStream}.*/
class ByteBufferInputStream extends InputStream {
- private static final int BUFFER_SIZE = 8192;
-
private List<ByteBuffer> buffers;
private int current;
@@ -33,15 +33,16 @@
this.buffers = buffers;
}
- public int read() {
- ByteBuffer buffer = buffers.get(current);
- while (!buffer.hasRemaining()) // skip empty
- buffer = buffers.get(++current);
- return buffer.get();
+ /** @see InputStream#read()
+ * @throws EOFException if EOF is reached. */
+ public int read() throws IOException {
+ return getBuffer().get() & 0xff;
}
- public int read(byte b[], int off, int len) {
- ByteBuffer buffer = buffers.get(current);
+ /** @see InputStream#read(byte[], int, int)
+ * @throws EOFException if EOF is reached before reading all the bytes. */
+ public int read(byte b[], int off, int len) throws IOException {
+ ByteBuffer buffer = getBuffer();
int remaining = buffer.remaining();
if (len > remaining) {
buffer.get(b, off, remaining);
@@ -52,18 +53,32 @@
}
}
- /** Read a buffer from the input without copying, if possible. */
+ /** Read a buffer from the input without copying, if possible.
+ * @throws EOFException if EOF is reached before reading all the bytes. */
public ByteBuffer readBuffer(int length) throws IOException {
- ByteBuffer buffer = buffers.get(current);
- while (!buffer.hasRemaining()) // skip empty
- buffer = buffers.get(++current);
+ ByteBuffer buffer = getBuffer();
if (buffer.remaining() == length) { // can return current as-is?
current++;
return buffer; // return w/o copying
}
// punt: allocate a new buffer & copy into it
ByteBuffer result = ByteBuffer.allocate(length);
- read(result.array(), 0, length);
+ int start = 0;
+ while (start < length)
+ start += read(result.array(), start, length-start);
return result;
}
+
+ /** Returns the next non-empty buffer, skipping past any exhausted ones.
+ * @throws EOFException if no buffer with remaining bytes is left.
+ */
+ private ByteBuffer getBuffer() throws IOException {
+ while (current < buffers.size()) {
+ ByteBuffer buffer = buffers.get(current);
+ if (buffer.hasRemaining())
+ return buffer;
+ current++;
+ }
+ throw new EOFException();
+ }
}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValueReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValueReader.java?rev=764104&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValueReader.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValueReader.java Fri Apr 10 23:31:37 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.avro.util.Utf8;
+
+
+public class TestValueReader extends TestCase {
+ /** Verify EOFException is thrown at EOF */
+ public void testEOFHandling() throws IOException {
+ InputStream is = new ByteArrayInputStream(new byte[0]);
+ ValueReader vr = new ValueReader(is);
+
+ try {
+ vr.readBoolean();
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readBuffer(null);
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readBytes(new byte[1]);
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readBytes(new byte[1], 0, 1);
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readDouble();
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readFloat();
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readInt();
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readLong();
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ try {
+ vr.readUtf8(new Utf8("a"));
+ fail();
+ } catch (EOFException e) { /* this is good */ }
+ }
+}