SAMZA-1619: Samza-sql: Support serialization of nested samza sql relational 
message

Added support for serialization of nested samza sql rel message and accordingly 
fixed the conversion of nested avro records to rel message. Please note that we 
still do not have support for the sql queries that point to fields in nested 
records (beyond the top level record).

Author: Aditya Toomula <atoom...@linkedin.com>
Author: Aditya Toomula <atoom...@atoomula-ld1.linkedin.biz>

Reviewers: Srinivasulu P<spun...@linkedin.com>

Closes #464 from atoomula/serde


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/58c39e37
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/58c39e37
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/58c39e37

Branch: refs/heads/master
Commit: 58c39e37121c9e11a5266cf7eaa3aaf6560ac6c9
Parents: bf2a2f7
Author: Aditya Toomula <atoom...@linkedin.com>
Authored: Mon Apr 16 10:26:51 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Mon Apr 16 10:26:51 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/sql/avro/AvroRelConverter.java | 228 +++++++++++--------
 .../samza/sql/avro/AvroTypeFactoryImpl.java     |   9 +-
 .../samza/sql/data/SamzaSqlCompositeKey.java    |   2 +-
 .../samza/sql/data/SamzaSqlRelMessage.java      | 135 +++++++----
 .../SamzaSqlRelMessageSerdeFactory.java         |  67 ++++++
 .../samza/sql/translator/FilterTranslator.java  |   6 +-
 .../samza/sql/translator/JoinTranslator.java    |   7 +-
 .../samza/sql/translator/ProjectTranslator.java |   9 +-
 .../SamzaSqlRelMessageJoinFunction.java         |   6 +-
 .../samza/sql/TestSamzaSqlRelMessage.java       |   6 +-
 .../sql/TestSamzaSqlRelMessageJoinFunction.java |  15 +-
 .../samza/sql/TestSamzaSqlRelMessageSerde.java  |  68 +++++-
 .../samza/sql/avro/TestAvroRelConversion.java   | 121 ++++++++--
 .../samza/sql/avro/schemas/AddressRecord.java   |  52 +++++
 .../sql/avro/schemas/EnrichedPageView.avsc      |  38 +++-
 .../sql/avro/schemas/EnrichedPageView.java      |   6 +-
 .../org/apache/samza/sql/avro/schemas/Kind.java |  30 +++
 .../samza/sql/avro/schemas/PhoneNumber.java     |  50 ++++
 .../apache/samza/sql/avro/schemas/Profile.avsc  | 106 ++++++++-
 .../apache/samza/sql/avro/schemas/Profile.java  |  18 +-
 .../samza/sql/avro/schemas/SimpleRecord.avsc    |   2 +-
 .../samza/sql/avro/schemas/SimpleRecord.java    |   2 +-
 .../samza/sql/avro/schemas/StreetNumRecord.java |  48 ++++
 .../samza/sql/system/TestAvroSystemFactory.java |  74 +++++-
 .../test/samzasql/TestSamzaSqlEndToEnd.java     |  34 +++
 .../tools/avro/AvroSchemaGenRelConverter.java   |   4 +-
 .../tools/json/JsonRelConverterFactory.java     |   4 +-
 27 files changed, 941 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index e247415..5793d6e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -29,7 +29,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -39,6 +38,8 @@ import org.apache.samza.system.SystemStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
+
 
 /**
  * This class converts a Samza Avro messages to Relational messages and vice 
versa.
@@ -48,141 +49,186 @@ import org.slf4j.LoggerFactory;
  *     The key part of the samza message is represented as a special column 
{@link SamzaSqlRelMessage#KEY_NAME}
  *     in relational message.
  *
- *     The value part of the samza message is expected to be {@link 
IndexedRecord}, All the fields in the IndexedRecord form
- *     the corresponding fields of the relational message.
+ *     The value part of the samza message is expected to be {@link 
IndexedRecord}, All the fields in the IndexedRecord
+ *     form the corresponding fields of the relational message.
  *
  * Conversion from Relational to Samza Message :
  *     This converts the Samza relational message into Avro {@link 
GenericRecord}.
- *     All the fields of the relational message is become fields of the Avro 
GenericRecord except of the field with name
+ *     All the fields of the relational message become fields of the Avro 
GenericRecord except the field with name
  *     {@link SamzaSqlRelMessage#KEY_NAME}. This special field becomes the Key 
in the output Samza message.
  */
 public class AvroRelConverter implements SamzaRelConverter {
 
   protected final Config config;
   private final Schema avroSchema;
-  private final RelDataType relationalSchema;
-
-  /**
-   * Class that converts the avro field to their corresponding relational 
fields
-   * Array fields are converted from Avro {@link 
org.apache.avro.generic.GenericData.Array} to {@link ArrayList}
-   */
-  public enum AvroToRelObjConverter {
-
-    /**
-     * If the relational field type is ArraySqlType, We expect the avro field 
to be of type either
-     * {@link GenericData.Array} or {@link List} which then is converted to 
Rel field of type {@link ArrayList}
-     */
-    ArraySqlType {
-      @Override
-      Object convert(Object avroObj) {
-        ArrayList<Object> retVal = new ArrayList<>();
-        if (avroObj != null) {
-          if (avroObj instanceof GenericData.Array) {
-            retVal.addAll(((GenericData.Array) avroObj));
-          } else if (avroObj instanceof List) {
-            retVal.addAll((List) avroObj);
-          }
-        }
-
-        return retVal;
-      }
-    },
-
-    /**
-     * If the relational field type is MapSqlType, We expect the avro field to 
be of type
-     * {@link Map}
-     */
-    MapSqlType {
-      @Override
-      Object convert(Object obj) {
-        Map<String, Object> retVal = new HashMap<>();
-        if (obj != null) {
-          retVal.putAll((Map<String, ?>) obj);
-        }
-        return retVal;
-      }
-    },
-
-    /**
-     * If the relational field type is RelRecordType, The field is considered 
an object
-     * and moved to rel field without any translation.
-     */
-    RelRecordType {
-      @Override
-      Object convert(Object obj) {
-        return obj;
-      }
-    },
-
-    /**
-     * If the relational field type is BasicSqlType, The field is moved to rel 
field without any translation.
-     */
-    BasicSqlType {
-      @Override
-      Object convert(Object obj) {
-        return obj;
-      }
-    };
-
-    abstract Object convert(Object obj);
-  }
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AvroRelConverter.class);
 
-  private final Schema arraySchema = Schema.parse(
-      
"{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]},\"java-class\":\"java.util.List\"}");
-  private final Schema mapSchema = Schema.parse(
-      
"{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]}}");
-
   public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider 
schemaProvider, Config config) {
     this.config = config;
-    this.relationalSchema = schemaProvider.getRelationalSchema();
     this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
   }
 
+  /**
+   * Converts the nested avro object in SamzaMessage to relational message 
corresponding to
+   * the tableName with relational schema.
+   */
   @Override
   public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> 
