Author: cutting
Date: Thu Jun 25 19:33:11 2009
New Revision: 788481
URL: http://svn.apache.org/viewvc?rev=788481&view=rev
Log:
AVRO-29. Reverting changes to GenericDatumReader.
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=788481&r1=788480&r2=788481&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java	(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java	Thu Jun 25 19:33:11 2009
@@ -18,57 +18,59 @@
package org.apache.avro.generic;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
+import java.nio.ByteBuffer;
+
+import org.codehaus.jackson.JsonNode;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.util.Utf8;
-import org.codehaus.jackson.JsonNode;
/** {@link DatumReader} for generic Java objects. */
public class GenericDatumReader<D> implements DatumReader<D> {
- private Schema writerSchema;
- private Schema readerSchema;
-
- public GenericDatumReader() { }
+ /** Schema the data was written with; it decides what is consumed from the Decoder. */
+ private Schema actual;
+ /** Schema the caller wants the data resolved to; null means the same as {@code actual}. */
+ private Schema expected;
- public GenericDatumReader(Schema schema) {
- this.readerSchema = schema;
- }
+ /** Creates a reader with no schema; {@link #setSchema(Schema)} must be called before reading. */
+ public GenericDatumReader() {}
- public GenericDatumReader(Schema writerSchema, Schema readerSchema) {
- this.writerSchema = writerSchema;
- this.readerSchema = readerSchema;
+ /** Creates a reader whose data is read as written, with {@code actual} as both schemas. */
+ public GenericDatumReader(Schema actual) {
+ // NOTE(review): calls the overridable setSchema from a constructor.
+ setSchema(actual);
}
- public void setSchema(Schema schema) {
- this.readerSchema = schema;
+ /** Creates a reader that resolves data written with {@code actual} to {@code expected}. */
+ public GenericDatumReader(Schema actual, Schema expected) {
+ this(actual);
+ this.expected = expected;
}
+ /** Sets the actual (writer's) schema; the expected schema is left unchanged. */
+ public void setSchema(Schema actual) { this.actual = actual; }
+
+ /** Reads one datum from {@code in}. The data is decoded with the actual schema
+ * and resolved to the expected schema, or to the actual schema when no
+ * expected schema was given. {@code reuse} is passed through as the old
+ * instance. */
@SuppressWarnings("unchecked")
public D read(D reuse, Decoder in) throws IOException {
- return (D) read(reuse, readerSchema,
- writerSchema == null ? in :
- new ResolvingDecoder(writerSchema, readerSchema, in));
+ return (D) read(reuse, actual, expected != null ? expected : actual, in);
}
/** Called to read data.*/
- protected Object read(Object old, Schema schema, Decoder in)
- throws IOException {
- switch (schema.getType()) {
- case RECORD: return readRecord(old, schema, in);
- case ENUM: return readEnum(schema, in);
- case ARRAY: return readArray(old, schema, in);
- case MAP: return readMap(old, schema, in);
- case FIXED: return readFixed(old, schema, in);
+ protected Object read(Object old, Schema actual,
+ Schema expected, Decoder in) throws IOException {
+ if (actual.getType() == Type.UNION) // resolve unions
+ actual = actual.getTypes().get((int)in.readIndex());
+ if (expected.getType() == Type.UNION)
+ expected = resolveExpected(actual, expected);
+ switch (actual.getType()) {
+ case RECORD: return readRecord(old, actual, expected, in);
+ case ENUM: return readEnum(actual, expected, in);
+ case ARRAY: return readArray(old, actual, expected, in);
+ case MAP: return readMap(old, actual, expected, in);
+ case FIXED: return readFixed(old, actual, expected, in);
case STRING: return readString(old, in);
case BYTES: return readBytes(old, in);
case INT: return in.readInt();
@@ -77,62 +79,90 @@
case DOUBLE: return in.readDouble();
case BOOLEAN: return in.readBoolean();
case NULL: return null;
- case UNION: return readUnion(old, schema, in);
- default: throw new AvroRuntimeException("Unknown type: " + schema +
- " " + schema.getType());
+ default: throw new AvroRuntimeException("Unknown type: "+actual);
}
}
+  /** Selects the branch of the union {@code expected} that data written with
+   * the (already resolved) schema {@code actual} should be read as. A branch
+   * of the same type wins, except that named types (record, enum, fixed) must
+   * also agree on name. Otherwise a branch reachable by numeric promotion is
+   * chosen: int to long/float/double, long to float/double, float to double.
+   * @throws AvroTypeException if no branch matches */
+  private Schema resolveExpected(Schema actual, Schema expected) {
+    // first scan for exact match
+    for (Schema branch : expected.getTypes())
+      if (branch.getType() == actual.getType())
+        switch (branch.getType()) {
+        case RECORD:
+        case ENUM:
+        case FIXED:
+          // named types must also agree on name, so keep scanning on mismatch
+          String name = branch.getName();
+          if (name == null || name.equals(actual.getName()))
+            return branch;
+          break;
+        default:
+          return branch;
+        }
+    // then scan for a match via numeric promotion; test each branch (not the
+    // union itself) and return the branch that accepts the promotion
+    for (Schema branch : expected.getTypes())
+      switch (actual.getType()) {
+      case INT:
+        switch (branch.getType()) {
+        case LONG: case FLOAT: case DOUBLE:
+          return branch;
+        }
+        break;
+      case LONG:
+        switch (branch.getType()) {
+        case FLOAT: case DOUBLE:
+          return branch;
+        }
+        break;
+      case FLOAT:
+        switch (branch.getType()) {
+        case DOUBLE:
+          return branch;
+        }
+        break;
+      }
+    throw new AvroTypeException("Expected "+expected+", found "+actual);
+  }
+
/** Called to read a record instance. May be overridden for alternate record
* representations.*/
- protected Object readRecord(Object old, Schema schema,
+ protected Object readRecord(Object old, Schema actual, Schema expected,
Decoder in) throws IOException {
- if (in instanceof ResolvingDecoder) {
- return readRecord(old, schema, (ResolvingDecoder) in);
- }
- Object record = newRecord(old, schema);
+ /* TODO: We may want to compute the expected and actual mapping and cache
+ * the mapping (keyed by <actual, expected>). */
+ String recordName = expected.getName();
+ if (recordName != null && !recordName.equals(actual.getName()))
+ throw new AvroTypeException("Expected "+expected+", found "+actual);
+ Map<String, Field> expectedFields = expected.getFields();
+ // all fields not in expected should be removed by newRecord.
+ Object record = newRecord(old, expected);
int size = 0;
- for (Map.Entry<String, Field> entry : schema.getFields().entrySet()) {
+ for (Map.Entry<String, Field> entry : actual.getFields().entrySet()) {
String fieldName = entry.getKey();
- Field field = entry.getValue();
- int fieldPosition = field.pos();
+ Field actualField = entry.getValue();
+ Field expectedField =
+ expected == actual ? actualField :
expectedFields.get(entry.getKey());
+ if (expectedField == null) {
+ skip(actualField.schema(), in);
+ continue;
+ }
+ int fieldPosition = expectedField.pos();
Object oldDatum =
(old != null) ? getField(record, fieldName, fieldPosition) : null;
addField(record, fieldName, fieldPosition,
- read(oldDatum, field.schema(), in));
+ read(oldDatum,actualField.schema(),expectedField.schema(), in));
size++;
}
- return record;
- }
-
- protected Object readRecord(Object old, Schema schema,
- ResolvingDecoder in) throws IOException {
- Object record = newRecord(old, schema);
- Map<String, Field> readerFields = schema.getFields();
-
- BitSet bs = new BitSet();
- for (int i = 0; i < readerFields.size(); i++) {
- String fn = in.readFieldName();
- if (fn == null) {
- break;
- }
- Field f = readerFields.get(fn);
- int fp = f.pos();
- bs.set(fp);
- Object oldDatum =
- (old != null) ? getField(record, fn, fp) : null;
- addField(record, fn, fp, read(oldDatum, f.schema(), in));
- }
- for (Map.Entry<String, Field> entry : readerFields.entrySet()) {
- Field f = entry.getValue();
- if (! bs.get(f.pos())) {
+ if (expectedFields.size() > size) { // not all fields set
+ Set<String> actualFields = actual.getFields().keySet();
+ for (Map.Entry<String, Field> entry : expectedFields.entrySet()) {
String fieldName = entry.getKey();
- JsonNode json = f.defaultValue();
- if (json != null) { // has default
- addField(record, fieldName, f.pos(), // add default
- defaultFieldValue(old, f.schema(), json));
- } else if (old != null) { // remove stale value
- removeField(record, fieldName, entry.getValue().pos());
+ if (!actualFields.contains(fieldName)) { // an unset field
+ Field f = entry.getValue();
+ JsonNode json = f.defaultValue();
+ if (json != null) // has default
+ addField(record, fieldName, f.pos(), // add default
+ defaultFieldValue(old, f.schema(), json));
+ else if (old != null) // remove stale value
+ removeField(record, fieldName, entry.getValue().pos());
}
}
}
@@ -207,16 +237,18 @@
case DOUBLE: return json.getDoubleValue();
case BOOLEAN: return json.getBooleanValue();
case NULL: return null;
- default: throw new AvroRuntimeException("Unknown type: " + schema);
+ default: throw new AvroRuntimeException("Unknown type: "+actual);
}
}
/** Called to read an enum value. May be overridden for alternate enum
* representations. By default, returns the symbol as a String. */
- protected Object readEnum(Schema schema, Decoder in)
+ protected Object readEnum(Schema actual, Schema expected, Decoder in)
throws IOException {
- return createEnum(schema.getEnumSymbols().get(in.readEnum()),
- schema);
+ // an enum only resolves against a reader enum of the same name
+ String name = expected.getName();
+ if (name != null && !name.equals(actual.getName()))
+ throw new AvroTypeException("Expected "+expected+", found "+actual);
+ // the encoded index refers to the writer's (actual) symbol list
+ String symbol = actual.getEnumSymbols().get(in.readEnum());
+ // a writer symbol that the reader's enum does not declare is an error
+ if (expected != actual && !expected.getEnumSymbols().contains(symbol))
+ throw new AvroTypeException("No symbol "+symbol+" in "+expected);
+ return createEnum(symbol, expected);
}
/** Called to create an enum value. May be overridden for alternate enum
@@ -225,18 +257,19 @@
/** Called to read an array instance. May be overridden for alternate array
* representations.*/
- protected Object readArray(Object old, Schema schema,
+ protected Object readArray(Object old, Schema actual, Schema expected,
Decoder in) throws IOException {
- Schema type = schema.getElementType();
+ Schema actualType = actual.getElementType();
+ Schema expectedType = expected.getElementType();
long l = in.readArrayStart();
if (l > 0) {
Object array = newArray(old, (int) l);
do {
for (long i = 0; i < l; i++) {
- addToArray(array,
- read(peekArray(array), type, in));
+ addToArray(array, read(peekArray(array), actualType, expectedType,
in));
}
} while ((l = in.arrayNext()) > 0);
+
return array;
} else {
return newArray(old, 0);
@@ -260,9 +293,10 @@
/** Called to read a map instance. May be overridden for alternate map
* representations.*/
- protected Object readMap(Object old, Schema schema,
+ protected Object readMap(Object old, Schema actual, Schema expected,
Decoder in) throws IOException {
- Schema valueType = schema.getValueType();
+ Schema aValue = actual.getValueType();
+ Schema eValue = expected.getValueType();
long l = in.readMapStart();
Object map = newMap(old, (int) l);
if (l > 0) {
@@ -270,7 +304,7 @@
for (int i = 0; i < l; i++) {
addToMap(map,
readString(null, in),
- read(null, valueType, in));
+ read(null, aValue, eValue, in));
}
} while ((l = in.mapNext()) > 0);
}
@@ -286,11 +320,13 @@
/** Called to read a fixed value. May be overridden for alternate fixed
* representations. By default, returns {@link GenericFixed}. */
- protected Object readFixed(Object old, Schema schema,
+ protected Object readFixed(Object old, Schema actual, Schema expected,
Decoder in)
throws IOException {
- GenericFixed fixed = (GenericFixed) createFixed(old, schema);
- in.readFixed(fixed.bytes(), 0, schema.getFixedSize());
+ // A fixed resolves only when the writer's and reader's schemas are equal.
+ if (!actual.equals(expected))
+ throw new AvroTypeException("Expected "+expected+", found "+actual);
+ // Build the datum for the expected schema; the byte count comes from actual.
+ // NOTE(review): the cast assumes createFixed returns a GenericFixed, which
+ // holds for the default implementation but not necessarily for overrides.
+ GenericFixed fixed = (GenericFixed)createFixed(old, expected);
+ in.readFixed(fixed.bytes(), 0, actual.getFixedSize());
return fixed;
}
@@ -310,14 +346,6 @@
System.arraycopy(bytes, 0, fixed.bytes(), 0, schema.getFixedSize());
return fixed;
}
-
- private Object readUnion(Object old, Schema schema, Decoder in)
- throws IOException {
- int idx = in.readIndex();
- Schema s = schema.getTypes().get(idx);
- return read(old, s, in);
- }
-
/**
* Called to create new record instances. Subclasses may override to use a
* different record implementation. The returned instance must conform to the
@@ -380,4 +408,54 @@
* override to use a different byte array representation. By default, this
* calls {@link ByteBuffer#wrap(byte[])}.*/
protected Object createBytes(byte[] value) { return ByteBuffer.wrap(value); }
+
+  /** Schema used to skip the string keys of maps. */
+  private static final Schema STRING_SCHEMA = Schema.create(Type.STRING);
+
+  /** Skip an instance of a schema: consume its encoding from {@code in}
+   * without building a datum.
+   * @throws AvroRuntimeException if the schema's type is not recognized */
+  public static void skip(Schema schema, Decoder in) throws IOException {
+    switch (schema.getType()) {
+    case RECORD:
+      for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
+        skip(entry.getValue(), in);
+      break;
+    case ENUM:
+      // consume the enum the same way readEnum does
+      in.readEnum();
+      break;
+    case ARRAY:
+      Schema elementType = schema.getElementType();
+      // skipArray returns how many items must still be skipped one by one
+      for (long l = in.skipArray(); l > 0; l = in.skipArray()) {
+        for (long i = 0; i < l; i++) {
+          skip(elementType, in);
+        }
+      }
+      break;
+    case MAP:
+      Schema value = schema.getValueType();
+      // skipMap returns how many entries must still be skipped one by one
+      for (long l = in.skipMap(); l > 0; l = in.skipMap()) {
+        for (long i = 0; i < l; i++) {
+          skip(STRING_SCHEMA, in);
+          skip(value, in);
+        }
+      }
+      break;
+    case UNION:
+      skip(schema.getTypes().get((int)in.readIndex()), in);
+      break;
+    case FIXED:
+      in.skipFixed(schema.getFixedSize());
+      break;
+    case STRING:
+    case BYTES:
+      in.skipBytes();
+      break;
+    case INT: in.readInt(); break;
+    case LONG: in.readLong(); break;
+    case FLOAT: in.readFloat(); break;
+    case DOUBLE: in.readDouble(); break;
+    case BOOLEAN: in.readBoolean(); break;
+    case NULL: break;
+    // same exception type as the rest of this class
+    default: throw new AvroRuntimeException("Unknown type: "+schema);
+    }
+  }
+
}