Repository: samza
Updated Branches:
  refs/heads/master c89841366 -> 66525b51e


SAMZA-1681: Samza SQL - Add support for handling older record schema versions in AvroRelConverter

In addition to handling older record schema versions in AvroRelConverter, this
change adds support for the Avro enum and fixed types, and properly converts a
Samza message key that is an Avro record into a Java object for the rel message.

Author: Aditya Toomula <atoom...@linkedin.com>

Reviewers: Srini P <spun...@linkedin.com>

Closes #481 from atoomula/rel


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/66525b51
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/66525b51
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/66525b51

Branch: refs/heads/master
Commit: 66525b51ee2cfc82b81798d2c9d450cfe770f21e
Parents: c898413
Author: Aditya Toomula <atoom...@linkedin.com>
Authored: Wed Apr 25 09:48:11 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Wed Apr 25 09:48:11 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/sql/avro/AvroRelConverter.java | 49 ++++++++++++++++----
 .../samza/sql/TestSamzaSqlRelMessageSerde.java  |  2 +-
 2 files changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/66525b51/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 5793d6e..f121983 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -80,11 +80,18 @@ public class AvroRelConverter implements SamzaRelConverter {
     Object value = samzaMessage.getValue();
     if (value instanceof IndexedRecord) {
       IndexedRecord record = (IndexedRecord) value;
+      // Please note that record schema and cached schema could be different 
due to schema evolution.
+      // Always represent record schema in the form of cached schema. This 
approach has the side-effect
+      // of dropping the newly added fields in the scenarios where the record 
schema has newer version
+      // than the cached schema. [TODO: SAMZA-1679]
+      Schema recordSchema = record.getSchema();
       fieldNames.addAll(avroSchema.getFields().stream()
           .map(Schema.Field::name)
           .collect(Collectors.toList()));
       fieldValues.addAll(fieldNames.stream()
-          .map(f -> 
convertToJavaObject(record.get(avroSchema.getField(f).pos()), 
avroSchema.getField(f).schema()))
+          .map(f -> convertToJavaObject(
+              recordSchema.getField(f) != null ? 
record.get(recordSchema.getField(f).pos()) : null,
+              getNonNullUnionSchema(avroSchema.getField(f).schema())))
           .collect(Collectors.toList()));
     } else if (value == null) {
       
fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
@@ -95,7 +102,14 @@ public class AvroRelConverter implements SamzaRelConverter {
       throw new SamzaException(msg);
     }
 
-    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, 
fieldValues);
+    Object key = samzaMessage.getKey();
+    if (key != null && key instanceof IndexedRecord) {
+      IndexedRecord keyRecord = (IndexedRecord) key;
+      Schema keySchema = keyRecord.getSchema();
+      key = convertToJavaObject(samzaMessage.getKey(), keySchema);
+    }
+
+    return new SamzaSqlRelMessage(key, fieldNames, fieldValues);
   }
 
   private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
@@ -109,7 +123,7 @@ public class AvroRelConverter implements SamzaRelConverter {
       values.addAll(avroRecord.getSchema().getFields()
           .stream()
           .map(f -> 
convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
-              avroRecord.getSchema().getField(f.name()).schema()))
+              
getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
           .collect(Collectors.toList()));
     } else {
       String msg = "Avro Record is null";
@@ -141,13 +155,13 @@ public class AvroRelConverter implements 
SamzaRelConverter {
         Object relObj = values.get(index);
         String fieldName = fieldNames.get(index);
         Schema fieldSchema = schema.getField(fieldName).schema();
-        record.put(fieldName, convertToAvroObject(relObj, fieldSchema));
+        record.put(fieldName, convertToAvroObject(relObj, 
getNonNullUnionSchema(fieldSchema)));
       }
     }
     return record;
   }
 
-  private Object convertToAvroObject(Object relObj, Schema schema) {
+  public Object convertToAvroObject(Object relObj, Schema schema) {
     if (relObj == null) {
       return null;
     }
@@ -155,17 +169,27 @@ public class AvroRelConverter implements 
SamzaRelConverter {
       case RECORD:
         return convertToGenericRecord((SamzaSqlRelRecord) relObj, 
getNonNullUnionSchema(schema));
       case ARRAY:
+        if (((List<Object>) relObj).size() == 0) {
+          return null;
+        }
         List<Object> avroList = ((List<Object>) relObj).stream()
             .map(o -> convertToAvroObject(o, 
getNonNullUnionSchema(schema).getElementType()))
             .collect(Collectors.toList());
         return avroList;
       case MAP:
+        if (((Map<String, ?>) relObj).size() == 0) {
+          return null;
+        }
         return ((Map<String, ?>) relObj).entrySet()
             .stream()
             .collect(Collectors.toMap(Map.Entry::getKey, e -> 
convertToAvroObject(e.getValue(),
                 getNonNullUnionSchema(schema).getValueType())));
       case UNION:
         return convertToAvroObject(relObj, getNonNullUnionSchema(schema));
+      case FIXED:
+        return new GenericData.Fixed(schema, ((String) relObj).getBytes());
+      case ENUM:
+        return new GenericData.EnumSymbol(schema, (String) relObj);
       default:
         return relObj;
     }
@@ -173,7 +197,7 @@ public class AvroRelConverter implements SamzaRelConverter {
 
   // Not doing any validations of data types with Avro schema considering the 
resource cost per message.
   // Casting would fail if the data types are not in sync with the schema.
-  private Object convertToJavaObject(Object avroObj, Schema schema) {
+  public Object convertToJavaObject(Object avroObj, Schema schema) {
     switch(schema.getType()) {
       case RECORD:
         if (avroObj == null) {
@@ -183,11 +207,13 @@ public class AvroRelConverter implements 
SamzaRelConverter {
       case ARRAY: {
         ArrayList<Object> retVal = new ArrayList<>();
         if (avroObj != null) {
-          List<Object> avroArray = null;
+          List<Object> avroArray;
           if (avroObj instanceof GenericData.Array) {
             avroArray = (GenericData.Array) avroObj;
           } else if (avroObj instanceof List) {
             avroArray = (List) avroObj;
+          } else {
+            throw new SamzaException("Unsupported array type " + 
avroObj.getClass().getSimpleName());
           }
 
           if (avroArray != null) {
@@ -214,13 +240,20 @@ public class AvroRelConverter implements 
SamzaRelConverter {
           return null;
         }
         return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
+      case ENUM:
+      case FIXED:
+        if (avroObj == null) {
+          return null;
+        }
+        return avroObj.toString();
+
       default:
         return avroObj;
     }
   }
 
   // Two non-nullable types in a union is not yet supported.
-  private Schema getNonNullUnionSchema(Schema schema) {
+  public Schema getNonNullUnionSchema(Schema schema) {
     if (schema.getType().equals(Schema.Type.UNION)) {
       if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
         return schema.getTypes().get(0);

http://git-wip-us.apache.org/repos/asf/samza/blob/66525b51/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
index 883abbf..94695c4 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -81,7 +81,7 @@ public class TestSamzaSqlRelMessageSerde {
 
     for (Schema.Field field : Profile.SCHEMA$.getFields()) {
       // equals() on GenericRecord does the nested record equality check as 
well.
-      Assert.assertEquals(recordPostConversion.get(field.name()), 
messageRecordPair.getValue().get(field.name()));
+      Assert.assertEquals(messageRecordPair.getValue().get(field.name()), 
recordPostConversion.get(field.name()));
     }
   }
 

Reply via email to