samzaMessage) {
-    List<Object> values = new ArrayList<>();
+    List<Object> fieldValues = new ArrayList<>();
     List<String> fieldNames = new ArrayList<>();
     Object value = samzaMessage.getValue();
     if (value instanceof IndexedRecord) {
       IndexedRecord record = (IndexedRecord) value;
-      fieldNames.addAll(relationalSchema.getFieldNames());
-      values.addAll(relationalSchema.getFieldList()
-          .stream()
-          .map(x -> getRelField(x.getType(), 
record.get(this.avroSchema.getField(x.getName()).pos())))
+      fieldNames.addAll(avroSchema.getFields().stream()
+          .map(Schema.Field::name)
+          .collect(Collectors.toList()));
+      fieldValues.addAll(fieldNames.stream()
+          .map(f -> 
convertToJavaObject(record.get(avroSchema.getField(f).pos()), 
avroSchema.getField(f).schema()))
           .collect(Collectors.toList()));
     } else if (value == null) {
-      fieldNames.addAll(relationalSchema.getFieldNames());
-      IntStream.range(0, fieldNames.size()).forEach(x -> values.add(null));
+      
fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
+      IntStream.range(0, fieldNames.size()).forEach(x -> 
fieldValues.add(null));
     } else {
       String msg = "Avro message converter doesn't support messages of type " 
+ value.getClass();
       LOG.error(msg);
       throw new SamzaException(msg);
     }
 
-    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, values);
+    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, 
fieldValues);
+  }
+
+  private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
+    List<Object> values = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
+    if (avroRecord != null) {
+      fieldNames.addAll(avroRecord.getSchema().getFields()
+          .stream()
+          .map(Schema.Field::name)
+          .collect(Collectors.toList()));
+      values.addAll(avroRecord.getSchema().getFields()
+          .stream()
+          .map(f -> 
convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
+              avroRecord.getSchema().getField(f.name()).schema()))
+          .collect(Collectors.toList()));
+    } else {
+      String msg = "Avro Record is null";
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    return new SamzaSqlRelRecord(fieldNames, values);
   }
 
+  /**
+   * Convert the nested relational message to the output samza message.
+   */
   @Override
   public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage 
relMessage) {
     return convertToSamzaMessage(relMessage, this.avroSchema);
   }
 
   protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage 
relMessage, Schema avroSchema) {
-    GenericRecord record = new GenericData.Record(avroSchema);
-    List<String> fieldNames = relMessage.getFieldNames();
-    List<Object> values = relMessage.getFieldValues();
+    return new KV<>(relMessage.getKey(), 
convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), avroSchema));
+  }
+
+  private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, 
Schema schema) {
+    GenericRecord record = new GenericData.Record(schema);
+    List<String> fieldNames = relRecord.getFieldNames();
+    List<Object> values = relRecord.getFieldValues();
     for (int index = 0; index < fieldNames.size(); index++) {
       if 
(!fieldNames.get(index).equalsIgnoreCase(SamzaSqlRelMessage.KEY_NAME)) {
-        record.put(fieldNames.get(index), values.get(index));
+        Object relObj = values.get(index);
+        String fieldName = fieldNames.get(index);
+        Schema fieldSchema = schema.getField(fieldName).schema();
+        record.put(fieldName, convertToAvroObject(relObj, fieldSchema));
       }
     }
+    return record;
+  }
 
-    return new KV<>(relMessage.getKey(), record);
+  private Object convertToAvroObject(Object relObj, Schema schema) {
+    if (relObj == null) {
+      return null;
+    }
+    switch(schema.getType()) {
+      case RECORD:
+        return convertToGenericRecord((SamzaSqlRelRecord) relObj, 
getNonNullUnionSchema(schema));
+      case ARRAY:
+        List<Object> avroList = ((List<Object>) relObj).stream()
+            .map(o -> convertToAvroObject(o, 
getNonNullUnionSchema(schema).getElementType()))
+            .collect(Collectors.toList());
+        return avroList;
+      case MAP:
+        return ((Map<String, ?>) relObj).entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
convertToAvroObject(e.getValue(),
+                getNonNullUnionSchema(schema).getValueType())));
+      case UNION:
+        return convertToAvroObject(relObj, getNonNullUnionSchema(schema));
+      default:
+        return relObj;
+    }
   }
 
-  private Object getRelField(RelDataType relType, Object avroObj) {
-    return 
AvroToRelObjConverter.valueOf(relType.getClass().getSimpleName()).convert(avroObj);
+  // Not doing any validations of data types with Avro schema considering the 
resource cost per message.
+  // Casting would fail if the data types are not in sync with the schema.
+  private Object convertToJavaObject(Object avroObj, Schema schema) {
+    switch(schema.getType()) {
+      case RECORD:
+        if (avroObj == null) {
+          return null;
+        }
+        return convertToRelRecord((IndexedRecord) avroObj);
+      case ARRAY: {
+        ArrayList<Object> retVal = new ArrayList<>();
+        if (avroObj != null) {
+          List<Object> avroArray = null;
+          if (avroObj instanceof GenericData.Array) {
+            avroArray = (GenericData.Array) avroObj;
+          } else if (avroObj instanceof List) {
+            avroArray = (List) avroObj;
+          }
+
+          if (avroArray != null) {
+            retVal.addAll(
+                avroArray.stream()
+                    .map(v -> convertToJavaObject(v, 
getNonNullUnionSchema(schema).getElementType()))
+                    .collect(Collectors.toList()));
+          }
+        }
+        return retVal;
+      }
+      case MAP: {
+        Map<String, Object> retVal = new HashMap<>();
+        if (avroObj != null) {
+          retVal.putAll(((Map<String, ?>) avroObj).entrySet().stream()
+              .collect(Collectors.toMap(
+                  Map.Entry::getKey,
+                  e -> convertToJavaObject(e.getValue(), 
getNonNullUnionSchema(schema).getValueType()))));
+        }
+        return retVal;
+      }
+      case UNION:
+        if (avroObj == null) {
+          return null;
+        }
+        return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
+      default:
+        return avroObj;
+    }
+  }
+
+  // Two non-nullable types in a union is not yet supported.
+  private Schema getNonNullUnionSchema(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
+        return schema.getTypes().get(0);
+      }
+      if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
+        return schema.getTypes().get(1);
+      }
+    }
+    return schema;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 74e15e9..288fdc4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -28,11 +28,9 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.sql.type.ArraySqlType;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.samza.SamzaException;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +55,10 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
       throw new SamzaException(msg);
     }
 
+    return convertRecordType(schema);
+  }
+
+  private RelDataType convertRecordType(Schema schema) {
     List<RelDataTypeField> relFields = getRelFields(schema.getFields());
     return new RelRecordType(relFields);
   }
@@ -101,8 +103,7 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl 
{
       case LONG:
         return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), 
true);
       case RECORD:
