Author: cutting
Date: Thu Sep 3 21:19:46 2009
New Revision: 811128
URL: http://svn.apache.org/viewvc?rev=811128&view=rev
Log:
AVRO-109. Add Java support for controlling sort order.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
hadoop/avro/trunk/src/test/schemata/simple.avpr
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Sep 3 21:19:46 2009
@@ -42,6 +42,11 @@
Comparable. The implementation is consistent with the binary
comparator added in AVRO-108. (cutting)
+ AVRO-109. Add Java support for controlling sort order via schema
+ annotations. Record fields now support an "order" attribute whose
+ possible values are "ascending" (the default), "descending", and
+ "ignore". (cutting)
+
IMPROVEMENTS
AVRO-71. C++: make deserializer more generic. (Scott Banachowski
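
For reference, a minimal sketch (not part of this commit) of the new "order"
attribute in a record schema, parsed with the existing Schema.parse(String)
API; the field names here are purely illustrative:

    // Hedged sketch, not from this patch: a record schema annotated with the
    // new "order" attribute; fields without it default to ascending order.
    Schema schema = Schema.parse(
      "{\"type\":\"record\", \"name\":\"Pair\", \"fields\":["
      +"{\"name\":\"key\",\"type\":\"string\",\"order\":\"descending\"},"
      +"{\"name\":\"tag\",\"type\":\"string\",\"order\":\"ignore\"},"
      +"{\"name\":\"value\",\"type\":\"long\"}]}");
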
Modified: hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Schema.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Schema.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Schema.java Thu Sep 3 21:19:46 2009
@@ -223,18 +223,33 @@
/** A field within a record. */
public static class Field {
+
+ /** How values of this field should be ordered when sorting records. */
+ public enum Order {
+ ASCENDING, DESCENDING, IGNORE;
+ private String name;
+ private Order() { this.name = this.name().toLowerCase(); }
+ };
+
private int position = -1;
private final Schema schema;
private final JsonNode defaultValue;
+ private final Order order;
+
public Field(Schema schema, JsonNode defaultValue) {
+ this(schema, defaultValue, Order.ASCENDING);
+ }
+ public Field(Schema schema, JsonNode defaultValue, Order order) {
this.schema = schema;
this.defaultValue = defaultValue;
+ this.order = order;
}
/** The position of this field within the record. */
public int pos() { return position; }
/** This field's {@link Schema}. */
public Schema schema() { return schema; }
public JsonNode defaultValue() { return defaultValue; }
+ public Order order() { return order; }
public boolean equals(Object other) {
if (other == this) return true;
if (!(other instanceof Field)) return false;
@@ -245,6 +260,7 @@
? that.defaultValue == null
: (defaultValue.equals(that.defaultValue)));
}
+ public int hashCode() { return schema.hashCode(); }
}
private static abstract class NamedSchema extends Schema {
@@ -377,6 +393,8 @@
gen.writeFieldName("default");
gen.writeTree(entry.getValue().defaultValue());
}
+ if (entry.getValue().order() != Field.Order.ASCENDING)
+ gen.writeStringField("order", entry.getValue().order().name);
gen.writeEndObject();
}
gen.writeEndArray();
@@ -657,8 +675,12 @@
if (fieldTypeNode == null)
throw new SchemaParseException("No field type: "+field);
Schema fieldSchema = parse(fieldTypeNode, names);
+ Field.Order order = Field.Order.ASCENDING;
+ JsonNode orderNode = field.get("order");
+ if (orderNode != null)
+ order = Field.Order.valueOf(orderNode.getTextValue().toUpperCase());
fields.put(fieldNameNode.getTextValue(),
- new Field(fieldSchema, field.get("default")));
+ new Field(fieldSchema, field.get("default"), order));
}
result.setFields(fields);
return result;
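
A hedged illustration of the new Field API above (Schema.create(Type) for
primitive schemas is assumed to be available, as elsewhere in the codebase;
the two-argument constructor keeps the old behavior by defaulting to
ASCENDING):

    // Hedged sketch: build a field with an explicit sort order and read it back.
    Schema.Field f = new Schema.Field(Schema.create(Schema.Type.INT), null,
                                      Schema.Field.Order.DESCENDING);
    assert f.order() == Schema.Field.Order.DESCENDING;
    // Two-argument constructor: order defaults to ASCENDING.
    assert new Schema.Field(Schema.create(Schema.Type.INT), null).order()
        == Schema.Field.Order.ASCENDING;
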
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java Thu Sep 3 21:19:46 2009
@@ -389,10 +389,14 @@
case RECORD:
GenericRecord r1 = (GenericRecord)o1;
GenericRecord r2 = (GenericRecord)o2;
- for (Map.Entry<String, Schema> e : s.getFieldSchemas()) {
- String field = e.getKey();
- int compare = compare(r1.get(field), r2.get(field), e.getValue());
- if (compare != 0) return compare;
+ for (Map.Entry<String, Field> e : s.getFields().entrySet()) {
+ Field f = e.getValue();
+ if (f.order() == Field.Order.IGNORE)
+ continue; // ignore this field
+ String name = e.getKey();
+ int compare = compare(r1.get(name), r2.get(name), f.schema());
+ if (compare != 0) // not equal
+ return f.order() == Field.Order.DESCENDING ? -compare : compare;
}
return 0;
case ENUM:
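
A hedged sketch of the effect on in-memory comparison, mirroring the
TestCompare changes below; it assumes generic records are Comparable, per the
AVRO-108-related entry earlier in CHANGES.txt:

    // Hedged sketch: schema has fields f ("ignore"), g ("descending") and
    // h (default ascending), as in TestCompare.testRecord below.
    GenericData.Record r1 = new GenericData.Record(schema);
    GenericData.Record r2 = new GenericData.Record(schema);
    r1.put("f", 1);  r1.put("g", 13);  r1.put("h", 41);
    r2.put("f", 0);  r2.put("g", 12);  r2.put("h", 41);
    // Differing "f" values are ignored; r1's larger "g" sorts it first under
    // descending order.  Assumes GenericData.Record implements Comparable.
    assert r1.compareTo(r2) < 0;
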
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java Thu Sep 3 21:19:46 2009
@@ -18,17 +18,46 @@
package org.apache.avro.io;
import java.util.Map;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.generic.GenericDatumReader;
/** Utilities for binary-encoded data. */
public class BinaryData {
private BinaryData() {} // no public ctor
- private static final int GT = -1;
- private static final int LT = -2;
+ private static class Buffer extends ByteArrayInputStream {
+ public Buffer() { super(new byte[0]); }
+ public byte[] buf() { return buf; }
+ public int pos() { return pos; }
+ public void skip(int i) { this.pos += i; }
+ public void set(byte[] buf, int pos) {
+ this.buf = buf;
+ this.pos = pos;
+ this.count = buf.length;
+ }
+ }
+
+ private static class Decoders {
+ private final Buffer b1, b2;
+ private final Decoder d1, d2;
+ public Decoders() {
+ this.b1 = new Buffer();
+ this.b2 = new Buffer();
+ this.d1 = new BinaryDecoder(b1);
+ this.d2 = new BinaryDecoder(b2);
+ }
+ }
+
+ private static final ThreadLocal<Decoders> DECODERS
+ = new ThreadLocal<Decoders>() {
+ @Override protected Decoders initialValue() { return new Decoders(); }
+ };
/** Compare binary encoded data. If equal, return zero. If greater-than,
* return 1, if less than return -1. Order is consistent with that of {@link
@@ -36,103 +65,107 @@
public static int compare(byte[] b1, int s1,
byte[] b2, int s2,
Schema schema) {
- int comp = comp(b1, s1, b2, s2, schema); // compare
- return (comp >= 0) ? 0 : ((comp == GT) ? 1 : -1); // decode comparison
+ Decoders decoders = DECODERS.get();
+ decoders.b1.set(b1, s1);
+ decoders.b2.set(b2, s2);
+ try {
+ return compare(decoders, schema);
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
+ }
}
-
+
/** If equal, return the number of bytes consumed. If greater than, return
* GT, if less than, return LT. */
- private static int comp(byte[] b1, int s1, byte[] b2, int s2, Schema schema) {
+ private static int compare(Decoders d, Schema schema) throws IOException {
+ Decoder d1 = d.d1; Decoder d2 = d.d2;
switch (schema.getType()) {
case RECORD: {
- int size = 0;
- for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
- int comp = comp(b1, s1+size, b2, s2+size, entry.getValue());
- if (comp < 0) return comp;
- size += comp;
+ for (Map.Entry<String, Field> entry : schema.getFields().entrySet()) {
+ Field field = entry.getValue();
+ if (field.order() == Field.Order.IGNORE) {
+ GenericDatumReader.skip(field.schema(), d1);
+ GenericDatumReader.skip(field.schema(), d2);
+ continue;
+ }
+ int c = compare(d, field.schema());
+ if (c != 0)
+ return (field.order() != Field.Order.DESCENDING) ? c : -c;
}
- return size;
+ return 0;
}
case ENUM: case INT: case LONG: {
- long l1 = readLong(b1, s1);
- long l2 = readLong(b2, s2);
- return (l1 == l2) ? longSize(l1) : ((l1 > l2) ? GT : LT);
+ long l1 = d1.readLong();
+ long l2 = d2.readLong();
+ return l1 == l2 ? 0 : (l1 > l2 ? 1 : -1);
}
case ARRAY: {
long i = 0; // position in array
long r1 = 0, r2 = 0; // remaining in current block
long l1 = 0, l2 = 0; // total array length
- int size1 = 0, size2 = 0; // total array size
while (true) {
if (r1 == 0) { // refill blocks(s)
- r1 = readLong(b1, s1+size1);
- size1 += longSize(r1);
+ r1 = d1.readLong();
+ if (r1 < 0) { r1 = -r1; d1.readLong(); }
l1 += r1;
}
if (r2 == 0) {
- r2 = readLong(b2, s2+size2);
- size2 += longSize(r2);
+ r2 = d2.readLong();
+ if (r2 < 0) { r2 = -r2; d2.readLong(); }
l2 += r2;
}
if (r1 == 0 || r2 == 0) // empty block: done
- return (l1 == l2) ? size1 : ((l1 > l2) ? GT : LT);
+ return (l1 == l2) ? 0 : ((l1 > l2) ? 1 : -1);
long l = Math.min(l1, l2);
while (i < l) { // compare to end of block
- int comp = comp(b1, s1+size1, b2, s2+size2, schema.getElementType());
- if (comp < 0) return comp;
- size1 += comp;
- size2 += comp;
+ int c = compare(d, schema.getElementType());
+ if (c != 0) return c;
i++; r1--; r2--;
}
}
- }
+ }
case MAP:
throw new AvroRuntimeException("Can't compare maps!");
case UNION: {
- int i1 = readInt(b1, s1);
- int i2 = readInt(b2, s2);
+ int i1 = d1.readInt();
+ int i2 = d2.readInt();
if (i1 == i2) {
- int size = intSize(i1);
- return comp(b1, s1+size, b2, s2+size, schema.getTypes().get(i1));
+ return compare(d, schema.getTypes().get(i1));
} else {
- return (i1 > i2) ? GT : LT;
+ return i1 - i2;
}
}
case FIXED: {
int size = schema.getFixedSize();
- int c = compareBytes(b1, s1, size, b2, s2, size);
- return (c == 0) ? size : ((c > 0) ? GT : LT);
+ int c = compareBytes(d.b1.buf(), d.b1.pos(), size,
+ d.b2.buf(), d.b2.pos(), size);
+ d.b1.skip(size);
+ d.b2.skip(size);
+ return c;
}
case STRING: case BYTES: {
- int l1 = readInt(b1, s1);
- int l2 = readInt(b2, s2);
- int size1 = intSize(l1);
- int size2 = intSize(l2);
- int c = compareBytes(b1, s1+size1, l1, b2, s2+size2, l2);
- return (c == 0) ? size1+l1 : ((c > 0) ? GT : LT);
+ int l1 = d1.readInt();
+ int l2 = d2.readInt();
+ int c = compareBytes(d.b1.buf(), d.b1.pos(), l1,
+ d.b2.buf(), d.b2.pos(), l2);
+ d.b1.skip(l1);
+ d.b2.skip(l2);
+ return c;
}
case FLOAT: {
- int n1 = 0, n2 = 0;
- for (int i = 0, shift = 0; i < 4; i++, shift += 8) {
- n1 |= (b1[s1+i] & 0xff) << shift;
- n2 |= (b2[s2+i] & 0xff) << shift;
- }
- float f1 = Float.intBitsToFloat(n1);
- float f2 = Float.intBitsToFloat(n2);
- return (f1 == f2) ? 4 : ((f1 > f2) ? GT : LT);
+ float f1 = d1.readFloat();
+ float f2 = d2.readFloat();
+ return (f1 == f2) ? 0 : ((f1 > f2) ? 1 : -1);
}
case DOUBLE: {
- long n1 = 0, n2 = 0;
- for (int i = 0, shift = 0; i < 8; i++, shift += 8) {
- n1 |= (b1[s1+i] & 0xffL) << shift;
- n2 |= (b2[s2+i] & 0xffL) << shift;
- }
- double d1 = Double.longBitsToDouble(n1);
- double d2 = Double.longBitsToDouble(n2);
- return (d1 == d2) ? 8 : ((d1 > d2) ? GT : LT);
+ double f1 = d1.readDouble();
+ double f2 = d2.readDouble();
+ return (f1 == f2) ? 0 : ((f1 > f2) ? 1 : -1);
}
case BOOLEAN:
- return b1[s1] == b2[s2] ? 1 : ((b1[s1] > b2[s2]) ? GT : LT);
+ boolean b1 = d1.readBoolean();
+ boolean b2 = d2.readBoolean();
+ return (b1 == b2) ? 0 : (b1 ? 1 : -1);
case NULL:
return 0;
default:
@@ -156,36 +189,4 @@
return l1 - l2;
}
- private static int readInt(byte[] b, int s) {
- long l = readLong(b, s);
- if (l < Integer.MIN_VALUE || Integer.MAX_VALUE < l)
- throw new AvroRuntimeException("Integer overflow.");
- return (int)l;
- }
-
-
- private static long readLong(byte[] buffer, int s) {
- long n = 0;
- for (int shift = 0; ; shift += 7) {
- long b = buffer[s++];
- n |= (b & 0x7F) << shift;
- if ((b & 0x80) == 0) {
- break;
- }
- }
- return (n >>> 1) ^ -(n & 1); // back to two's-complement
- }
-
- private static int intSize(int i) { return longSize(i); }
-
- private static int longSize(long n) {
- int size = 1;
- n = (n << 1) ^ (n >> 63); // move sign to low-order bit
- while ((n & ~0x7F) != 0) {
- size++;
- n >>>= 7;
- }
- return size;
- }
-
}
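
A hedged usage sketch for the rewritten comparator; b1 and b2 are assumed to
each hold one record binary-encoded with the same schema:

    // Hedged sketch: compare two binary-encoded records without deserializing
    // them.  Per the javadoc above, the result is -1, 0 or 1, and fields
    // marked "order":"ignore" are skipped via GenericDatumReader.skip().
    int c = BinaryData.compare(b1, 0, b2, 0, schema);
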
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java Thu Sep 3 21:19:46 2009
@@ -17,9 +17,10 @@
*/
package org.apache.avro.specific;
-import java.util.Map;
+import java.util.Iterator;
import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
import org.apache.avro.reflect.ReflectData;
/** Utilities for generated Java classes and interfaces. */
@@ -48,11 +49,14 @@
case RECORD:
SpecificRecord r1 = (SpecificRecord)o1;
SpecificRecord r2 = (SpecificRecord)o2;
- int i = 0;
- for (Map.Entry<String, Schema> e : s.getFieldSchemas()) {
- int compare = compare(r1.get(i), r2.get(i), e.getValue());
- if (compare != 0) return compare;
- i++;
+ Iterator<Field> fields = s.getFields().values().iterator();
+ for (int i = 0; fields.hasNext(); i++) {
+ Field f = fields.next();
+ if (f.order() == Field.Order.IGNORE)
+ continue; // ignore this field
+ int compare = compare(r1.get(i), r2.get(i), f.schema());
+ if (compare != 0) // not equal
+ return f.order() == Field.Order.DESCENDING ? -compare : compare;
}
return 0;
case ENUM:
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java Thu Sep 3 21:19:46 2009
@@ -111,15 +111,23 @@
@Test
public void testRecord() throws Exception {
- String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":"
- +"[{\"name\":\"f\",\"type\":\"int\"},{\"name\":\"g\",\"type\":\"int\"}]}";
+ String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":["
+ +"{\"name\":\"f\",\"type\":\"int\",\"order\":\"ignore\"},"
+ +"{\"name\":\"g\",\"type\":\"int\",\"order\":\"descending\"},"
+ +"{\"name\":\"h\",\"type\":\"int\"}]}";
Schema schema = Schema.parse(recordJson);
GenericData.Record r1 = new GenericData.Record(schema);
- r1.put("f", 11);
- r1.put("g", 12);
+ r1.put("f", 1);
+ r1.put("g", 13);
+ r1.put("h", 41);
GenericData.Record r2 = new GenericData.Record(schema);
- r2.put("f", 11);
+ r2.put("f", 0);
+ r2.put("g", 12);
+ r2.put("h", 41);
+ check(recordJson, r1, r2);
+ r2.put("f", 0);
r2.put("g", 13);
+ r2.put("h", 42);
check(recordJson, r1, r2);
}
@@ -148,16 +156,20 @@
Simple.TestRecord s1 = new Simple.TestRecord();
Simple.TestRecord s2 = new Simple.TestRecord();
s1.name = new Utf8("foo");
- s2.name = new Utf8("foo");
- s1.kind = Simple.Kind.BAR;
- s2.kind = Simple.Kind.BAR;
+ s1.kind = Simple.Kind.BAZ;
s1.hash = new Simple.MD5();
s1.hash.bytes(new byte[] {0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5});
+ s2.name = new Utf8("bar");
+ s2.kind = Simple.Kind.BAR;
s2.hash = new Simple.MD5();
s2.hash.bytes(new byte[] {0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,6});
check(Simple.TestRecord._SCHEMA, s1, s2, true,
new SpecificDatumWriter(Simple.TestRecord._SCHEMA),
SpecificData.get());
+ s2.kind = Simple.Kind.BAZ;
+ check(Simple.TestRecord._SCHEMA, s1, s2, true,
+ new SpecificDatumWriter(Simple.TestRecord._SCHEMA),
+ SpecificData.get());
}
private static void check(String schemaJson, Object o1, Object o2)
Modified: hadoop/avro/trunk/src/test/schemata/simple.avpr
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/schemata/simple.avpr?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/schemata/simple.avpr (original)
+++ hadoop/avro/trunk/src/test/schemata/simple.avpr Thu Sep 3 21:19:46 2009
@@ -8,8 +8,8 @@
{"name": "TestRecord", "type": "record",
"fields": [
- {"name": "name", "type": "string"},
- {"name": "kind", "type": "Kind"},
+ {"name": "name", "type": "string", "order": "ignore"},
+ {"name": "kind", "type": "Kind", "order": "descending"},
{"name": "hash", "type": "MD5"}
]
},