This is an automated email from the ASF dual-hosted git repository. thiru pushed a commit to branch fast-decoder-thiru in repository https://gitbox.apache.org/repos/asf/avro.git
commit 345f6b32f9cc9de75cf492785a62e8a91c78085e Author: rstata <[email protected]> AuthorDate: Tue Apr 30 00:20:14 2019 -0700 Dealt with record field-skipping problem. Also fixed a few errors missed earlier. --- .../java/org/apache/avro/specific/Advancer.java | 96 +++++++++++++--------- 1 file changed, 56 insertions(+), 40 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java b/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java index 75d2615..ee326e2 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java +++ b/lang/java/avro/src/main/java/org/apache/avro/specific/Advancer.java @@ -76,20 +76,24 @@ abstract class Advancer { //// an integer is read with no promotion) overrides just //// readInt. - public Object next(Decoder in) throws IOException { exception(); } - public Object nextNull(Decoder in) throws IOException { exception(); } - public boolean nextBoolean(Decoder in) throws IOException { exception(); } - public int nextInt(Decoder in) throws IOException { exception(); } - public long nextLong(Decoder in) throws IOException { exception(); } - public float nextFloat(Decoder in) throws IOException { exception(); } - public double nextDouble(Decoder in) throws IOException { exception(); } - public int nextEnum(Decoder in) throws IOException { exception(); } - public Utf8 nextString(Decoder in, Utf8 old) throws IOException { exception(); } - public String nextString(Decoder in) throws IOException { exception(); } - public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException { exception(); } + public Object next(Decoder in) throws IOException { exception(); return null; } + public Object nextNull(Decoder in) throws IOException { exception(); return null; } + public boolean nextBoolean(Decoder in) throws IOException { exception(); return false; } + public int nextInt(Decoder in) throws IOException { exception(); return 0; } + public long nextLong(Decoder in) throws 
IOException { exception(); return 0; } + public float nextFloat(Decoder in) throws IOException { exception(); return 0; } + public double nextDouble(Decoder in) throws IOException { exception(); return 0; } + public int nextEnum(Decoder in) throws IOException { exception(); return 0; } + public Utf8 nextString(Decoder in, Utf8 old) throws IOException { exception(); return null; } + public String nextString(Decoder in) throws IOException { exception(); return null; } + public ByteBuffer nextBytes(Decoder in, ByteBuffer old) throws IOException { + exception(); + return null; + } public byte[] nextFixed(Decoder in, byte[] bytes, int start, int length) throws IOException { exception(); + return null; } public byte[] nextFixed(Decoder in, byte[] bytes) throws IOException { @@ -99,20 +103,23 @@ abstract class Advancer { /** Access to contained advancer (for Array and Map types). */ public Advancer getElementAdvancer(Decoder in) throws IOException { exception(); + return null; } /** Get index for a union. */ - public int nextIndex(Decoder in) throws IOException { exception(); } + public int nextIndex(Decoder in) throws IOException { exception(); return 0; } /** Access to contained advancer for unions. You must call {@link * nextIndex} before calling this method. */ public Advancer getBranchAdvancer(Decoder in, int branch) throws IOException { exception(); + return null; } /** Access to contained advancer (for Array, Map, and Union types). */ public Record getRecordAdvancer(Decoder in) throws IOException { exception(); + return null; } @@ -216,6 +223,7 @@ abstract class Advancer { * of the union. 
*/ private static class Container extends Advancer { private final Advancer elementAdvancer; + public Container(Advancer elementAdvancer) { this.elementAdvancer = elementAdvancer; } public Advancer getElementAdvancer(Decoder in) { return elementAdvancer; } } @@ -486,32 +494,44 @@ abstract class Advancer { //// Records are particularly intricate because we may have to skip //// fields, read fields out of order, and use default values. - /** Advancer for records. The {@link advancer} array contains an + /** Advancer for records. The {@link advancers} array contains an * advancer for each field, ordered according writer (which * determines the order in which data must be read). The {@link * readerOrder} array tells you how those advancers line up with the * reader's fields. Thus, the following is how to read a record: * <pre> - * for (int i = 0; i < a.advancers.length; i++) + * for (int i = 0; i < a.advancers.length; i++) { * dataum.set(a.readerOrder[i], a.advancers[i].next()); + * } + * a.done(); * </pre> + * Note that a decoder <em>must</em> call {@link done} after interpreting + * all the elements in {@link advancers}. + * * As a convenience, {@link inOrder} is set to true iff the reader * and writer order agrees (i.e., iff <code>readerOrder[i] == * i</code> for all i). Generated code can use this to optimize this * common case. */ public static class Record extends Advancer { public final Advancer[] advancers; + private Schema[] finalSkips; public final int[] readerOrder; public final boolean inOrder; - private Record(Advancer[] advancers, int[] readerOrder, boolean inOrder) { + private Record(Advancer[] advancers, Schema[] finalSkips, int[] order, boolean inOrder) { this.advancers = advancers; - this.readerOrder = readerOrder; + this.finalSkips = finalSkips; + this.readerOrder = order; this.inOrder = inOrder; } public Record getRecordAdvancer(Decoder in) { return this; } + /** Must be called after consuming all elements of {@link advancers}. 
*/ + public void done(Decoder in) throws IOException { + ignore(finalSkips, in); + } + protected static Advancer from(Resolver.RecordAdjust ra) { /** Two cases: reader + writer agree on order, vs disagree. */ /** This is the complicated case, since skipping is involved. */ @@ -525,48 +545,44 @@ abstract class Advancer { Advancer[] fieldAdvs = new Advancer[readOrder.length]; int i = 0; // Index into ra.fieldActions - int rf = 0; // Index into readOrder - int nrf = 0; // Index into fieldAdvs - - // Deal with any leading fields to be skipped - Schema[] firstSkips = collectSkips(ra.fieldActions, i); - if (firstSkips.length != 0) i += firstSkips.length; - else firstSkips = null; + int rf = 0; // Index into ra.readerOrder + int nrf = 0; // (Insertion) index into fieldAdvs // Deal with fields to be read - for ( ; i < ra.fieldActions.length; nrf++, rf++) { - Advancer fieldAdv = Advancer.from(ra.fieldActions[i]); - i++; + for ( ; rf < ra.firstDefault; rf++, nrf++) { Schema[] toSkip = collectSkips(ra.fieldActions, i); - if (toSkip.length != 0) { - fieldAdv = new RecordField(fieldAdv, toSkip); - i += toSkip.length; - } - if (firstSkips != null) { - fieldAdv = new RecordFieldWithBefore(firstSkips, fieldAdv); - firstSkips = null; - } + i += toSkip.length; + Advancer fieldAdv = Advancer.from(ra.fieldActions[i++]); + if (toSkip.length != 0) fieldAdv = new RecordField(fieldAdv, toSkip); fieldAdvs[nrf] = fieldAdv; } + // Deal with any trailing fields to be skipped: + Schema[] finalSkips = collectSkips(ra.fieldActions, i); + // Assert i == ra.fieldActions.length + + // Deal with defaults + for (int df = 0; rf < readOrder.length; rf++, df++, nrf++) + fieldAdvs[nrf] = new Default(ra.defaults[df]); + // If reader and writer orders agree, sort fieldAdvs by reader // order (i.e., move defaults into the correct place), to allow // decoders to have an optimized path for the common case of a // record's field order not changing. 
boolean inOrder = true; for (int k = 0; k < ra.firstDefault-1; k++) - if (readOrder[k] > readOrder[k+1]) inOrder = false; + inOrder &= (readOrder[k] < readOrder[k+1]); if (inOrder) { Advancer[] newAdvancers = new Advancer[fieldAdvs.length]; for (int k = 0, rf2 = 0, df = ra.firstDefault; k < readOrder.length; k++) { - if (rf2 < df) newAdvancers[k] = fieldAdvs[rf2++]; - else newAdvancers[k] = fieldAdvs[df++]; - readOrder[k] = k; + if (readOrder[rf2] < readOrder[df]) newAdvancers[k] = fieldAdvs[rf2++]; + else newAdvancers[k] = fieldAdvs[df++]; } - newAdvancers = fieldAdvs; + for (int k = 0; k < readOrder.length; k++) readOrder[k] = k; + fieldAdvs = newAdvancers; } - return new Record(fieldAdvs, readOrder, inOrder); + return new Record(fieldAdvs, finalSkips, readOrder, inOrder); } }