-//        List<RelDataTypeField> relFields = getRelFields(fieldSchema);
-//        return new RelRecordType(relFields);
+        // return createTypeWithNullability(convertRecordType(fieldSchema), 
true);
         // TODO Calcite execution engine doesn't support record type yet.
         return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
       case MAP:

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
index f646d9a..54c8391 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
@@ -74,7 +74,7 @@ public class SamzaSqlCompositeKey implements Serializable {
   public static SamzaSqlCompositeKey 
createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
     ArrayList<Object> keyParts = new ArrayList<>();
     for (int idx : relIdx) {
-      keyParts.add(message.getFieldValues().get(idx));
+      keyParts.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx));
     }
     return new SamzaSqlCompositeKey(keyParts);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index b54634f..9bf1870 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -23,17 +23,20 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 
 /**
  * Samza sql relational message. Each Samza sql relational message represents 
a relational row in a table.
- * Each row of the relational table and hence SamzaSqlRelMessage consists of 
list of column values and
- * their associated column names. Right now we donot store any other metadata 
other than the column name in the
- * SamzaSqlRelationalMessage, In future if we find a need, we could add 
additional column ddl metadata around
- * primary Key, nullability, etc.
- * TODO: SAMZA-1619 Support serialization of nested SamzaSqlRelMessage.
+ * Each row of the relational table consists of a primary key and {@link 
SamzaSqlRelRecord}, which consists of a list
+ * of column values and the associated column names.
  */
 public class SamzaSqlRelMessage implements Serializable {
 
@@ -41,36 +44,29 @@ public class SamzaSqlRelMessage implements Serializable {
 
   private final Object key;
 
-  @JsonProperty("fieldNames")
-  private final List<String> fieldNames;
-  @JsonProperty("fieldValues")
-  private final List<Object> fieldValues;
+  @JsonProperty("samzaSqlRelRecord")
+  private final SamzaSqlRelRecord samzaSqlRelRecord;
 
   /**
    * Creates a {@link SamzaSqlRelMessage} from the list of relational fields 
and values.
-   * If the field list contains KEY, then it extracts the key out of the 
fields to creates a
-   * RelMessage with key and values otherwise creates a Relmessage without the 
key.
+   * If the field list contains KEY, then it extracts the key out of the 
fields to create a
+   * {@link SamzaSqlRelRecord} along with key, otherwise creates a {@link 
SamzaSqlRelRecord}
+   * without the key.
    * @param fieldNames Ordered list of field names in the row.
-   * @param fieldValues  Ordered list of all the values in the row. Some of 
the fields can be null, This could be result of
-   *               delete change capture event in the stream or because of the 
result of the outer join or the fields
-   *               themselves are null in the original stream.
+   * @param fieldValues  Ordered list of all the values in the row. Some of 
the fields can be null, This could be
+   *                     result of delete change capture event in the stream 
or because of the result of the outer join
+   *                     or the fields themselves are null in the original 
stream.
    */
-  public SamzaSqlRelMessage(@JsonProperty("fieldNames") List<String> 
fieldNames,
-      @JsonProperty("fieldValues") List<Object> fieldValues) {
+  public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) 
{
     Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and 
values are not of same length.");
 
-    this.fieldNames = new ArrayList<>();
-    this.fieldValues = new ArrayList<>();
-
     int keyIndex = fieldNames.indexOf(KEY_NAME);
     Object key = null;
     if (keyIndex != -1) {
       key = fieldValues.get(keyIndex);
     }
     this.key = key;
-
-    this.fieldNames.addAll(fieldNames);
-    this.fieldValues.addAll(fieldValues);
+    this.samzaSqlRelRecord = new SamzaSqlRelRecord(fieldNames, fieldValues);
   }
 
   /**
@@ -85,29 +81,29 @@ public class SamzaSqlRelMessage implements Serializable {
   public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> 
fieldValues) {
     Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and 
values are not of same length.");
 
-    this.fieldNames = new ArrayList<>();
-    this.fieldValues = new ArrayList<>();
+    List<String> tmpFieldNames = new ArrayList<>();
+    List<Object> tmpFieldValues = new ArrayList<>();
 
     this.key = key;
-    this.fieldNames.add(KEY_NAME);
-    this.fieldValues.add(key);
+    tmpFieldNames.add(KEY_NAME);
+    tmpFieldValues.add(key);
 
-    this.fieldNames.addAll(fieldNames);
-    this.fieldValues.addAll(fieldValues);
+    tmpFieldNames.addAll(fieldNames);
+    tmpFieldValues.addAll(fieldValues);
+
+    this.samzaSqlRelRecord = new SamzaSqlRelRecord(tmpFieldNames, 
tmpFieldValues);
   }
 
   /**
-   * Get the field names of all the columns in the relational message.
-   * @return the field names of all columns.
+   * Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}.
    */
-  @JsonProperty("fieldNames")
-  public List<String> getFieldNames() {
-    return fieldNames;
+  public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") 
SamzaSqlRelRecord samzaSqlRelRecord) {
+    this(samzaSqlRelRecord.getFieldNames(), 
samzaSqlRelRecord.getFieldValues());
   }
 
-  @JsonProperty("fieldValues")
-  public List<Object> getFieldValues() {
-    return this.fieldValues;
+  @JsonProperty("samzaSqlRelRecord")
+  public SamzaSqlRelRecord getSamzaSqlRelRecord() {
+    return samzaSqlRelRecord;
   }
 
   public Object getKey() {
@@ -115,17 +111,66 @@ public class SamzaSqlRelMessage implements Serializable {
   }
 
   /**
-   * Get the value of the field corresponding to the field name.
-   * @param name Name of the field.
-   * @return returns the value of the field.
+   * Samza sql relational record. A record consists of list of column values 
and the associated column names.
+   * A column value could be nested, meaning, it could be another 
SamzaSqlRelRecord.
+   * Right now we do not store any metadata (like nullability, etc) other than 
the column name in the SamzaSqlRelRecord.
    */
-  public Optional<Object> getField(String name) {
-    for (int index = 0; index < fieldNames.size(); index++) {
-      if (fieldNames.get(index).equals(name)) {
-        return Optional.ofNullable(fieldValues.get(index));
-      }
+  public static class SamzaSqlRelRecord implements Serializable {
+
+    @JsonProperty("fieldNames")
+    private final List<String> fieldNames;
+    @JsonProperty("fieldValues")
+    private final List<Object> fieldValues;
+
+    /**
+     * Creates a {@link SamzaSqlRelRecord} from the list of relational fields 
and values.
+     * @param fieldNames Ordered list of field names in the row.
+     * @param fieldValues  Ordered list of all the values in the row. Some of 
the fields can be null. This could be
+     *                     result of delete change capture event in the stream 
or because of the result of the outer
+     *                     join or the fields themselves are null in the 
original stream.
+     */
+    public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> 
fieldNames,
+        @JsonProperty("fieldValues") List<Object> fieldValues) {
+      Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names 
and values are not of same length.");
+
+      this.fieldNames = new ArrayList<>();
+      this.fieldValues = new ArrayList<>();
+
+      this.fieldNames.addAll(fieldNames);
+      this.fieldValues.addAll(fieldValues);
     }
 
-    return Optional.empty();
+    /**
+     * Get the field names of all the columns in the relational message.
+     * @return the field names of all columns.
+     */
+    @JsonProperty("fieldNames")
+    public List<String> getFieldNames() {
+      return this.fieldNames;
+    }
+
+    /**
+     * Get the field values of all the columns in the relational message.
+     * @return the field values of all columns.
+     */
+    @JsonProperty("fieldValues")
+    public List<Object> getFieldValues() {
+      return this.fieldValues;
+    }
+
+    /**
+     * Get the value of the field corresponding to the field name.
+     * @param name Name of the field.
+     * @return returns the value of the field.
+     */
+    public Optional<Object> getField(String name) {
+      for (int index = 0; index < fieldNames.size(); index++) {
+        if (fieldNames.get(index).equals(name)) {
+          return Optional.ofNullable(fieldValues.get(index));
+        }
+      }
+
+      return Optional.empty();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
new file mode 100644
index 0000000..45542ca
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
@@ -0,0 +1,67 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.serializers;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+
+/**
+ * A serializer for {@link SamzaSqlRelMessage}. This serializer preserves the 
type information as
+ * {@link SamzaSqlRelMessage} contains nested {@link 
org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord}
+ * records.
+ */
+public final class SamzaSqlRelMessageSerdeFactory implements 
SerdeFactory<SamzaSqlRelMessage> {
+  public Serde<SamzaSqlRelMessage> getSerde(String name, Config config) {
+    return new SamzaSqlRelMessageSerde();
+  }
+
+  public final static class SamzaSqlRelMessageSerde implements 
Serde<SamzaSqlRelMessage> {
+
+    @Override
+    public SamzaSqlRelMessage fromBytes(byte[] bytes) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        // Enable object typing to handle nested records
+        mapper.enableDefaultTyping();
+        return mapper.readValue(new String(bytes, "UTF-8"), new 
TypeReference<SamzaSqlRelMessage>() {});
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(SamzaSqlRelMessage p) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        // Enable object typing to handle nested records
+        mapper.enableDefaultTyping();
+        return mapper.writeValueAsString(p).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 798f0b3..5832b21 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -55,11 +55,13 @@ class FilterTranslator {
 
     return inputStream.filter(message -> {
       Object[] result = new Object[1];
-      expr.execute(context.getExecutionContext(), context.getDataContext(), 
message.getFieldValues().toArray(), result);
+      expr.execute(context.getExecutionContext(), context.getDataContext(),
+          message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
       if (result.length > 0 && result[0] instanceof Boolean) {
         boolean retVal = (Boolean) result[0];
         log.debug(
-            String.format("return value for input %s is %s", 
Arrays.asList(message.getFieldValues()).toString(), retVal));
+            String.format("return value for input %s is %s",
+                
Arrays.asList(message.getSamzaSqlRelRecord().getFieldValues()).toString(), 
retVal));
         return retVal;
       } else {
         log.error("return value is not boolean");

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 70c1968..899ca41 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -43,12 +43,14 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.storage.kv.RocksDbTableDescriptor;
 import org.apache.samza.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
+import static 
org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey;
+import static 
org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde;
 
 
 /**
@@ -94,7 +96,8 @@ class JoinTranslator {
         tableKeyIds);
 
     JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new 
JsonSerdeV2<>(SamzaSqlCompositeKey.class);
-    JsonSerdeV2<SamzaSqlRelMessage> relMsgSerde = new 
JsonSerdeV2<>(SamzaSqlRelMessage.class);
+    SamzaSqlRelMessageSerde relMsgSerde =
+        (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
 
     Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, keySerde, 
relMsgSerde, join, context);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 0f31fb6..a0bd45f 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -66,7 +66,8 @@ class ProjectTranslator {
     MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(m -> {
       RelDataType type = project.getRowType();
       Object[] output = new Object[type.getFieldCount()];
-      expr.execute(context.getExecutionContext(), context.getDataContext(), 
m.getFieldValues().toArray(), output);
+      expr.execute(context.getExecutionContext(), context.getDataContext(),
+          m.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
       List<String> names = new ArrayList<>();
       for (int index = 0; index < output.length; index++) {
         names.add(index, project.getNamedProjects().get(index).getValue());
@@ -81,14 +82,14 @@ class ProjectTranslator {
   private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer 
flattenIndex,
       MessageStream<SamzaSqlRelMessage> inputStream) {
     return inputStream.flatMap(message -> {
-      Object field = message.getFieldValues().get(flattenIndex);
+      Object field = 
message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
 
       if (field != null && field instanceof List) {
         List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
         for (Object fieldValue : (List) field) {
-          List<Object> newValues = new ArrayList<>(message.getFieldValues());
+          List<Object> newValues = new 
ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
           newValues.set(flattenIndex, Collections.singletonList(fieldValue));
-          outMessages.add(new SamzaSqlRelMessage(message.getFieldNames(), 
newValues));
+          outMessages.add(new 
SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues));
         }
         return outMessages;
       } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
index 69e4e09..df88a7c 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
@@ -85,12 +85,12 @@ public class SamzaSqlRelMessageJoinFunction
 
     // If table position is on the right, add the stream message fields first
     if (isTablePosOnRight) {
-      outFieldValues.addAll(message.getFieldValues());
+      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
     }
 
     // Add the table record fields.
     if (record != null) {
-      outFieldValues.addAll(record.getValue().getFieldValues());
+      
outFieldValues.addAll(record.getValue().getSamzaSqlRelRecord().getFieldValues());
     } else {
       // Table record could be null as the record could not be found in the 
store. This can
       // happen for outer joins. Add nulls to all the field values in the 
output message.
@@ -99,7 +99,7 @@ public class SamzaSqlRelMessageJoinFunction
 
     // If table position is on the left, add the stream message fields last
     if (!isTablePosOnRight) {
-      outFieldValues.addAll(message.getFieldValues());
+      outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
     }
 
     return new SamzaSqlRelMessage(outFieldNames, outFieldValues);

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
index e58563c..689af72 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java
@@ -34,13 +34,13 @@ public class TestSamzaSqlRelMessage {
   @Test
   public void testGetField() {
     SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
-    Assert.assertEquals(values.get(0), message.getField(names.get(0)).get());
-    Assert.assertEquals(values.get(1), message.getField(names.get(1)).get());
+    Assert.assertEquals(values.get(0), 
message.getSamzaSqlRelRecord().getField(names.get(0)).get());
+    Assert.assertEquals(values.get(1), 
message.getSamzaSqlRelRecord().getField(names.get(1)).get());
   }
 
   @Test
   public void testGetNonExistentField() {
     SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
-    Assert.assertFalse(message.getField("field3").isPresent());
+    
Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
index 3da004a..90fce3b 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java
@@ -53,12 +53,13 @@ public class TestSamzaSqlRelMessageJoinFunction {
         new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, 
streamFieldNames, tableFieldNames);
     SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
 
-    Assert.assertEquals(outMsg.getFieldValues().size(), 
outMsg.getFieldNames().size());
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
+        outMsg.getSamzaSqlRelRecord().getFieldNames().size());
     List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
     expectedFieldNames.addAll(tableFieldNames);
     List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
     expectedFieldValues.addAll(tableFieldValues);
-    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), 
expectedFieldValues);
   }
 
   @Test
@@ -75,12 +76,13 @@ public class TestSamzaSqlRelMessageJoinFunction {
         new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, 
streamFieldNames, tableFieldNames);
     SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record);
 
-    Assert.assertEquals(outMsg.getFieldValues().size(), 
outMsg.getFieldNames().size());
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
+        outMsg.getSamzaSqlRelRecord().getFieldNames().size());
     List<String> expectedFieldNames = new ArrayList<>(tableFieldNames);
     expectedFieldNames.addAll(streamFieldNames);
     List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues);
     expectedFieldValues.addAll(streamFieldValues);
-    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), 
expectedFieldValues);
   }
 
   @Test
@@ -106,11 +108,12 @@ public class TestSamzaSqlRelMessageJoinFunction {
             tableFieldNames);
     SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null);
 
-    Assert.assertEquals(outMsg.getFieldValues().size(), 
outMsg.getFieldNames().size());
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(),
+        outMsg.getSamzaSqlRelRecord().getFieldNames().size());
     List<String> expectedFieldNames = new ArrayList<>(streamFieldNames);
     expectedFieldNames.addAll(tableFieldNames);
     List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues);
     expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null 
).collect(Collectors.toList()));
-    Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues);
+    Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), 
expectedFieldValues);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
index 3416ee1..883abbf 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -20,12 +20,29 @@
 package org.apache.samza.sql;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-import org.apache.samza.serializers.JsonSerdeV2;
+import java.util.Map;
+import javafx.util.Pair;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
+import org.apache.samza.sql.avro.schemas.AddressRecord;
+import org.apache.samza.sql.avro.schemas.Profile;
+import org.apache.samza.sql.avro.schemas.StreetNumRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
+import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static 
org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde;
+
 
 public class TestSamzaSqlRelMessageSerde {
 
@@ -35,9 +52,52 @@ public class TestSamzaSqlRelMessageSerde {
   @Test
   public void testWithDifferentFields() {
     SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
-    JsonSerdeV2<SamzaSqlRelMessage> serde = new 
JsonSerdeV2<>(SamzaSqlRelMessage.class);
+    SamzaSqlRelMessageSerde serde =
+        (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
     SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message));
-    Assert.assertEquals(resultMsg.getFieldNames(), names);
-    Assert.assertEquals(resultMsg.getFieldValues(), values);
+    Assert.assertEquals(names, 
resultMsg.getSamzaSqlRelRecord().getFieldNames());
+    Assert.assertEquals(values, 
resultMsg.getSamzaSqlRelRecord().getFieldValues());
   }
+
+  @Test
+  public void testNestedRecordConversion() {
+    Map<String, String> props = new HashMap<>();
+    SystemStream ss1 = new SystemStream("test", "nestedRecord");
+    props.put(
+        
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, 
ss1.getSystem(), ss1.getStream()),
+        Profile.SCHEMA$.toString());
+    ConfigBasedAvroRelSchemaProviderFactory factory = new 
ConfigBasedAvroRelSchemaProviderFactory();
+    AvroRelSchemaProvider nestedRecordSchemaProvider = (AvroRelSchemaProvider) 
factory.create(ss1, new MapConfig(props));
+    AvroRelConverter nestedRecordAvroRelConverter = new AvroRelConverter(ss1, 
nestedRecordSchemaProvider, new MapConfig());
+
+    Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair =
+        createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter);
+    SamzaSqlRelMessageSerde serde =
+        (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+    SamzaSqlRelMessage resultMsg = 
serde.fromBytes(serde.toBytes(messageRecordPair.getKey()));
+    nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg);
+    KV<Object, Object> samzaMessage = 
nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg);
+    GenericRecord recordPostConversion = (GenericRecord) 
samzaMessage.getValue();
+
+    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
+      // equals() on GenericRecord does the nested record equality check as 
well.
+      Assert.assertEquals(recordPostConversion.get(field.name()), 
messageRecordPair.getValue().get(field.name()));
+    }
+  }
+
+  private Pair<SamzaSqlRelMessage, GenericData.Record> 
createNestedSamzaSqlRelMessage(
+      AvroRelConverter nestedRecordAvroRelConverter) {
+    GenericData.Record record = new GenericData.Record(Profile.SCHEMA$);
+    record.put("id", 1);
+    record.put("name", "name1");
+    record.put("companyId", 0);
+    GenericData.Record addressRecord = new 
GenericData.Record(AddressRecord.SCHEMA$);
+    addressRecord.put("zip", 90000);
+    record.put("address", addressRecord);
+    GenericData.Record streetNumRecord = new 
GenericData.Record(StreetNumRecord.SCHEMA$);
+    streetNumRecord.put("number", 1200);
+    addressRecord.put("streetnum", streetNumRecord);
+    return new Pair<>(nestedRecordAvroRelConverter.convertToRelMessage(new 
KV<>("key", record)), record);
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index 21c666b..61abdfc 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -48,8 +48,13 @@ import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.schemas.AddressRecord;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.Kind;
+import org.apache.samza.sql.avro.schemas.PhoneNumber;
+import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.avro.schemas.StreetNumRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
@@ -63,8 +68,10 @@ public class TestAvroRelConversion {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestAvroRelConversion.class);
   private final AvroRelConverter simpleRecordAvroRelConverter;
   private final AvroRelConverter complexRecordAvroRelConverter;
+  private final AvroRelConverter nestedRecordAvroRelConverter;
   private final AvroRelSchemaProvider simpleRecordSchemaProvider;
-  private final AvroRelSchemaProvider complexRecordSchemProvider;
+  private final AvroRelSchemaProvider complexRecordSchemaProvider;
+  private final AvroRelSchemaProvider nestedRecordSchemaProvider;
 
   private int id = 1;
   private boolean boolValue = true;
@@ -85,19 +92,25 @@ public class TestAvroRelConversion {
     Map<String, String> props = new HashMap<>();
     SystemStream ss1 = new SystemStream("test", "complexRecord");
     SystemStream ss2 = new SystemStream("test", "simpleRecord");
+    SystemStream ss3 = new SystemStream("test", "nestedRecord");
     props.put(
         
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, 
ss1.getSystem(), ss1.getStream()),
         ComplexRecord.SCHEMA$.toString());
     props.put(
         
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, 
ss2.getSystem(), ss2.getStream()),
         SimpleRecord.SCHEMA$.toString());
+    props.put(
+        
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, 
ss3.getSystem(), ss3.getStream()),
+        Profile.SCHEMA$.toString());
 
     ConfigBasedAvroRelSchemaProviderFactory factory = new 
ConfigBasedAvroRelSchemaProviderFactory();
 
-    complexRecordSchemProvider = (AvroRelSchemaProvider) factory.create(ss1, 
new MapConfig(props));
+    complexRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, 
new MapConfig(props));
     simpleRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss2, 
new MapConfig(props));
-    complexRecordAvroRelConverter = new AvroRelConverter(ss1, 
complexRecordSchemProvider, new MapConfig());
+    nestedRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss3, 
new MapConfig(props));
+    complexRecordAvroRelConverter = new AvroRelConverter(ss1, 
complexRecordSchemaProvider, new MapConfig());
     simpleRecordAvroRelConverter = new AvroRelConverter(ss2, 
simpleRecordSchemaProvider, new MapConfig());
+    nestedRecordAvroRelConverter = new AvroRelConverter(ss3, 
nestedRecordSchemaProvider, new MapConfig());
   }
 
   @Test
@@ -119,7 +132,14 @@ public class TestAvroRelConversion {
 
   @Test
   public void testComplexSchemaConversion() {
-    RelDataType relSchema = complexRecordSchemProvider.getRelationalSchema();
+    RelDataType relSchema = complexRecordSchemaProvider.getRelationalSchema();
+
+    LOG.info("Relational schema " + relSchema);
+  }
+
+  @Test
+  public void testNestedSchemaConversion() {
+    RelDataType relSchema = nestedRecordSchemaProvider.getRelationalSchema();
 
     LOG.info("Relational schema " + relSchema);
   }
@@ -132,21 +152,23 @@ public class TestAvroRelConversion {
     record.put("name", "name1");
 
     SamzaSqlRelMessage message = 
simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
-    LOG.info(Joiner.on(",").join(message.getFieldValues()));
-    LOG.info(Joiner.on(",").join(message.getFieldNames()));
+    
LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldValues()));
+    
LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));
   }
 
   @Test
   public void testEmptyRecordConversion() {
     GenericData.Record record = new GenericData.Record(SimpleRecord.SCHEMA$);
     SamzaSqlRelMessage message = 
simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
-    Assert.assertEquals(message.getFieldNames().size(), 
message.getFieldValues().size());
+    Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(),
+        message.getSamzaSqlRelRecord().getFieldValues().size());
   }
 
   @Test
   public void testNullRecordConversion() {
     SamzaSqlRelMessage message = 
simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", null));
-    Assert.assertEquals(message.getFieldNames().size(), 
message.getFieldValues().size());
+    Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(),
+        message.getSamzaSqlRelRecord().getFieldValues().size());
   }
 
   public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) 
throws IOException {
@@ -192,6 +214,57 @@ public class TestAvroRelConversion {
     validateAvroSerializedData(serializedData);
   }
 
+  @Test
+  public void testNestedRecordConversion() throws IOException {
+    GenericData.Record record = new GenericData.Record(Profile.SCHEMA$);
+    record.put("id", 1);
+    record.put("name", "name1");
+    record.put("companyId", 0);
+    GenericData.Record addressRecord = new 
GenericData.Record(AddressRecord.SCHEMA$);
+    addressRecord.put("zip", 90000);
+    GenericData.Record streetNumRecord = new 
GenericData.Record(StreetNumRecord.SCHEMA$);
+    streetNumRecord.put("number", 1200);
+    addressRecord.put("streetnum", streetNumRecord);
+    record.put("address", addressRecord);
+    record.put("selfEmployed", "True");
+
+
+    GenericData.Record phoneNumberRecordH = new 
GenericData.Record(PhoneNumber.SCHEMA$);
+    phoneNumberRecordH.put("kind", Kind.Home);
+    phoneNumberRecordH.put("number", "111-111-1111");
+    GenericData.Record phoneNumberRecordC = new 
GenericData.Record(PhoneNumber.SCHEMA$);
+    phoneNumberRecordC.put("kind", Kind.Cell);
+    phoneNumberRecordC.put("number", "111-111-1112");
+    List<GenericData.Record> phoneNumbers = new ArrayList<>();
+    phoneNumbers.add(phoneNumberRecordH);
+    phoneNumbers.add(phoneNumberRecordC);
+    record.put("phoneNumbers", phoneNumbers);
+
+    GenericData.Record simpleRecord1 = new 
GenericData.Record(SimpleRecord.SCHEMA$);
+    simpleRecord1.put("id", 1);
+    simpleRecord1.put("name", "name1");
+    GenericData.Record simpleRecord2 = new 
GenericData.Record(SimpleRecord.SCHEMA$);
+    simpleRecord2.put("id", 2);
+    simpleRecord2.put("name", "name2");
+    HashMap<String, IndexedRecord> mapValues = new HashMap<>();
+    mapValues.put("key1", simpleRecord1);
+    mapValues.put("key2", simpleRecord2);
+    record.put("mapValues", mapValues);
+
+    SamzaSqlRelMessage relMessage = 
nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record));
+
+    
LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldValues()));
+    
LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldNames()));
+
+    KV<Object, Object> samzaMessage = 
nestedRecordAvroRelConverter.convertToSamzaMessage(relMessage);
+    GenericRecord recordPostConversion = (GenericRecord) 
samzaMessage.getValue();
+
+    for (Schema.Field field : Profile.SCHEMA$.getFields()) {
+      // equals() on GenericRecord does the nested record equality check as 
well.
+      Assert.assertEquals(record.get(field.name()), 
recordPostConversion.get(field.name()));
+    }
+  }
+
   private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) 
throws IOException {
     BinaryDecoder binDecoder = 
DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
     GenericDatumReader<T> reader = new GenericDatumReader<>(schema);
@@ -212,29 +285,29 @@ public class TestAvroRelConversion {
   private void validateAvroSerializedData(byte[] serializedData) throws 
IOException {
     GenericRecord complexRecordValue = genericRecordFromBytes(serializedData, 
ComplexRecord.SCHEMA$);
 
-    String streamName = "stream";
-    RelDataType dataType = complexRecordSchemProvider.getRelationalSchema();
-
     SamzaSqlRelMessage message = 
complexRecordAvroRelConverter.convertToRelMessage(new KV<>("key", 
complexRecordValue));
-    Assert.assertEquals(message.getFieldNames().size(), 
ComplexRecord.SCHEMA$.getFields().size() + 1);
-
-    Assert.assertEquals(message.getField("id").get(), id);
-    Assert.assertEquals(message.getField("bool_value").get(), boolValue);
-    Assert.assertEquals(message.getField("double_value").get(), doubleValue);
-    Assert.assertEquals(message.getField("string_value").get(), new 
Utf8(testStrValue));
-    Assert.assertEquals(message.getField("float_value").get(), floatValue);
-    Assert.assertEquals(message.getField("long_value").get(), longValue);
+    Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(), 
ComplexRecord.SCHEMA$.getFields().size() + 1);
+
+    Assert.assertEquals(message.getSamzaSqlRelRecord().getField("id").get(), 
id);
+    
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("bool_value").get(),
 boolValue);
+    
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("double_value").get(),
 doubleValue);
+    
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("string_value").get(),
 new Utf8(testStrValue));
+    
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(),
 floatValue);
+    
Assert.assertEquals(message.getSamzaSqlRelRecord().getField("long_value").get(),
 longValue);
     Assert.assertTrue(
-        
arrayValue.stream().map(Utf8::new).collect(Collectors.toList()).equals(message.getField("array_values").get()));
+        arrayValue.stream()
+            .map(Utf8::new)
+            .collect(Collectors.toList())
+            
.equals(message.getSamzaSqlRelRecord().getField("array_values").get()));
     Assert.assertTrue(mapValue.entrySet()
         .stream()
         .collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new 
Utf8(y.getValue())))
-        .equals(message.getField("map_values").get()));
+        .equals(message.getSamzaSqlRelRecord().getField("map_values").get()));
 
-    Assert.assertTrue(message.getField("bytes_value").get().equals(testBytes));
+    
Assert.assertTrue(message.getSamzaSqlRelRecord().getField("bytes_value").get().equals(testBytes));
 
-    LOG.info(Joiner.on(",").useForNull("null").join(message.getFieldValues()));
-    LOG.info(Joiner.on(",").join(message.getFieldNames()));
+    
LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues()));
+    
LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames()));
 
     KV<Object, Object> samzaMessage = 
complexRecordAvroRelConverter.convertToSamzaMessage(message);
     GenericRecord record = (GenericRecord) samzaMessage.getValue();

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java
new file mode 100644
index 0000000..f94abca
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java
@@ -0,0 +1,52 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class AddressRecord extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"AddressRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"doc\":\"zip
 
code.\",\"default\":null},{\"name\":\"streetnum\",\"type\":{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street
 number.\",\"default\":null}]},\"doc\":\"Street Number\",\"default\":null}]}");
+  /** zip code. */
+  public java.lang.Integer zip;
+  /** Street Number */
+  public org.apache.samza.sql.avro.schemas.StreetNumRecord streetnum;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return zip;
+    case 1: return streetnum;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: zip = (java.lang.Integer)value$; break;
+    case 1: streetnum = 
(org.apache.samza.sql.avro.schemas.StreetNumRecord)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
index 5117a5e..5edb3ec 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc
@@ -20,7 +20,7 @@
 {
     "name": "EnrichedPageView",
     "version" : 1,
-    "namespace": "org.apache.samza.sql.system.avro",
+    "namespace": "org.apache.samza.sql.avro.schemas",
     "type": "record",
     "fields": [
         {
@@ -40,6 +40,42 @@
             "doc" : "Profile name.",
             "type": ["null", "string"],
             "default":null
+        },
+        {
+            "name": "profileAddress",
+            "doc": "Profile Address",
+            "default":null,
+            "type": {
+                "name": "AddressRecord",
+                "namespace": "org.apache.samza.sql.avro.schemas",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "zip",
+                        "doc" : "zip code.",
+                        "type": ["null", "int"],
+                        "default":null
+                    },
+                    {
+                        "name": "streetnum",
+                        "doc": "Street Number",
+                        "default":null,
+                        "type": {
+                            "name": "StreetNumRecord",
+                            "namespace": "org.apache.samza.sql.avro.schemas",
+                            "type": "record",
+                            "fields": [
+                                {
+                                    "name": "number",
+                                    "doc": "street number.",
+                                    "type": ["null", "int"],
+                                    "default": null
+                                }
+                            ]
+                        }
+                    }
+                ]
+            }
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
index cf3f62d..820002b 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java
@@ -26,13 +26,15 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class EnrichedPageView extends 
org.apache.avro.specific.SpecificRecordBase implements 
org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"EnrichedPageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page
 
key.\",\"default\":null},{\"name\":\"companyName\",\"type\":[\"null\",\"string\"],\"doc\":\"Company
 
name.\",\"default\":null},{\"name\":\"profileName\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile
 name.\",\"default\":null}]}");
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"EnrichedPageView\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page
 
key.\",\"default\":null},{\"name\":\"companyName\",\"type\":[\"null\",\"string\"],\"doc\":\"Company
 
name.\",\"default\":null},{\"name\":\"profileName\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile
 
name.\",\"default\":null},{\"name\":\"profileAddress\",\"type\":{\"type\":\"record\",\"name\":\"AddressRecord\",\"fields\":[{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"doc\":\"zip
 
code.\",\"default\":null},{\"name\":\"streetnum\",\"type\":{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street
 number.\",\"default\":null}]},\"doc\":\"Street 
Number\",\"default\":null}]},\"doc\":\"Profile Address\",\"default\":null}]}");
   /** Page key. */
   public java.lang.CharSequence pageKey;
   /** Company name. */
   public java.lang.CharSequence companyName;
   /** Profile name. */
   public java.lang.CharSequence profileName;
+  /** Profile Address */
+  public org.apache.samza.sql.avro.schemas.AddressRecord profileAddress;
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {
@@ -40,6 +42,7 @@ public class EnrichedPageView extends 
org.apache.avro.specific.SpecificRecordBas
     case 0: return pageKey;
     case 1: return companyName;
     case 2: return profileName;
+    case 3: return profileAddress;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
@@ -50,6 +53,7 @@ public class EnrichedPageView extends 
org.apache.avro.specific.SpecificRecordBas
     case 0: pageKey = (java.lang.CharSequence)value$; break;
     case 1: companyName = (java.lang.CharSequence)value$; break;
     case 2: profileName = (java.lang.CharSequence)value$; break;
+    case 3: profileAddress = 
(org.apache.samza.sql.avro.schemas.AddressRecord)value$; break;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java
new file mode 100644
index 0000000..8cfcce2
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public enum Kind {
+  Home, Work, Cell
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java
new file mode 100644
index 0000000..e9ced19
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java
@@ -0,0 +1,50 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class PhoneNumber extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"PhoneNumber\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"kind\",\"type\":{\"type\":\"enum\",\"name\":\"Kind\",\"symbols\":[\"Home\",\"Work\",\"Cell\"]}},{\"name\":\"number\",\"type\":\"string\"}]}");
+  public org.apache.samza.sql.avro.schemas.Kind kind;
+  public java.lang.CharSequence number;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return kind;
+    case 1: return number;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: kind = (org.apache.samza.sql.avro.schemas.Kind)value$; break;
+    case 1: number = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
index 4e5e7dc..f07dd75 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc
@@ -20,7 +20,7 @@
 {
     "name": "Profile",
     "version" : 1,
-    "namespace": "org.apache.samza.sql.system.avro",
+    "namespace": "org.apache.samza.sql.avro.schemas",
     "type": "record",
     "fields": [
         {
@@ -40,6 +40,110 @@
             "doc" : "Company id.",
             "type": ["null", "int"],
             "default":null
+        },
+        {
+            "name": "address",
+            "doc": "Profile Address",
+            "default":null,
+            "type": {
+                "name": "AddressRecord",
+                "namespace": "org.apache.samza.sql.avro.schemas",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "zip",
+                        "doc" : "zip code.",
+                        "type": ["null", "int"],
+                        "default":null
+                    },
+                    {
+                        "name": "streetnum",
+                        "doc": "Street Number",
+                        "default":null,
+                        "type": {
+                            "name": "StreetNumRecord",
+                            "namespace": "org.apache.samza.sql.avro.schemas",
+                            "type": "record",
+                            "fields": [
+                                {
+                                    "name": "number",
+                                    "doc": "street number.",
+                                    "type": ["null", "int"],
+                                    "default": null
+                                }
+                            ]
+                        }
+                    }
+                ]
+            }
+        },
+        {
+            "name": "selfEmployed",
+            "doc": "Boolean Value.",
+            "type": ["null", "boolean"],
+            "default":null
+        },
+        {
+            "name": "phoneNumbers",
+            "doc" : "array values in the record.",
+            "default": null,
+            "type": [ "null",
+                {
+                    "type": "array",
+                    "items":{
+                        "name":"PhoneNumber",
+                        "type":"record",
+                        "fields":[
+                            {
+                                "name": "kind",
+                                "type": {
+                                    "name": "Kind",
+                                    "type": "enum",
+                                    "symbols" : ["Home", "Work", "Cell"]
+                                }
+                            },
+                            {"name":"number", "type":"string"}
+                        ]
+                    }
+                }
+            ]
+        },
+        {
+            "name": "mapValues",
+            "doc" : "map values in the record.",
+            "default": null,
+            "type": [ "null",
+                {
+                    "type": "map",
+                    "values":[
+                        {
+                            "name": "SimpleRecord",
+                            "version" : 1,
+                            "type": "record",
+                            "fields": [
+                                {
+                                    "name": "id",
+                                    "doc": "Record id.",
+                                    "type": [
+                                        "null",
+                                        "int"
+                                    ],
+                                    "default": null
+                                },
+                                {
+                                    "name": "name",
+                                    "doc": "Some name.",
+                                    "type": [
+                                        "null",
+                                        "string"
+                                    ],
+                                    "default": null
+                                }
+                            ]
+                        }
+                    ]
+                }
+            ]
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
index b5c1828..5c72930 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java
@@ -26,13 +26,21 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class Profile extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Profile\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile
 
id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile
 
name.\",\"default\":null},{\"name\":\"companyId\",\"type\":[\"null\",\"int\"],\"doc\":\"Company
 id.\",\"default\":null}]}");
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Profile\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile
 
id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile
 
name.\",\"default\":null},{\"name\":\"companyId\",\"type\":[\"null\",\"int\"],\"doc\":\"Company
 
id.\",\"default\":null},{\"name\":\"address\",\"type\":{\"type\":\"record\",\"name\":\"AddressRecord\",\"fields\":[{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"doc\":\"zip
 
code.\",\"default\":null},{\"name\":\"streetnum\",\"type\":{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street
 number.\",\"default\":null}]},\"doc\":\"Street 
Number\",\"default\":null}]},\"doc\":\"Profile 
Address\",\"default\":null},{\"name\":\"selfEmployed\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Bo
 olean 
Value.\",\"default\":null},{\"name\":\"phoneNumbers\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"PhoneNumber\",\"fields\":[{\"name\":\"kind\",\"type\":{\"type\":\"enum\",\"name\":\"Kind\",\"symbols\":[\"Home\",\"Work\",\"Cell\"]}},{\"name\":\"number\",\"type\":\"string\"}]}}],\"doc\":\"array
 values in the 
record.\",\"default\":null},{\"name\":\"mapValues\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[{\"type\":\"record\",\"name\":\"SimpleRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record
 
id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Some
 name.\",\"default\":null}]}]}],\"doc\":\"map values in the 
record.\",\"default\":null}]}");
   /** Profile id. */
   public java.lang.Integer id;
   /** Profile name. */
   public java.lang.CharSequence name;
   /** Company id. */
   public java.lang.Integer companyId;
+  /** Profile Address */
+  public org.apache.samza.sql.avro.schemas.AddressRecord address;
+  /** Boolean Value. */
+  public java.lang.Boolean selfEmployed;
+  /** array values in the record. */
+  public java.util.List<org.apache.samza.sql.avro.schemas.PhoneNumber> 
phoneNumbers;
+  /** map values in the record. */
+  public java.util.Map<java.lang.CharSequence,java.lang.Object> mapValues;
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {
@@ -40,6 +48,10 @@ public class Profile extends 
org.apache.avro.specific.SpecificRecordBase impleme
     case 0: return id;
     case 1: return name;
     case 2: return companyId;
+    case 3: return address;
+    case 4: return selfEmployed;
+    case 5: return phoneNumbers;
+    case 6: return mapValues;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
@@ -50,6 +62,10 @@ public class Profile extends 
org.apache.avro.specific.SpecificRecordBase impleme
     case 0: id = (java.lang.Integer)value$; break;
     case 1: name = (java.lang.CharSequence)value$; break;
     case 2: companyId = (java.lang.Integer)value$; break;
+    case 3: address = (org.apache.samza.sql.avro.schemas.AddressRecord)value$; 
break;
+    case 4: selfEmployed = (java.lang.Boolean)value$; break;
+    case 5: phoneNumbers = 
(java.util.List<org.apache.samza.sql.avro.schemas.PhoneNumber>)value$; break;
+    case 6: mapValues = 
(java.util.Map<java.lang.CharSequence,java.lang.Object>)value$; break;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc
index 6b010a4..2316246 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc
@@ -20,7 +20,7 @@
 {
     "name": "SimpleRecord",
     "version" : 1,
-    "namespace": "org.apache.samza.sql.system.avro",
+    "namespace": "org.apache.samza.sql.avro.schemas",
     "type": "record",
     "fields": [
         {

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java
index baf3812..23cfb7d 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java
@@ -26,7 +26,7 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class SimpleRecord extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record
 
id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Some
 name.\",\"default\":null}]}");
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record
 
id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Some
 name.\",\"default\":null}]}");
   /** Record id. */
   public java.lang.Integer id;
   /** Some name. */

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java
new file mode 100644
index 0000000..aca20e1
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.sql.avro.schemas;
+
+@SuppressWarnings("all")
+public class StreetNumRecord extends 
org.apache.avro.specific.SpecificRecordBase implements 
org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street
 number.\",\"default\":null}]}");
+  /** street number. */
+  public java.lang.Integer number;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return number;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: number = (java.lang.Integer)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+}

Reply via email to