This is an automated email from the ASF dual-hosted git repository. thiru pushed a commit to branch fast-decoder-thiru in repository https://gitbox.apache.org/repos/asf/avro.git
commit 5e50f64995559c647fc98e3f87e1aebccec3f7c7 Author: rstata <[email protected]> AuthorDate: Tue Apr 30 16:34:39 2019 -0700 Fleshed out rest of cases in GenericReader2 (but do not handle logical types). --- .../java/org/apache/avro/generic/Advancer.java | 107 ++++++++++++++------- .../apache/avro/generic/GenericDatumReader2.java | 96 ++++++++++++------ 2 files changed, 137 insertions(+), 66 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java index d80e812..7c469cb 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java @@ -57,9 +57,9 @@ import org.apache.avro.util.Utf8; * and return the actual value.) * * Traversing records, arrays, and maps is more involved. In the - * case of an array or map, call {@link getContainerAdvancer} and - * proceed as described in the documentation for {@link - * Advancer.Container}. For records, best to just look at the + * case of an array or map, call {@link getArrayAdvancer} or {@link + * getMapAdvancer} and proceed as described in the documentation for + * {@link Advancer.Container}. For records, best to just look at the * implementation of {@link GenericDatumReader2}. **/ abstract class Advancer { @@ -112,11 +112,18 @@ abstract class Advancer { return null; } - /** Access to advancer for array or map type. */ - public Container getContainerAdvancer(Decoder in) throws IOException { + /** Access to advancer for array type. */ + public Container getArrayAdvancer(Decoder in) throws IOException { exception(); return null; } + + /** Access to advancer for map type. */ + public Map getMapAdvancer(Decoder in) throws IOException { + exception(); + return null; + } + /** Access to advancer for record type. 
*/ public Record getRecordAdvancer(Decoder in) throws IOException { exception(); @@ -168,8 +175,8 @@ abstract class Advancer { case CONTAINER: Advancer ea = Advancer.from(((Resolver.Container)a).elementAction); if (a.writer.getType() == Schema.Type.ARRAY) - return new ArrayContainer(a.writer, a.reader, ea); - else return new MapContainer(a.writer, a.reader, ea); + return new Container(a.writer, a.reader, ea); + else return new Map(a.writer, a.reader, ea); case RECORD: return Advancer.Record.from((Resolver.RecordAdjust)a); @@ -188,7 +195,7 @@ abstract class Advancer { ru.firstMatch, Advancer.from(ru.actualAction)); case ERROR: - return new Error(w,r, a.toString()); + return new Error(a.writer,a.reader, a.toString()); case SKIP: throw new RuntimeException("Internal error. Skip should've been consumed."); default: @@ -224,42 +231,54 @@ abstract class Advancer { } } - /** Used for Array and Map. The following fragment illustrates how + /** Used for Array. The following fragment illustrates how * to use to read an array of int: * * <pre> - * Advancer c = advancer.getContainerAdvancer(in); - * Advancer.Container ec = c.getElementAdvancer(in); + * Advancer.Container c = advancer.getArrayAdvancer(in); * for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) { * for (long j = 0; j < i; j++) { - * int element = c.readInt(in); + * int element = c.elementAdvancer.readInt(in); * // .. do something with this element * } * } * </pre> * See the implementation of {@link GenericDatumReader2} for more * illustrations. 
*/ - public abstract static class Container extends Advancer { - private final Advancer elementAdvancer; - public Container(Schema w, Schema r, Advancer ea) { super(wr); elementAdvancer = ea; } - public Container getContainerAdvancer(Decoder in) { return this; } - public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; } - public abstract long firstChunk(Decoder in) throws IOException; - public abstract long nextChunk(Decoder in) throws IOException; - } + public static class Container extends Advancer { + public final Advancer elementAdvancer; + public Container(Schema w, Schema r, Advancer ea) + { super(w,r); elementAdvancer = ea; } - private static class ArrayContainer extends Container { - public ArrayContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); } public long firstChunk(Decoder in) throws IOException { return in.readArrayStart(); } + public long nextChunk(Decoder in) throws IOException { return in.arrayNext(); } } - private static class MapContainer extends Container { - public MapContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); } + /** Used for Map. The following fragment illustrates how + * to use it to read a map of int: + * + * <pre> + * Advancer.Map c = advancer.getMapAdvancer(in); + * for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) { + * for (long j = 0; j < i; j++) { + * String key = c.keyAdvancer.readString(in); + * int element = c.elementAdvancer.readInt(in); + * // .. do something with this element + * } + * } + * </pre> + * See the implementation of {@link GenericDatumReader2} for more + * illustrations. 
*/ + public static class Map extends Container { + public final Advancer keyAdvancer = StringFast.instance; + public Map(Schema w, Schema r, Advancer ea) { super(w,r,ea); } + public long firstChunk(Decoder in) throws IOException { return in.readMapStart(); } + public long nextChunk(Decoder in) throws IOException { return in.mapNext(); } } @@ -270,7 +289,8 @@ abstract class Advancer { private static class NullFast extends Advancer { public static final NullFast instance = new NullFast(); - private NullFast() { Schema s = Schema.create(Schema.Type.NULL); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.NULL); + private NullFast() { super(s,s); } public Object nextNull(Decoder in) throws IOException { in.readNull(); return null; @@ -280,7 +300,8 @@ abstract class Advancer { private static class BooleanFast extends Advancer { public static final BooleanFast instance = new BooleanFast(); - private BooleanFast() { Schema s = Schema.create(Schema.Type.BOOLEAN); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.BOOLEAN); + private BooleanFast() { super(s,s); } public boolean nextBoolean(Decoder in) throws IOException { return in.readBoolean(); } @@ -289,7 +310,8 @@ abstract class Advancer { private static class IntFast extends Advancer { public static final IntFast instance = new IntFast(); - private IntFast() { Schema s = Schema.create(Schema.Type.INTEGER); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.INT); + private IntFast() { super(s,s); } public int nextInt(Decoder in) throws IOException { return in.readInt(); } @@ -298,7 +320,8 @@ abstract class Advancer { private static class LongFast extends Advancer { public static final LongFast instance = new LongFast(); - private LongFast() { Schema s = Schema.create(Schema.Type.LONG); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.LONG); + private LongFast() { super(s,s); } public long nextLong(Decoder in) throws 
IOException { return in.readLong(); } @@ -307,7 +330,8 @@ abstract class Advancer { private static class FloatFast extends Advancer { public static final FloatFast instance = new FloatFast(); - private FloatFast() { Schema s = Schema.create(Schema.Type.FLOAT); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.FLOAT); + private FloatFast() { super(s,s); } public float nextFloat(Decoder in) throws IOException { return in.readFloat(); } @@ -316,7 +340,8 @@ abstract class Advancer { private static class DoubleFast extends Advancer { public static final DoubleFast instance = new DoubleFast(); - private DoubleFast() { Schema s = Schema.create(Schema.Type.DOUBLE); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.DOUBLE); + private DoubleFast() { super(s,s); } public double nextDouble(Decoder in) throws IOException { return in.readDouble(); } @@ -325,7 +350,8 @@ abstract class Advancer { private static class StringFast extends Advancer { public static final StringFast instance = new StringFast(); - private StringFast() { Schema s = Schema.create(Schema.Type.STRING); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.STRING); + private StringFast() { super(s,s); } public String nextString(Decoder in) throws IOException { return in.readString(); } public Utf8 nextString(Decoder in, Utf8 old) throws IOException { return in.readString(old); @@ -335,7 +361,8 @@ abstract class Advancer { private static class BytesFast extends Advancer { public static final BytesFast instance = new BytesFast(); - private BytesFast() { Schema s = Schema.create(Schema.Type.BYTES); super(s,s); } + private static final Schema s = Schema.create(Schema.Type.BYTES); + private BytesFast() { super(s,s); } public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException { return in.readBytes(old); } @@ -477,7 +504,7 @@ abstract class Advancer { * consume the tag ourself and call the corresponding advancer. 
*/ private static class WriterUnion extends Advancer { private Advancer[] branches; - public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r) branches = b; } + public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r); branches = b; } private final Advancer b(Decoder in) throws IOException { return branches[in.readIndex()]; } @@ -504,8 +531,11 @@ abstract class Advancer { public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException { return b(in).getBranchAdvancer(in, branch); } - public Container getContainerAdvancer(Decoder in) throws IOException - { return b(in).getContainerAdvancer(in); } + public Container getArrayAdvancer(Decoder in) throws IOException + { return b(in).getArrayAdvancer(in); } + + public Map getMapAdvancer(Decoder in) throws IOException + { return b(in).getMapAdvancer(in); } public Record getRecordAdvancer(Decoder in) throws IOException { return b(in).getRecordAdvancer(in); } @@ -693,8 +723,11 @@ abstract class Advancer { public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException { ignore(toSkip, in); return field.getBranchAdvancer(in, branch); } - public Container getContainerAdvancer(Decoder in) throws IOException - { ignore(toSkip, in); return field.getContainerAdvancer(in); } + public Container getArrayAdvancer(Decoder in) throws IOException + { ignore(toSkip, in); return field.getArrayAdvancer(in); } + + public Map getMapAdvancer(Decoder in) throws IOException + { ignore(toSkip, in); return field.getMapAdvancer(in); } public Record getRecordAdvancer(Decoder in) throws IOException { ignore(toSkip, in); return field.getRecordAdvancer(in); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java index 46506c2..cbfcff7 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java +++ 
b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader2.java @@ -19,47 +19,42 @@ package org.apache.avro.generic; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.List; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import org.apache.avro.Resolver; import org.apache.avro.Schema; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; -public class GenericDatumReader2<D extends IndexedRecord> implements DatumReader<D> { - private final Schema reader, writer; +public class GenericDatumReader2<D> implements DatumReader<D> { private final Advancer.Record advancer; private final GenericData data; - private GenericDatumReader2(Schema writer, Schema reader, Advancer.Record a, GenericData d) { - this.writer = writer; - this.reader = reader; + private GenericDatumReader2(Advancer.Record a, GenericData d) { advancer = a; data = d; } - public static GenericDatumReader2 getReaderFor(Schema writer, Schema reader, GenericData data) { + /** ... Document how we use <code>d:</code> to create fixed, array, + * map, and record objects. 
+ */ + public static GenericDatumReader2 getReaderFor(Schema writer, Schema reader, GenericData d) { // TODO: add caching - Resolver.Action a = Resolver.resolve(writer, reader, data); + Resolver.Action a = Resolver.resolve(writer, reader, d); Advancer.Record r = (Advancer.Record)Advancer.from(a); - return new GenericDatumReader2(writer, reader, r, data); + return new GenericDatumReader2(r, d); } public D read(D reuse, Decoder in) throws IOException { - List<Schema.Field> wf = writer.getFields(); - if (reuse == null) reuse = null; // FIXME - for (int i = 0; i < advancer.advancers.length; i++) { - int p = advancer.readerOrder[i].pos(); - reuse.put(p, read(null, wf.get(i).schema(), advancer.advancers[i], in)); - } - advancer.done(in); - return reuse; + return null; } - public Object read(Object reuse, Schema expected, Advancer a, Decoder in) + public Object read(Object reuse, Advancer a, Decoder in) throws IOException { - switch (expected.getType()) { + switch (a.reader.getType()) { case NULL: return a.nextNull(in); case BOOLEAN: return (Boolean) a.nextBoolean(in); case INT: return (Integer) a.nextInt(in); @@ -68,22 +63,65 @@ public class GenericDatumReader2<D extends IndexedRecord> implements DatumReader case DOUBLE: return (Double) a.nextDouble(in); case STRING: return (String) a.nextString(in); case BYTES: return a.nextBytes(in, (ByteBuffer)reuse); - case FIXED: + case FIXED: { + GenericFixed fixed = (GenericFixed) data.createFixed(reuse, a.reader); + a.nextFixed(in, fixed.bytes()); + return fixed; + } + case ARRAY: { - List result = null; // FIXME -- use GenericData methods here... 
- Advancer.Container c = advancer.getContainerAdvancer(in); - Advancer ec = c.getElementAdvancer(in); - Schema es = expected.getElementType(); - for(long i = c.firstChunk(in); i != 0; i = c.nextChunk(in)) { + Advancer.Container c = advancer.getArrayAdvancer(in); + Advancer ec = c.elementAdvancer; + long i = c.firstChunk(in); + if (reuse instanceof GenericArray) { + ((GenericArray) reuse).reset(); + } else if (reuse instanceof Collection) { + ((Collection) reuse).clear(); + } else reuse = new GenericData.Array((int)i, a.reader); + + Collection array = (Collection)reuse; + for( ; i != 0; i = c.nextChunk(in)) for (long j = 0; j < i; j++) { - result.add(read(null, es, ec, in)); + Object v = read(null, ec, in); + // TODO -- logical type conversion + array.add(v); } - } + if (array instanceof GenericArray<?>) + ((GenericArray<?>) array).prune(); } - case MAP: - case RECORD: + case MAP: { + Advancer.Map c = advancer.getMapAdvancer(in); + Advancer kc = c.keyAdvancer; + Advancer ec = c.elementAdvancer; + long i = c.firstChunk(in); + if (reuse instanceof Map) { + ((Map) reuse).clear(); + } else reuse = new HashMap<Object,Object>((int)i); + Map map = (Map)reuse; + for ( ; i != 0; i = c.nextChunk(in)) + for (int j = 0; j < i; j++) { + Object key = kc.nextString(in); + Object val = read(null, ec, in); + map.put(key, val); + } + return map; + } + + case RECORD: { + Advancer.Record ra = advancer.getRecordAdvancer(in); + Object r = data.newRecord(reuse, ra.reader); + for (int i = 0; i < ra.advancers.length; i++) { + int p = ra.readerOrder[i].pos(); + ((IndexedRecord)reuse).put(p, read(null, ra.advancers[i], in)); + } + ra.done(in); + return r; + } + case UNION: + return read(reuse, advancer.getBranchAdvancer(in, advancer.nextIndex(in)), in); + default: throw new IllegalArgumentException("Can't handle this yet."); }
