Repository: samza Updated Branches: refs/heads/master c89841366 -> 66525b51e
SAMZA-1681: Samza-sql - Add support for handling older record schema versions in AvroRelConverter In addition to handling older record schema versions in AvroRelConverter, this change also handles Avro enum and fixed types and also handles the proper conversion of samza message key to rel message. Author: Aditya Toomula <atoom...@linkedin.com> Reviewers: Srini P <spun...@linkedin.com> Closes #481 from atoomula/rel Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/66525b51 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/66525b51 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/66525b51 Branch: refs/heads/master Commit: 66525b51ee2cfc82b81798d2c9d450cfe770f21e Parents: c898413 Author: Aditya Toomula <atoom...@linkedin.com> Authored: Wed Apr 25 09:48:11 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Wed Apr 25 09:48:11 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/sql/avro/AvroRelConverter.java | 49 ++++++++++++++++---- .../samza/sql/TestSamzaSqlRelMessageSerde.java | 2 +- 2 files changed, 42 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/66525b51/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java index 5793d6e..f121983 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java @@ -80,11 +80,18 @@ public class AvroRelConverter implements SamzaRelConverter { Object value = samzaMessage.getValue(); if (value instanceof IndexedRecord) { IndexedRecord record = (IndexedRecord) value; + // Please note that record schema and cached schema could be different due to schema evolution. + // Always represent record schema in the form of cached schema. This approach has the side-effect + // of dropping the newly added fields in the scenarios where the record schema has newer version + // than the cached schema. [TODO: SAMZA-1679] + Schema recordSchema = record.getSchema(); fieldNames.addAll(avroSchema.getFields().stream() .map(Schema.Field::name) .collect(Collectors.toList())); fieldValues.addAll(fieldNames.stream() - .map(f -> convertToJavaObject(record.get(avroSchema.getField(f).pos()), avroSchema.getField(f).schema())) + .map(f -> convertToJavaObject( + recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null, + getNonNullUnionSchema(avroSchema.getField(f).schema()))) .collect(Collectors.toList())); } else if (value == null) { fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())); @@ -95,7 +102,14 @@ public class AvroRelConverter implements SamzaRelConverter { throw new SamzaException(msg); } - return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, fieldValues); + Object key = samzaMessage.getKey(); + if (key != null && key instanceof IndexedRecord) { + IndexedRecord keyRecord = (IndexedRecord) key; + Schema keySchema = keyRecord.getSchema(); + key = convertToJavaObject(samzaMessage.getKey(), keySchema); + } + + return new SamzaSqlRelMessage(key, fieldNames, fieldValues); } private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) { @@ -109,7 +123,7 @@ public class AvroRelConverter implements SamzaRelConverter { values.addAll(avroRecord.getSchema().getFields() .stream() .map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()), - avroRecord.getSchema().getField(f.name()).schema())) + getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema()))) .collect(Collectors.toList())); } else { String msg = "Avro Record is null"; @@ -141,13 +155,13 @@ public class AvroRelConverter implements SamzaRelConverter { Object relObj = values.get(index); String fieldName = fieldNames.get(index); Schema fieldSchema = schema.getField(fieldName).schema(); - record.put(fieldName, convertToAvroObject(relObj, fieldSchema)); + record.put(fieldName, convertToAvroObject(relObj, getNonNullUnionSchema(fieldSchema))); } } return record; } - private Object convertToAvroObject(Object relObj, Schema schema) { + public Object convertToAvroObject(Object relObj, Schema schema) { if (relObj == null) { return null; } @@ -155,17 +169,27 @@ public class AvroRelConverter implements SamzaRelConverter { case RECORD: return convertToGenericRecord((SamzaSqlRelRecord) relObj, getNonNullUnionSchema(schema)); case ARRAY: + if (((List<Object>) relObj).size() == 0) { + return null; + } List<Object> avroList = ((List<Object>) relObj).stream() .map(o -> convertToAvroObject(o, getNonNullUnionSchema(schema).getElementType())) .collect(Collectors.toList()); return avroList; case MAP: + if (((Map<String, ?>) relObj).size() == 0) { + return null; + } return ((Map<String, ?>) relObj).entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> convertToAvroObject(e.getValue(), getNonNullUnionSchema(schema).getValueType()))); case UNION: return convertToAvroObject(relObj, getNonNullUnionSchema(schema)); + case FIXED: + return new GenericData.Fixed(schema, ((String) relObj).getBytes()); + case ENUM: + return new GenericData.EnumSymbol(schema, (String) relObj); default: return relObj; } @@ -173,7 +197,7 @@ public class AvroRelConverter implements SamzaRelConverter { // Not doing any validations of data types with Avro schema considering the resource cost per message. // Casting would fail if the data types are not in sync with the schema. - private Object convertToJavaObject(Object avroObj, Schema schema) { + public Object convertToJavaObject(Object avroObj, Schema schema) { switch(schema.getType()) { case RECORD: if (avroObj == null) { @@ -183,11 +207,13 @@ public class AvroRelConverter implements SamzaRelConverter { case ARRAY: { ArrayList<Object> retVal = new ArrayList<>(); if (avroObj != null) { - List<Object> avroArray = null; + List<Object> avroArray; if (avroObj instanceof GenericData.Array) { avroArray = (GenericData.Array) avroObj; } else if (avroObj instanceof List) { avroArray = (List) avroObj; + } else { + throw new SamzaException("Unsupported array type " + avroObj.getClass().getSimpleName()); } if (avroArray != null) { @@ -214,13 +240,20 @@ public class AvroRelConverter implements SamzaRelConverter { return null; } return convertToJavaObject(avroObj, getNonNullUnionSchema(schema)); + case ENUM: + case FIXED: + if (avroObj == null) { + return null; + } + return avroObj.toString(); + default: return avroObj; } } // Two non-nullable types in a union is not yet supported. - private Schema getNonNullUnionSchema(Schema schema) { + public Schema getNonNullUnionSchema(Schema schema) { if (schema.getType().equals(Schema.Type.UNION)) { if (schema.getTypes().get(0).getType() != Schema.Type.NULL) { return schema.getTypes().get(0); http://git-wip-us.apache.org/repos/asf/samza/blob/66525b51/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java index 883abbf..94695c4 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java @@ -81,7 +81,7 @@ public class TestSamzaSqlRelMessageSerde { for (Schema.Field field : Profile.SCHEMA$.getFields()) { // equals() on GenericRecord does the nested record equality check as well. - Assert.assertEquals(recordPostConversion.get(field.name()), messageRecordPair.getValue().get(field.name())); + Assert.assertEquals(messageRecordPair.getValue().get(field.name()), recordPostConversion.get(field.name())); } }