Author: cutting
Date: Tue Apr 14 22:28:43 2009
New Revision: 764989
URL: http://svn.apache.org/viewvc?rev=764989&view=rev
Log:
AVRO-11. Re-implement specific and reflect datum readers and writers to
leverage AVRO-6.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=764989&r1=764988&r2=764989&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Apr 14 22:28:43 2009
@@ -12,6 +12,9 @@
IMPROVEMENTS
+ AVRO-11. Re-implement specific and reflect datum readers and
+ writers to leverage AVRO-6. (cutting)
+
OPTIMIZATIONS
BUG FIXES
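
For context, the record-reading loop these hook methods plug into lives in the shared base class introduced by AVRO-6 and is not part of this patch. The sketch below is reconstructed from the readRecord bodies removed in the files that follow; it only illustrates how such a loop can drive the four hooks (newRecord, getField, addField, removeField), with field-position resolution and schema-mismatch handling elided.

    // Illustrative only: roughly how an AVRO-6 style base reader can drive
    // the per-field hooks that this patch implements in its subclasses.
    protected Object readRecord(Object old, Schema actual, Schema expected,
                                ValueReader in) throws IOException {
      Object record = newRecord(old, expected);   // subclass picks or creates the instance
      int position = 0;
      for (Map.Entry<String, Schema> entry : actual.getFieldSchemas()) {
        Object oldDatum = (old != null) ? getField(record, entry.getKey(), position) : null;
        addField(record, entry.getKey(), position,
                 read(oldDatum, entry.getValue(), entry.getValue(), in));
        position++;
      }
      // Reader-schema fields missing from the writer's schema would be
      // cleared with removeField(record, name, position).
      return record;
    }
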
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=764989&r1=764988&r2=764989&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java Tue Apr 14 22:28:43 2009
@@ -42,32 +42,36 @@
setSchema(root);
}
- protected Object readRecord(Object old, Schema actual, Schema expected,
- ValueReader in) throws IOException {
- Class recordClass;
+ protected Object newRecord(Object old, Schema schema) {
+ Class c;
try {
- recordClass = Class.forName(packageName+expected.getName());
+ c = Class.forName(packageName+schema.getName());
} catch (ClassNotFoundException e) {
throw new AvroRuntimeException(e);
}
- expected = ReflectData.getSchema(recordClass);
- Map<String,Schema.Field> expectedFields = expected.getFields();
- Object record = recordClass.isInstance(old) ? old : newInstance(recordClass);
- for (Map.Entry<String, Schema> entry : actual.getFieldSchemas()) {
- try {
- Field field = recordClass.getField(entry.getKey());
- field.setAccessible(true);
- String key = entry.getKey();
- Schema aField = entry.getValue();
- Schema eField = field.getType() ==
- Object.class ? aField : expectedFields.get(key).schema();
- field.set(record, read(null, aField, eField, in));
- } catch (NoSuchFieldException e) { // ignore unmatched field
- } catch (IllegalAccessException e) {
- throw new AvroRuntimeException(e);
- }
+ return(c.isInstance(old) ? old : newInstance(c));
+ }
+
+ protected void addField(Object record, String name, int position, Object o) {
+ try {
+ Field field = record.getClass().getField(name);
+ field.setAccessible(true);
+ field.set(record, o);
+ } catch (Exception e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+ protected Object getField(Object record, String name, int position) {
+ try {
+ Field field = record.getClass().getField(name);
+ field.setAccessible(true);
+ return field.get(record);
+ } catch (Exception e) {
+ throw new AvroRuntimeException(e);
}
- return record;
+ }
+ protected void removeField(Object record, String name, int position) {
+ addField(record, name, position, null);
}
private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
@@ -91,24 +95,4 @@
}
return result;
}
-
- @Override
- protected void addField(Object record, String name, int position, Object o) {
- throw new AvroRuntimeException("Not implemented");
- }
-
- @Override
- protected Object getField(Object record, String name, int position) {
- throw new AvroRuntimeException("Not implemented");
- }
-
- @Override
- protected void removeField(Object record, String field, int position) {
- throw new AvroRuntimeException("Not implemented");
- }
-
- @Override
- protected Object newRecord(Object old, Schema schema) {
- throw new AvroRuntimeException("Not implemented");
- }
}
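
The new ReflectDatumReader hooks above are thin wrappers over java.lang.reflect field access. A standalone illustration of that pattern follows; the User class is hypothetical and not part of Avro.

    import java.lang.reflect.Field;

    public class ReflectAccessDemo {
      public static class User {
        public String name;                         // getField(String) finds public fields only
      }

      public static void main(String[] args) throws Exception {
        User record = new User();
        Field field = record.getClass().getField("name");
        field.setAccessible(true);                  // mirrors the patch; redundant for public fields
        field.set(record, "example");               // what addField does with the decoded value
        System.out.println(field.get(record));      // what getField returns; prints "example"
      }
    }
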
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=764989&r1=764988&r2=764989&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java Tue Apr 14 22:28:43 2009
@@ -36,18 +36,12 @@
super(root);
}
- protected void writeRecord(Schema schema, Object datum, ValueWriter out)
- throws IOException {
- Class recordClass = datum.getClass();
- for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
- try {
- Field field = recordClass.getField(entry.getKey());
- write(entry.getValue(), field.get(datum), out);
- } catch (NoSuchFieldException e) {
- throw new AvroRuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new AvroRuntimeException(e);
- }
+ protected Object getField(Object record, String name, int position) {
+ try {
+ Field field = record.getClass().getField(name);
+ return field.get(record);
+ } catch (Exception e) {
+ throw new AvroRuntimeException(e);
}
}
@@ -55,11 +49,6 @@
protected boolean isRecord(Object datum) {
return ReflectData.getSchema(datum.getClass()).getType() == Type.RECORD;
}
-
- @Override
- protected Object getField(Object record, String field, int position) {
- throw new AvroRuntimeException("Not implemented");
- }
protected boolean instanceOf(Schema schema, Object datum) {
return (schema.getType() == Type.RECORD)
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java?rev=764989&r1=764988&r2=764989&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java Tue Apr 14 22:28:43 2009
@@ -39,36 +39,19 @@
super(root, packageName);
}
- protected Object readRecord(Object old, Schema remote, Schema local,
- ValueReader in) throws IOException {
- /* TODO: Use schema's field numbers instead of creating our own map? */
- Class c = getClass(remote.getName());
- SpecificRecord record =
- (SpecificRecord)(c.isInstance(old) ? old : newInstance(c));
- local = record.schema();
- Map<String,Schema.Field> localFields = local.getFields();
- int[] map = getMap(local, remote);
- int i = 0, size = 0, j = 0;
- for (Map.Entry<String, Schema> entry : remote.getFieldSchemas()) {
- String key = entry.getKey();
- Schema rField = entry.getValue();
- Schema lField = local == remote ? rField : localFields.get(key).schema();
- int fieldNum = map[i++];
- if (fieldNum == -1) {
- skip(rField, in);
- continue;
- }
- Object oldDatum = old != null ? record.get(fieldNum) : null;
- record.set(fieldNum, read(oldDatum, rField, lField, in));
- size++;
- }
- if (local.getFields().size() > size) // clear unset fields
- for (Map.Entry<String, Schema> entry : local.getFieldSchemas()) {
- if (!(remote.getFields().containsKey(entry.getKey())))
- record.set(j, null);
- j++;
- }
- return record;
+ protected Object newRecord(Object old, Schema schema) {
+ Class c = getClass(schema.getName());
+ return(c.isInstance(old) ? old : newInstance(c));
+ }
+
+ protected void addField(Object record, String name, int position, Object o) {
+ ((SpecificRecord)record).set(position, o);
+ }
+ protected Object getField(Object record, String name, int position) {
+ return ((SpecificRecord)record).get(position);
+ }
+ protected void removeField(Object record, String field, int position) {
+ ((SpecificRecord)record).set(position, null);
}
private Map<String,Class> classCache = new ConcurrentHashMap<String,Class>();
@@ -86,62 +69,4 @@
return c;
}
- private Map<Schema,Map<Schema,int[]>> mapCache =
- new IdentityHashMap<Schema,Map<Schema,int[]>>();
-
- private int[] getMap(Schema local, Schema remote) {
- synchronized (mapCache) {
- Map<Schema,int[]> localCache = mapCache.get(local);
- if (localCache == null) {
- localCache = new IdentityHashMap<Schema,int[]>();
- mapCache.put(local, localCache);
- }
- int[] result = localCache.get(remote);
- if (result == null) {
- result = createMap(remote, local);
- localCache.put(remote, result);
- }
- return result;
- }
- }
-
- private static int[] createMap(Schema remote, Schema local) {
- int[] map = new int[remote.getFields().size()];
- int i = 0;
- for (Map.Entry<String, Schema> f : remote.getFieldSchemas()) {
- map[i++] = getLocalIndex(f.getKey(), f.getValue().getType(), local);
- }
- return map;
- }
-
- private static int getLocalIndex(String name, Schema.Type type,
- Schema local) {
- int i = 0;
- for (Map.Entry<String, Schema> f : local.getFieldSchemas()) {
- if (f.getKey().equals(name) && f.getValue().getType().equals(type))
- return i;
- i++;
- }
- return -1;
- }
-
- @Override
- protected void addField(Object record, String name, int position, Object o) {
- throw new AvroRuntimeException("Not implemented");
- }
-
- @Override
- protected Object getField(Object record, String name, int position) {
- throw new AvroRuntimeException("Not implemented");
- }
-
- @Override
- protected void removeField(Object record, String field, int position) {
- throw new AvroRuntimeException("Not implemented");
- }
-
- @Override
- protected Object newRecord(Object old, Schema schema) {
- throw new AvroRuntimeException("Not implemented");
- }
}
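
The specific hooks above delegate to the positional get/set contract of SpecificRecord, visible in both the removed and the added code. A minimal hand-written class of that shape is sketched below purely to make the position-based access concrete; the Point record and the use of Schema.parse are assumptions for illustration, not generated output.

    import org.apache.avro.Schema;
    import org.apache.avro.specific.SpecificRecord;

    // Hypothetical hand-written stand-in for a generated specific record.
    public class Point implements SpecificRecord {
      private static final Schema SCHEMA = Schema.parse(  // assumes Schema.parse(String)
        "{\"type\":\"record\",\"name\":\"Point\",\"fields\":["
        + "{\"name\":\"x\",\"type\":\"int\"},{\"name\":\"y\",\"type\":\"int\"}]}");

      private Integer x;
      private Integer y;

      public Schema schema() { return SCHEMA; }

      public Object get(int position) {             // serves getField(record, name, position)
        switch (position) {
          case 0: return x;
          case 1: return y;
          default: throw new IndexOutOfBoundsException("Bad field index: " + position);
        }
      }

      public void set(int position, Object value) { // serves addField; removeField passes null
        switch (position) {
          case 0: x = (Integer) value; break;
          case 1: y = (Integer) value; break;
          default: throw new IndexOutOfBoundsException("Bad field index: " + position);
        }
      }
    }
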
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java?rev=764989&r1=764988&r2=764989&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java Tue Apr 14 22:28:43 2009
@@ -33,11 +33,8 @@
super(root);
}
- protected void writeRecord(Schema schema, Object datum, ValueWriter out)
- throws IOException {
- SpecificRecord record = (SpecificRecord)datum;
- int i = 0;
- for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
- write(entry.getValue(), record.get(i++), out);
+ protected Object getField(Object record, String name, int position) {
+ return ((SpecificRecord)record).get(position);
}
+
}
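
On the writer side, the getField hooks feed a record-writing loop that now lives in the shared base class. The sketch below is reconstructed from the writeRecord bodies removed in this patch and is illustrative only, not the committed base-class code.

    // Illustrative only: a base-class loop that serializes one field at a
    // time through the getField hook implemented by the subclasses above.
    protected void writeRecord(Schema schema, Object datum, ValueWriter out)
        throws IOException {
      int position = 0;
      for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
        write(entry.getValue(), getField(datum, entry.getKey(), position), out);
        position++;
      }
    }
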