This is an automated email from the ASF dual-hosted git repository. thiru pushed a commit to branch fast-decoder-thiru in repository https://gitbox.apache.org/repos/asf/avro.git
commit ab9601dbfeff198d41bc9fb5fe419d22ec8d14e7 Author: rstata <[email protected]> AuthorDate: Tue Apr 30 13:13:55 2019 -0700 Will be handy to have schemas available during decoding. --- .../java/org/apache/avro/generic/Advancer.java | 111 +++++++++++++-------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java index 8f2b010..d80e812 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/Advancer.java @@ -75,6 +75,9 @@ abstract class Advancer { //// an integer is read with no promotion) overrides just //// readInt. + public final Schema writer, reader; + protected Advancer(Schema w, Schema r) { this.writer = w; this.reader = r; } + public Object next(Decoder in) throws IOException { exception(); return null; } public Object nextNull(Decoder in) throws IOException { exception(); return null; } public boolean nextBoolean(Decoder in) throws IOException { exception(); return false; } @@ -140,7 +143,7 @@ abstract class Advancer { case DOUBLE: return DoubleFast.instance; case STRING: return StringFast.instance; case BYTES: return BytesFast.instance; - case FIXED: return new FixedFast(a.writer.getFixedSize()); + case FIXED: return new FixedFast(a.writer, a.reader); default: throw new IllegalArgumentException("Unexpected schema for DoNothing:" + a.reader); } @@ -159,13 +162,14 @@ abstract class Advancer { } case ENUM: Resolver.EnumAdjust e = (Resolver.EnumAdjust)a; - if (e.noAdjustmentsNeeded) return EnumFast.instance; - else return new EnumWithAdjustments(e.adjustments); + if (e.noAdjustmentsNeeded) return new EnumFast(a.writer, a.reader); + else return new EnumWithAdjustments(a.writer, a.reader, e.adjustments); case CONTAINER: Advancer ea = Advancer.from(((Resolver.Container)a).elementAction); - if (a.writer.getType() == Schema.Type.ARRAY) return new ArrayContainer(ea); - else return new MapContainer(ea); + if (a.writer.getType() == Schema.Type.ARRAY) + return new ArrayContainer(a.writer, a.reader, ea); + else return new MapContainer(a.writer, a.reader, ea); case RECORD: return Advancer.Record.from((Resolver.RecordAdjust)a); @@ -175,15 +179,16 @@ abstract class Advancer { Advancer[] branches = new Advancer[wu.actions.length]; for (int i = 0; i < branches.length; i++) branches[i] = Advancer.from(wu.actions[i]); - if (wu.unionEquiv) return new EquivUnion(branches); - return new WriterUnion(branches); + if (wu.unionEquiv) return new EquivUnion(a.writer, a.reader, branches); + return new WriterUnion(a.writer, a.reader, branches); case READER_UNION: Resolver.ReaderUnion ru = (Resolver.ReaderUnion)a; - return new ReaderUnion(ru.firstMatch, Advancer.from(ru.actualAction)); + return new ReaderUnion(a.writer, a.reader, + ru.firstMatch, Advancer.from(ru.actualAction)); case ERROR: - throw new AvroTypeException(a.toString()); + return new Error(w,r, a.toString()); case SKIP: throw new RuntimeException("Internal error. Skip should've been consumed."); default: @@ -213,7 +218,7 @@ abstract class Advancer { * data causes the error to manifest). */ private static class Error extends Advancer { String msg; - public Error(String msg) { this.msg = msg; } + public Error(Schema w, Schema r, String msg) { super(w,r); this.msg = msg; } protected Exception exception() { throw new AvroTypeException(msg); } @@ -236,7 +241,7 @@ abstract class Advancer { * illustrations. */ public abstract static class Container extends Advancer { private final Advancer elementAdvancer; - public Container(Advancer elementAdvancer) { this.elementAdvancer = elementAdvancer; } + public Container(Schema w, Schema r, Advancer ea) { super(wr); elementAdvancer = ea; } public Container getContainerAdvancer(Decoder in) { return this; } public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; } public abstract long firstChunk(Decoder in) throws IOException; @@ -244,7 +249,7 @@ abstract class Advancer { } private static class ArrayContainer extends Container { - public ArrayContainer(Advancer ea) { super(ea); } + public ArrayContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); } public long firstChunk(Decoder in) throws IOException { return in.readArrayStart(); } public long nextChunk(Decoder in) throws IOException @@ -252,7 +257,7 @@ abstract class Advancer { } private static class MapContainer extends Container { - public MapContainer(Advancer ea) { super(ea); } + public MapContainer(Schema w, Schema r, Advancer ea) { super(w,r,ea); } public long firstChunk(Decoder in) throws IOException { return in.readMapStart(); } public long nextChunk(Decoder in) throws IOException @@ -265,7 +270,7 @@ abstract class Advancer { private static class NullFast extends Advancer { public static final NullFast instance = new NullFast(); - private NullFast() { } + private NullFast() { Schema s = Schema.create(Schema.Type.NULL); super(s,s); } public Object nextNull(Decoder in) throws IOException { in.readNull(); return null; @@ -275,7 +280,7 @@ abstract class Advancer { private static class BooleanFast extends Advancer { public static final BooleanFast instance = new BooleanFast(); - private BooleanFast() { } + private BooleanFast() { Schema s = Schema.create(Schema.Type.BOOLEAN); super(s,s); } public boolean nextBoolean(Decoder in) throws IOException { return in.readBoolean(); } @@ -284,7 +289,7 @@ abstract class Advancer { private static class IntFast extends Advancer { public static final IntFast instance = new IntFast(); - private IntFast() { } + private IntFast() { Schema s = Schema.create(Schema.Type.INTEGER); super(s,s); } public int nextInt(Decoder in) throws IOException { return in.readInt(); } @@ -293,7 +298,7 @@ abstract class Advancer { private static class LongFast extends Advancer { public static final LongFast instance = new LongFast(); - private LongFast() { } + private LongFast() { Schema s = Schema.create(Schema.Type.LONG); super(s,s); } public long nextLong(Decoder in) throws IOException { return in.readLong(); } @@ -302,7 +307,7 @@ abstract class Advancer { private static class FloatFast extends Advancer { public static final FloatFast instance = new FloatFast(); - private FloatFast() { } + private FloatFast() { Schema s = Schema.create(Schema.Type.FLOAT); super(s,s); } public float nextFloat(Decoder in) throws IOException { return in.readFloat(); } @@ -311,7 +316,7 @@ abstract class Advancer { private static class DoubleFast extends Advancer { public static final DoubleFast instance = new DoubleFast(); - private DoubleFast() { } + private DoubleFast() { Schema s = Schema.create(Schema.Type.DOUBLE); super(s,s); } public double nextDouble(Decoder in) throws IOException { return in.readDouble(); } @@ -320,7 +325,7 @@ abstract class Advancer { private static class StringFast extends Advancer { public static final StringFast instance = new StringFast(); - private StringFast() { } + private StringFast() { Schema s = Schema.create(Schema.Type.STRING); super(s,s); } public String nextString(Decoder in) throws IOException { return in.readString(); } public Utf8 nextString(Decoder in, Utf8 old) throws IOException { return in.readString(old); @@ -330,7 +335,7 @@ abstract class Advancer { private static class BytesFast extends Advancer { public static final BytesFast instance = new BytesFast(); - private BytesFast() { } + private BytesFast() { Schema s = Schema.create(Schema.Type.BYTES); super(s,s); } public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException { return in.readBytes(old); } @@ -339,7 +344,7 @@ abstract class Advancer { private static class FixedFast extends Advancer { private final int len; - private FixedFast(int len) { this.len = len; } + private FixedFast(Schema w, Schema r) { super(w,r); this.len = w.getFixedSize(); } public byte[] nextFixed(Decoder in, byte[] bytes, int start, int len) throws IOException { in.readFixed(bytes, start, len); return bytes; @@ -352,8 +357,7 @@ abstract class Advancer { } private static class EnumFast extends Advancer { - public static final EnumFast instance = new EnumFast(); - private EnumFast() { } + public EnumFast(Schema w, Schema r) { super(w,r); } public int nextEnum(Decoder in) throws IOException { return in.readEnum(); } public Object next(Decoder in) throws IOException { return nextEnum(in); } } @@ -363,7 +367,9 @@ abstract class Advancer { private static class LongFromInt extends Advancer { public static final LongFromInt instance = new LongFromInt(); - private LongFromInt() { } + private LongFromInt() { + super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.LONG)); + } public long nextLong(Decoder in) throws IOException { return (long) in.readInt(); } @@ -372,7 +378,9 @@ abstract class Advancer { private static class FloatFromInt extends Advancer { public static final FloatFromInt instance = new FloatFromInt(); - private FloatFromInt() { } + private FloatFromInt() { + super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.FLOAT)); + } public float nextFloat(Decoder in) throws IOException { return (float) in.readInt(); } @@ -381,7 +389,9 @@ abstract class Advancer { private static class FloatFromLong extends Advancer { public static final FloatFromLong instance = new FloatFromLong(); - private FloatFromLong() { } + private FloatFromLong() { + super(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.FLOAT)); + } public float nextFloat(Decoder in) throws IOException { return (long) in.readLong(); } @@ -390,7 +400,9 @@ abstract class Advancer { private static class DoubleFromInt extends Advancer { public static final DoubleFromInt instance = new DoubleFromInt(); - private DoubleFromInt() { } + private DoubleFromInt() { + super(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.DOUBLE)); + } public double nextDouble(Decoder in) throws IOException { return (double) in.readInt(); } @@ -399,7 +411,9 @@ abstract class Advancer { private static class DoubleFromLong extends Advancer { public static final DoubleFromLong instance = new DoubleFromLong(); - private DoubleFromLong() { } + private DoubleFromLong() { + super(Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.DOUBLE)); + } public double nextDouble(Decoder in) throws IOException { return (double) in.readLong(); } @@ -408,7 +422,9 @@ abstract class Advancer { private static class DoubleFromFloat extends Advancer { public static final DoubleFromFloat instance = new DoubleFromFloat(); - private DoubleFromFloat() { } + private DoubleFromFloat() { + super(Schema.create(Schema.Type.FLOAT), Schema.create(Schema.Type.DOUBLE)); + } public double nextDouble(Decoder in) throws IOException { return (double) in.readFloat(); } @@ -417,7 +433,9 @@ abstract class Advancer { private static class BytesFromString extends Advancer { public static final BytesFromString instance = new BytesFromString(); - private BytesFromString() { } + private BytesFromString() { + super(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.BYTES)); + } public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException { Utf8 s = in.readString(null); return ByteBuffer.wrap(s.getBytes(), 0, s.getByteLength()); @@ -427,7 +445,9 @@ abstract class Advancer { private static class StringFromBytes extends Advancer { public static final StringFromBytes instance = new StringFromBytes(); - private StringFromBytes() { } + private StringFromBytes() { + super(Schema.create(Schema.Type.BYTES), Schema.create(Schema.Type.STRING)); + } public String nextString(Decoder in) throws IOException { return new String(in.readBytes(null).array(), StandardCharsets.UTF_8); } @@ -443,7 +463,8 @@ abstract class Advancer { private static class EnumWithAdjustments extends Advancer { private final int[] adjustments; - public EnumWithAdjustments(int[] adjustments) { + public EnumWithAdjustments(Schema w, Schema r, int[] adjustments) { + super(w,r); this.adjustments = adjustments; } public int nextEnum(Decoder in) throws IOException { @@ -456,7 +477,7 @@ abstract class Advancer { * consume the tag ourself and call the corresponding advancer. */ private static class WriterUnion extends Advancer { private Advancer[] branches; - public WriterUnion(Advancer[] branches) { this.branches = branches; } + public WriterUnion(Schema w, Schema r, Advancer[] b) { super(w,r) branches = b; } private final Advancer b(Decoder in) throws IOException { return branches[in.readIndex()]; } @@ -494,7 +515,7 @@ abstract class Advancer { * consume it as a regular union. */ private static class EquivUnion extends Advancer { private final Advancer[] branches; - public EquivUnion(Advancer[] branches) { this.branches = branches; } + public EquivUnion(Schema w, Schema r, Advancer[] b) {super(w,r); branches = b; } public int nextIndex(Decoder in) throws IOException { return in.readIndex(); } public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException { @@ -505,8 +526,11 @@ abstract class Advancer { private static class ReaderUnion extends Advancer { private int branch; private Advancer advancer; - public ReaderUnion(int b, Advancer a) { branch = b; advancer = a; } + public ReaderUnion(Schema w, Schema r, int b, Advancer a) + { super(w,r); branch = b; advancer = a; } + public int nextIndex(Decoder in) { return branch; } + public Advancer getBranchAdvancer(Decoder in, int b) { if (b != this.branch) throw new IllegalArgumentException("Branch much be " + branch + ", got " + b); @@ -544,9 +568,10 @@ abstract class Advancer { public final Schema.Field[] readerOrder; public final boolean inOrder; - private Record(Advancer[] advancers, Schema[] finalSkips, + private Record(Schema w, Schema r, Advancer[] advancers, Schema[] finalSkips, Schema.Field[] readerOrder, boolean inOrder) { + super(w,r); this.advancers = advancers; this.finalSkips = finalSkips; this.readerOrder = readerOrder; @@ -580,7 +605,8 @@ abstract class Advancer { Schema[] toSkip = collectSkips(ra.fieldActions, i); i += toSkip.length; Advancer fieldAdv = Advancer.from(ra.fieldActions[i++]); - if (toSkip.length != 0) fieldAdv = new RecordField(toSkip, fieldAdv); + if (toSkip.length != 0) + fieldAdv = new RecordField(fieldAdv.writer, fieldAdv.reader, toSkip, fieldAdv); fieldAdvs[nrf] = fieldAdv; } @@ -590,7 +616,7 @@ abstract class Advancer { // Deal with defaults for (int df = 0; rf < readOrder.length; rf++, df++, nrf++) - fieldAdvs[nrf] = new Default(ra.defaults[df]); + fieldAdvs[nrf] = new Default(ra.readerOrder[df].schema(), ra.defaults[df]); // If reader and writer orders agree, sort fieldAdvs by reader // order (i.e., move defaults into the correct place), to allow @@ -615,14 +641,15 @@ abstract class Advancer { readOrder = newReadOrder; } - return new Record(fieldAdvs, finalSkips, readOrder, inOrder); + return new Record(ra.writer, ra.reader, fieldAdvs, finalSkips, readOrder, inOrder); } } private static class RecordField extends Advancer { private final Schema[] toSkip; private final Advancer field; - public RecordField(Schema[] toSkip, Advancer field) { + public RecordField(Schema w, Schema r, Schema[] toSkip, Advancer field) { + super(w,r); this.toSkip = toSkip; this.field = field; } @@ -675,7 +702,7 @@ abstract class Advancer { private static class Default extends Advancer { protected final Object val; - private Default(Object val) { this.val = val; } + private Default(Schema s, Object v) { super(s,s); val = v; } public Object next(Decoder in) { return val; } public Object nextNull(Decoder in) { return val; }
