Author: thiru
Date: Sat Jan 16 05:59:46 2010
New Revision: 899888
URL: http://svn.apache.org/viewvc?rev=899888&view=rev
Log:
AVRO-328. Performance improvements for the validating encoder/decoder with nested records
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/io/parsing/Symbol.java
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=899888&r1=899887&r2=899888&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Sat Jan 16 05:59:46 2010
@@ -230,6 +230,8 @@
AVRO-316. Optiminzing inner loop functions of Avro io (thiru)
+ AVRO-328. Performance improvements for the validating encoder/decoder with
+ nested records (thiru)
+
BUG FIXES
AVRO-176. Safeguard against bad istreams before reading. (sbanacho)
Modified:
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/io/parsing/Symbol.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/io/parsing/Symbol.java?rev=899888&r1=899887&r2=899888&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/io/parsing/Symbol.java
(original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/io/parsing/Symbol.java
Sat Jan 16 05:59:46 2010
@@ -19,8 +19,11 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import org.codehaus.jackson.JsonEncoding;
@@ -134,6 +137,97 @@
return new ResolvingAction(w, r);
}
+ /**
+ * A deferred copy: remembers the array and offset into which the production
+ * of a flattened <tt>Sequence</tt> must be copied once that production has
+ * been completely filled in.
+ */
+ private static class Fixup {
+ public final Symbol[] symbols;
+ public final int pos;
+
+ public Fixup(Symbol[] target, int offset) {
+ symbols = target;
+ pos = offset;
+ }
+ }
+
+ /**
+ * Returns the flattened form of this symbol. This default returns the
+ * symbol itself; subclasses that contain other symbols override it.
+ *
+ * @param map flattened copies of the <tt>Sequence</tt>s encountered so far,
+ * keyed by the original <tt>Sequence</tt>
+ * @param map2 fix-ups pending for flattened <tt>Sequence</tt>s that are
+ * still being built
+ * @return the flattened symbol
+ */
+ public Symbol flatten(Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ return this;
+ }
+
+ /**
+ * Returns the number of slots this symbol occupies once flattened.
+ * A plain symbol occupies exactly one slot; <tt>Sequence</tt> overrides this.
+ */
+ public int flattenedSize() {
+ return 1;
+ }
+
+ /**
+ * Flattens the given sub-array of symbols into a sub-array of symbols. Every
+ * <tt>Sequence</tt> in the input is replaced by its production, recursively.
+ * Non-<tt>Sequence</tt> symbols that internally hold other symbols have
+ * those internal symbols flattened as well.
+ *
+ * The algorithm does a few tricks to handle recursive symbol definitions.
+ * To avoid infinite recursion with recursive symbols, we keep a map from
+ * each original <tt>Sequence</tt> to its flattened copy. Before fully
+ * constructing the flattened copy of a <tt>Sequence</tt>, we insert an empty
+ * output <tt>Sequence</tt> into the map and then start filling its
+ * production. If the same <tt>Sequence</tt> is encountered again due to
+ * recursion, we simply return the (still empty) output <tt>Sequence</tt>
+ * from the map. Then we actually fill out the production.
+ * As part of the flattening process we copy the productions of
+ * <tt>Sequence</tt>s into larger arrays. If a flattened <tt>Sequence</tt>
+ * has not yet been fully constructed, nothing is copied and its slots are
+ * left as <tt>null</tt>s. A fix-up remembers each such <tt>null</tt> patch;
+ * the fix-ups are filled in once the symbols that occupy those patches are
+ * known.
+ *
+ * @param in The array of input symbols to flatten.
+ * @param start The position in <tt>in</tt> where the input sub-array starts;
+ * all symbols from <tt>start</tt> to the end of <tt>in</tt> are flattened.
+ * @param out The output that receives the flattened list of symbols. The
+ * output array should have sufficient space to receive the expanded
+ * sub-array of symbols.
+ * @param skip The position in <tt>out</tt> where the flattened output starts.
+ * @param map A map from each original <tt>Sequence</tt> to its flattened
+ * copy. Useful for handling recursive definitions and for caching.
+ * @param map2 A map from each flattened <tt>Sequence</tt> that is still being
+ * built to the list of fix-ups waiting for its production.
+ */
+ static void flatten(Symbol[] in, int start,
+ Symbol[] out, int skip,
+ Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ for (int i = start, j = skip; i < in.length; i++) {
+ Symbol s = in[i].flatten(map, map2);
+ if (s instanceof Sequence) {
+ // Splice the flattened sequence's production into out.
+ Symbol[] p = s.production;
+ List<Fixup> l = map2.get(s);
+ // l is non-null only while s is still being built (a recursive
+ // reference); its production is then copied later via a Fixup.
+ if (l == null) {
+ System.arraycopy(p, 0, out, j, p.length);
+ } else {
+ l.add(new Fixup(out, j));
+ }
+ // Reserve the slots in both cases so later symbols land correctly.
+ j += p.length;
+ } else {
+ out[j++] = s;
+ }
+ }
+ }
+
+ /**
+ * Computes how many slots are needed to hold the flattened form of the
+ * symbols from <tt>start</tt> to the end of <tt>symbols</tt>. Each
+ * <tt>Sequence</tt> contributes its own flattened size; every other symbol
+ * contributes exactly one slot.
+ * @param symbols The array of input symbols.
+ * @param start The index where the subarray starts.
+ * @return The number of symbols produced by flattening that subarray.
+ */
+ protected static int flattenedSize(Symbol[] symbols, int start) {
+ int total = 0;
+ for (int k = start; k < symbols.length; k++) {
+ Symbol sym = symbols[k];
+ total += (sym instanceof Sequence) ? ((Sequence) sym).flattenedSize() : 1;
+ }
+ return total;
+ }
+
private static class Terminal extends Symbol {
private final String printName;
public Terminal(String printName) {
@@ -168,11 +262,14 @@
}
private static Symbol[] makeProduction(Symbol[] symbols) {
- Symbol[] result = new Symbol[symbols.length + 1];
- System.arraycopy(symbols, 0, result, 1, symbols.length);
+ // Expand nested Sequences inline instead of copying the symbols verbatim.
+ // As before, index 0 of the result is left unset here. Fresh maps are used
+ // per call, so flattened copies are not shared across productions.
+ Symbol[] result = new Symbol[flattenedSize(symbols, 0) + 1];
+ flatten(symbols, 0, result, 1,
+ new HashMap<Sequence, Sequence>(),
+ new HashMap<Sequence, List<Fixup>>());
return result;
}
}
+
protected static class Sequence extends Symbol implements Iterable<Symbol> {
private Sequence(Symbol[] productions) {
super(Kind.SEQUENCE, productions);
@@ -207,6 +304,31 @@
}
};
}
+ /**
+ * Returns the flattened copy of this sequence, creating it on first use.
+ * The copy is registered in <tt>map</tt> before its production is filled,
+ * so recursive references to this sequence resolve to the same (partially
+ * built) copy. Such references record fix-ups in <tt>map2</tt>, which are
+ * applied once the production is complete.
+ */
+ @Override
+ public Sequence flatten(Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ Sequence result = map.get(this);
+ if (result == null) {
+ result = new Sequence(new Symbol[flattenedSize()]);
+ map.put(this, result);
+ // While result is in map2, references to it record fix-ups
+ // instead of copying its (still incomplete) production.
+ List<Fixup> l = new ArrayList<Fixup>();
+ map2.put(result, l);
+
+ flatten(production, 0,
+ result.production, 0, map, map2);
+ // Patch every place that reserved room for this production while it
+ // was still incomplete.
+ for (Fixup f : l) {
+ System.arraycopy(result.production, 0, f.symbols, f.pos,
+ result.production.length);
+ }
+ // result is complete; later references copy its production directly.
+ map2.remove(result);
+ }
+ return result;
+ }
+
+ /**
+ * Returns the number of symbols this sequence expands to once every nested
+ * <tt>Sequence</tt> in its production has been flattened.
+ */
+ @Override
+ public final int flattenedSize() {
+ return Symbol.flattenedSize(production, 0);
+ }
}
public static class Repeater extends Symbol {
@@ -223,6 +345,16 @@
System.arraycopy(p, 0, result, 1, p.length);
return result;
}
+
+ /**
+ * Returns a new Repeater with the same end symbol whose production holds
+ * the flattened form of this repeater's symbols after index 0.
+ */
+ @Override
+ public Repeater flatten(Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ // Index 0 of the production is not part of the flattened input.
+ int size = flattenedSize(production, 1);
+ Repeater copy = new Repeater(end, new Symbol[size]);
+ flatten(production, 1, copy.production, 1, map, map2);
+ return copy;
+ }
+
}
public static class Alternative extends Symbol {
@@ -256,6 +388,16 @@
}
return -1;
}
+
+ /**
+ * Returns a new Alternative with the same labels, in which every branch
+ * symbol has been replaced by its flattened form.
+ */
+ @Override
+ public Alternative flatten(Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ int n = symbols.length;
+ Symbol[] flattened = new Symbol[n];
+ for (int k = 0; k < n; k++) {
+ flattened[k] = symbols[k].flatten(map, map2);
+ }
+ return new Alternative(flattened, labels);
+ }
}
public static class ErrorAction extends ImplicitAction {
@@ -291,6 +433,14 @@
this.writer = writer;
this.reader = reader;
}
+
+ /**
+ * Returns a new ResolvingAction built from the flattened forms of this
+ * action's writer and reader symbols (writer is flattened first).
+ */
+ @Override
+ public ResolvingAction flatten(Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ return new ResolvingAction(writer.flatten(map, map2),
+ reader.flatten(map, map2));
+ }
+
}
public static class SkipAction extends ImplicitAction {
@@ -298,6 +448,13 @@
public SkipAction(Symbol symToSkip) {
this.symToSkip = symToSkip;
}
+
+ /**
+ * Returns a new SkipAction that skips the flattened form of the symbol
+ * this action skips.
+ */
+ @Override
+ public SkipAction flatten(Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ Symbol flat = symToSkip.flatten(map, map2);
+ return new SkipAction(flat);
+ }
+
}
public static class FieldAdjustAction extends ImplicitAction {
@@ -331,6 +488,13 @@
this.rindex = rindex;
this.symToParse = symToParse;
}
+
+ /**
+ * Returns a new UnionAdjustAction with the same <tt>rindex</tt> and the
+ * flattened form of the symbol to parse.
+ */
+ @Override
+ public UnionAdjustAction flatten(Map<Sequence, Sequence> map,
+ Map<Sequence, List<Fixup>> map2) {
+ return new UnionAdjustAction(rindex, symToParse.flatten(map, map2));
+ }
+
}
/** For JSON. */
Modified: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java?rev=899888&r1=899887&r2=899888&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java
(original)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/io/Perf.java Sat
Jan 16 05:59:46 2010
@@ -49,6 +49,8 @@
tests.add(new ReadLong());
} else if (a.equals("-R")) {
tests.add(new RepeaterTest());
+ } else if (a.equals("-N")) {
+ tests.add(new NestedRecordTest());
} else {
usage();
System.exit(1);
@@ -58,6 +60,7 @@
tests.addAll(Arrays.asList(new Test[] {
new ReadInt(), new ReadLong(),
new ReadFloat(), new ReadDouble(),
+ new RepeaterTest(), new NestedRecordTest(),
}));
}
@@ -113,8 +116,13 @@
private static class ReadInt extends Test {
public ReadInt() throws IOException {
- super("ReadInt", "{ \"type\": \"array\", \"items\": \"int\"} ");
+ // Delegate to ReadInt(String, String), which subclasses such as
+ // NestedRecordTest use to supply their own name and schema.
+ this("ReadInt", "{ \"type\": \"array\", \"items\": \"int\"} ");
}
+
+ /**
+ * Creates a test with the given name and schema that reuses this class's
+ * data generation, so subclasses can read the same data with another schema.
+ */
+ public ReadInt(String name, String schema) throws IOException {
+ super(name, schema);
+ }
+
@Override void genData(Encoder e) throws IOException {
e.writeArrayStart();
e.setItemCount((COUNT/4) * 4); //next lowest multiple of 4
@@ -229,9 +237,24 @@
protected Decoder getDecoder() throws IOException {
return new ValidatingDecoder(schema, super.getDecoder());
}
+
+ }
+
+ /**
+ * Runs ReadInt's data through a schema that wraps each int in a record
+ * (<tt>r1</tt> with a single int field <tt>f1</tt>), validated by a
+ * ValidatingDecoder, to measure the cost of a record nested in an array.
+ */
+ private static class NestedRecordTest extends ReadInt {
+ public NestedRecordTest() throws IOException {
+ // Use this test's own name rather than RepeaterTest's (copy-paste bug).
+ super("NestedRecordTest",
+ "{ \"type\": \"array\", \"items\": \n"
+ + "{ \"type\": \"record\", \"name\": \"r1\", \n"
+ + "\"fields\": \n"
+ + "[ { \"name\": \"f1\", \"type\": \"int\" } ] } } ");
+ }
+ @Override
+ protected Decoder getDecoder() throws IOException {
+ return new ValidatingDecoder(schema, super.getDecoder());
+ }
}
-
+
private static void usage() {
System.out.println("Usage: Perf { -i | -l | -f | -d }");
System.out.println(" -i readInt() performance");