Repository: hive Updated Branches: refs/heads/branch-3 a3e535f94 -> 34edad28c
HIVE-18410 : [Performance][Avro] Reading flat Avro tables is very expensive in Hive (Ratandeep Ratti via Anthony Hsu, Ashutosh Chauhan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8bfea2d0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8bfea2d0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8bfea2d0 Branch: refs/heads/branch-3 Commit: 8bfea2d09533bcf46290af140b2c44a420758987 Parents: a3e535f Author: Ratandeep Ratti <rdsr...@gmail.com> Authored: Mon Jan 8 16:47:00 2018 -0800 Committer: Vineet Garg <vgarg@HW14117.local> Committed: Thu Apr 19 22:32:39 2018 -0700 ---------------------------------------------------------------------- .../hive/serde2/avro/AvroDeserializer.java | 77 +++----------------- .../hadoop/hive/serde2/avro/AvroSerDe.java | 25 ++----- .../hadoop/hive/serde2/avro/AvroSerdeUtils.java | 23 ++++-- .../hive/serde2/avro/TestAvroDeserializer.java | 67 +++++++++++++++++ .../avro/TestAvroObjectInspectorGenerator.java | 11 +++ 5 files changed, 114 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8bfea2d0/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java index b7b3d12..34da50d 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java @@ -198,11 +198,18 @@ class AvroDeserializer { private Object worker(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType) throws AvroSerdeException { - // Klaxon! Klaxon! Klaxon! - // Avro requires NULLable types to be defined as unions of some type T - // and NULL. This is annoying and we're going to hide it from the user. + if (datum == null) { + return null; + } + + // Avro requires nullable types to be defined as unions of some type T + // and NULL. This is annoying and we're going to hide it from the user. + if (AvroSerdeUtils.isNullableType(recordSchema)) { - return deserializeNullableUnion(datum, fileSchema, recordSchema, columnType); + recordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema); + } + if (fileSchema != null && AvroSerdeUtils.isNullableType(fileSchema)) { + fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema); } switch(columnType.getCategory()) { @@ -300,68 +307,6 @@ class AvroDeserializer { } } - /** - * Extract either a null or the correct type from a Nullable type. - */ - private Object deserializeNullableUnion(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType) - throws AvroSerdeException { - if (recordSchema.getTypes().size() == 2) { - // A type like [NULL, T] - return deserializeSingleItemNullableUnion(datum, fileSchema, recordSchema, columnType); - } else { - // Types like [NULL, T1, T2, ...] - if (datum == null) { - return null; - } else { - Schema newRecordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema); - return worker(datum, fileSchema, newRecordSchema, columnType); - } - } - } - - private Object deserializeSingleItemNullableUnion(Object datum, - Schema fileSchema, - Schema recordSchema, - TypeInfo columnType) - throws AvroSerdeException { - int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value - Schema schema = recordSchema.getTypes().get(tag); - if (schema.getType().equals(Type.NULL)) { - return null; - } - - Schema currentFileSchema = null; - if (fileSchema != null) { - if (fileSchema.getType() == Type.UNION) { - // The fileSchema may have the null value in a different position, so - // we need to get the correct tag - try { - tag = GenericData.get().resolveUnion(fileSchema, datum); - currentFileSchema = fileSchema.getTypes().get(tag); - } catch (UnresolvedUnionException e) { - if (LOG.isDebugEnabled()) { - String datumClazz = null; - if (datum != null) { - datumClazz = datum.getClass().getName(); - } - String msg = "File schema union could not resolve union. fileSchema = " + fileSchema + - ", recordSchema = " + recordSchema + ", datum class = " + datumClazz + ": " + e; - LOG.debug(msg, e); - } - // This occurs when the datum type is different between - // the file and record schema. For example if datum is long - // and the field in the file schema is int. See HIVE-9462. - // in this case we will re-use the record schema as the file - // schema, Ultimately we need to clean this code up and will - // do as a follow-on to HIVE-9462. - currentFileSchema = schema; - } - } else { - currentFileSchema = fileSchema; - } - } - return worker(datum, currentFileSchema, schema, columnType); - } private Object deserializeStruct(GenericData.Record datum, Schema fileSchema, StructTypeInfo columnType) throws AvroSerdeException { http://git-wip-us.apache.org/repos/asf/hive/blob/8bfea2d0/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java index 1746a0f..3955611 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java @@ -136,6 +136,11 @@ public class AvroSerDe extends AbstractSerDe { this.columnNames = StringInternUtils.internStringsInList(aoig.getColumnNames()); this.columnTypes = aoig.getColumnTypes(); this.oi = aoig.getObjectInspector(); + + if(!badSchema) { + this.avroSerializer = new AvroSerializer(); + this.avroDeserializer = new AvroDeserializer(); + } } private boolean hasExternalSchema(Properties properties) { @@ -214,7 +219,7 @@ public class AvroSerDe extends AbstractSerDe { if(badSchema) { throw new BadSchemaException(); } - return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema); + return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema); } @Override @@ -222,7 +227,7 @@ public class AvroSerDe extends AbstractSerDe { if(badSchema) { throw new BadSchemaException(); } - return getDeserializer().deserialize(columnNames, columnTypes, writable, schema); + return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema); } @Override @@ -236,22 +241,6 @@ public class AvroSerDe extends AbstractSerDe { return null; } - private AvroDeserializer getDeserializer() { - if(avroDeserializer == null) { - avroDeserializer = new AvroDeserializer(); - } - - return avroDeserializer; - } - - private AvroSerializer getSerializer() { - if(avroSerializer == null) { - avroSerializer = new AvroSerializer(); - } - - return avroSerializer; - } - @Override public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) { return !hasExternalSchema(tableParams); http://git-wip-us.apache.org/repos/asf/hive/blob/8bfea2d0/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index 391a300..d16abdb 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -205,12 +205,25 @@ public class AvroSerdeUtils { } /** - * In a nullable type, get the schema for the non-nullable type. This method - * does no checking that the provides Schema is nullable. + * If the union schema is a nullable union, get the schema for the non-nullable type. + * This method does no checking that the provided Schema is nullable. If the provided + * union schema is non-nullable, it simply returns the union schema */ - public static Schema getOtherTypeFromNullableType(Schema schema) { - List<Schema> itemSchemas = new ArrayList<>(); - for (Schema itemSchema : schema.getTypes()) { + public static Schema getOtherTypeFromNullableType(Schema unionSchema) { + final List<Schema> types = unionSchema.getTypes(); + if (types.size() == 2) { // most common scenario + if (types.get(0).getType() == Schema.Type.NULL) { + return types.get(1); + } + if (types.get(1).getType() == Schema.Type.NULL) { + return types.get(0); + } + // not a nullable union + return unionSchema; + } + + final List<Schema> itemSchemas = new ArrayList<>(); + for (Schema itemSchema : types) { if (!Schema.Type.NULL.equals(itemSchema.getType())) { itemSchemas.add(itemSchema); } http://git-wip-us.apache.org/repos/asf/hive/blob/8bfea2d0/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java ---------------------------------------------------------------------- diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java index 3dc3331..ef97d2d 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector; +import org.junit.Assert; import org.junit.Test; public class TestAvroDeserializer { @@ -256,6 +257,20 @@ public class TestAvroDeserializer { } @Test + public void canDeserializeSingleItemUnions() throws SerDeException, IOException { + Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.SINGLE_ITEM_UNION_SCHEMA); + GenericData.Record record = new GenericData.Record(s); + + record.put("aUnion", "this is a string"); + + ResultPair result = unionTester(s, record); + assertTrue(result.value instanceof String); + assertEquals("this is a string", result.value); + UnionObjectInspector uoi = (UnionObjectInspector)result.oi; + assertEquals(0, uoi.getTag(result.unionObject)); + } + + @Test public void canDeserializeUnions() throws SerDeException, IOException { Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.UNION_SCHEMA); GenericData.Record record = new GenericData.Record(s); @@ -361,6 +376,58 @@ public class TestAvroDeserializer { return new ResultPair(fieldObjectInspector, value, theUnion); } + @Test + public void primitiveSchemaEvolution() throws Exception { + Schema fileSchema = AvroSerdeUtils.getSchemaFor( + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"r1\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"int_field\",\n" + + " \"type\": \"int\"\n" + + " }\n" + + " ]\n" + + "}" + ); + Schema readerSchema = AvroSerdeUtils.getSchemaFor( + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"r1\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"int_field\",\n" + + " \"type\": \"int\"\n" + + " },\n" + + " {\n" + + " \"name\": \"dec_field\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " {\n" + + " \"type\": \"bytes\",\n" + + " \"logicalType\": \"decimal\",\n" + + " \"precision\": 5,\n" + + " \"scale\": 4\n" + + " }\n" + + " ],\n" + + " \"default\": null\n" + + " }\n" + + " ]\n" + + "}" + ); + GenericData.Record record = new GenericData.Record(fileSchema); + + record.put("int_field", 1); + assertTrue(GENERIC_DATA.validate(fileSchema, record)); + AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record); + AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(readerSchema); + + AvroDeserializer de = new AvroDeserializer(); + List<Object> row = (List<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, readerSchema); + Assert.assertEquals(1, row.get(0)); + Assert.assertNull(row.get(1)); + } + @Test // Enums are one of two types we fudge for Hive. Enums go in, Strings come out. public void canDeserializeEnums() throws SerDeException, IOException { Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.ENUM_SCHEMA); http://git-wip-us.apache.org/repos/asf/hive/blob/8bfea2d0/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java ---------------------------------------------------------------------- diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java index 3736a1f..ee83ba3 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java @@ -101,6 +101,17 @@ public class TestAvroObjectInspectorGenerator { " ]\n" + "}"; public static final String NULLABLE_RECORD_SCHEMA = "[\"null\", " + RECORD_SCHEMA + "]"; + public static final String SINGLE_ITEM_UNION_SCHEMA = "{\n" + + " \"namespace\": \"test.a.rossa\",\n" + + " \"name\": \"oneUnion\",\n" + + " \"type\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\":\"aUnion\",\n" + + " \"type\":[\"string\"]\n" + + " }\n" + + " ]\n" + + "}"; public static final String UNION_SCHEMA = "{\n" + " \"namespace\": \"test.a.rossa\",\n" + " \"name\": \"oneUnion\",\n" +