SAMZA-1619: Samza-sql: Support serialization of nested samza sql relational message
Added support for serializing nested Samza SQL relational messages and, accordingly, fixed the conversion of nested Avro records to relational messages. Note that SQL queries that reference fields in nested records (i.e., below the top-level record) are still not supported. Author: Aditya Toomula <atoom...@linkedin.com> Author: Aditya Toomula <atoom...@atoomula-ld1.linkedin.biz> Reviewers: Srinivasulu P<spun...@linkedin.com> Closes #464 from atoomula/serde Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/58c39e37 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/58c39e37 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/58c39e37 Branch: refs/heads/master Commit: 58c39e37121c9e11a5266cf7eaa3aaf6560ac6c9 Parents: bf2a2f7 Author: Aditya Toomula <atoom...@linkedin.com> Authored: Mon Apr 16 10:26:51 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Mon Apr 16 10:26:51 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/sql/avro/AvroRelConverter.java | 228 +++++++++++-------- .../samza/sql/avro/AvroTypeFactoryImpl.java | 9 +- .../samza/sql/data/SamzaSqlCompositeKey.java | 2 +- .../samza/sql/data/SamzaSqlRelMessage.java | 135 +++++++---- .../SamzaSqlRelMessageSerdeFactory.java | 67 ++++++ .../samza/sql/translator/FilterTranslator.java | 6 +- .../samza/sql/translator/JoinTranslator.java | 7 +- .../samza/sql/translator/ProjectTranslator.java | 9 +- .../SamzaSqlRelMessageJoinFunction.java | 6 +- .../samza/sql/TestSamzaSqlRelMessage.java | 6 +- .../sql/TestSamzaSqlRelMessageJoinFunction.java | 15 +- .../samza/sql/TestSamzaSqlRelMessageSerde.java | 68 +++++- .../samza/sql/avro/TestAvroRelConversion.java | 121 ++++++++-- .../samza/sql/avro/schemas/AddressRecord.java | 52 +++++ .../sql/avro/schemas/EnrichedPageView.avsc | 38 +++- .../sql/avro/schemas/EnrichedPageView.java | 6 +- .../org/apache/samza/sql/avro/schemas/Kind.java | 30
+++ .../samza/sql/avro/schemas/PhoneNumber.java | 50 ++++ .../apache/samza/sql/avro/schemas/Profile.avsc | 106 ++++++++- .../apache/samza/sql/avro/schemas/Profile.java | 18 +- .../samza/sql/avro/schemas/SimpleRecord.avsc | 2 +- .../samza/sql/avro/schemas/SimpleRecord.java | 2 +- .../samza/sql/avro/schemas/StreetNumRecord.java | 48 ++++ .../samza/sql/system/TestAvroSystemFactory.java | 74 +++++- .../test/samzasql/TestSamzaSqlEndToEnd.java | 34 +++ .../tools/avro/AvroSchemaGenRelConverter.java | 4 +- .../tools/json/JsonRelConverterFactory.java | 4 +- 27 files changed, 941 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java index e247415..5793d6e 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java @@ -29,7 +29,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.calcite.rel.type.RelDataType; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; @@ -39,6 +38,8 @@ import org.apache.samza.system.SystemStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord; + /** * This class converts a Samza Avro messages to Relational messages and vice versa. 
@@ -48,141 +49,186 @@ import org.slf4j.LoggerFactory; * The key part of the samza message is represented as a special column {@link SamzaSqlRelMessage#KEY_NAME} * in relational message. * - * The value part of the samza message is expected to be {@link IndexedRecord}, All the fields in the IndexedRecord form - * the corresponding fields of the relational message. + * The value part of the samza message is expected to be {@link IndexedRecord}, All the fields in the IndexedRecord + * form the corresponding fields of the relational message. * * Conversion from Relational to Samza Message : * This converts the Samza relational message into Avro {@link GenericRecord}. - * All the fields of the relational message is become fields of the Avro GenericRecord except of the field with name + * All the fields of the relational message become fields of the Avro GenericRecord except the field with name * {@link SamzaSqlRelMessage#KEY_NAME}. This special field becomes the Key in the output Samza message. 
*/ public class AvroRelConverter implements SamzaRelConverter { protected final Config config; private final Schema avroSchema; - private final RelDataType relationalSchema; - - /** - * Class that converts the avro field to their corresponding relational fields - * Array fields are converted from Avro {@link org.apache.avro.generic.GenericData.Array} to {@link ArrayList} - */ - public enum AvroToRelObjConverter { - - /** - * If the relational field type is ArraySqlType, We expect the avro field to be of type either - * {@link GenericData.Array} or {@link List} which then is converted to Rel field of type {@link ArrayList} - */ - ArraySqlType { - @Override - Object convert(Object avroObj) { - ArrayList<Object> retVal = new ArrayList<>(); - if (avroObj != null) { - if (avroObj instanceof GenericData.Array) { - retVal.addAll(((GenericData.Array) avroObj)); - } else if (avroObj instanceof List) { - retVal.addAll((List) avroObj); - } - } - - return retVal; - } - }, - - /** - * If the relational field type is MapSqlType, We expect the avro field to be of type - * {@link Map} - */ - MapSqlType { - @Override - Object convert(Object obj) { - Map<String, Object> retVal = new HashMap<>(); - if (obj != null) { - retVal.putAll((Map<String, ?>) obj); - } - return retVal; - } - }, - - /** - * If the relational field type is RelRecordType, The field is considered an object - * and moved to rel field without any translation. - */ - RelRecordType { - @Override - Object convert(Object obj) { - return obj; - } - }, - - /** - * If the relational field type is BasicSqlType, The field is moved to rel field without any translation. 
- */ - BasicSqlType { - @Override - Object convert(Object obj) { - return obj; - } - }; - - abstract Object convert(Object obj); - } private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class); - private final Schema arraySchema = Schema.parse( - "{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]},\"java-class\":\"java.util.List\"}"); - private final Schema mapSchema = Schema.parse( - "{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]}}"); - public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) { this.config = config; - this.relationalSchema = schemaProvider.getRelationalSchema(); this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream)); } + /** + * Converts the nested avro object in SamzaMessage to relational message corresponding to + * the tableName with relational schema. + */ @Override public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) { - List<Object> values = new ArrayList<>(); + List<Object> fieldValues = new ArrayList<>(); List<String> fieldNames = new ArrayList<>(); Object value = samzaMessage.getValue(); if (value instanceof IndexedRecord) { IndexedRecord record = (IndexedRecord) value; - fieldNames.addAll(relationalSchema.getFieldNames()); - values.addAll(relationalSchema.getFieldList() - .stream() - .map(x -> getRelField(x.getType(), record.get(this.avroSchema.getField(x.getName()).pos()))) + fieldNames.addAll(avroSchema.getFields().stream() + .map(Schema.Field::name) + .collect(Collectors.toList())); + fieldValues.addAll(fieldNames.stream() + .map(f -> convertToJavaObject(record.get(avroSchema.getField(f).pos()), avroSchema.getField(f).schema())) .collect(Collectors.toList())); } else if (value == null) { - fieldNames.addAll(relationalSchema.getFieldNames()); - IntStream.range(0, fieldNames.size()).forEach(x -> 
values.add(null)); + fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())); + IntStream.range(0, fieldNames.size()).forEach(x -> fieldValues.add(null)); } else { String msg = "Avro message converter doesn't support messages of type " + value.getClass(); LOG.error(msg); throw new SamzaException(msg); } - return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, values); + return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, fieldValues); + } + + private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) { + List<Object> values = new ArrayList<>(); + List<String> fieldNames = new ArrayList<>(); + if (avroRecord != null) { + fieldNames.addAll(avroRecord.getSchema().getFields() + .stream() + .map(Schema.Field::name) + .collect(Collectors.toList())); + values.addAll(avroRecord.getSchema().getFields() + .stream() + .map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()), + avroRecord.getSchema().getField(f.name()).schema())) + .collect(Collectors.toList())); + } else { + String msg = "Avro Record is null"; + LOG.error(msg); + throw new SamzaException(msg); + } + + return new SamzaSqlRelRecord(fieldNames, values); } + /** + * Convert the nested relational message to the output samza message. 
+ */ @Override public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) { return convertToSamzaMessage(relMessage, this.avroSchema); } protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema avroSchema) { - GenericRecord record = new GenericData.Record(avroSchema); - List<String> fieldNames = relMessage.getFieldNames(); - List<Object> values = relMessage.getFieldValues(); + return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), avroSchema)); + } + + private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) { + GenericRecord record = new GenericData.Record(schema); + List<String> fieldNames = relRecord.getFieldNames(); + List<Object> values = relRecord.getFieldValues(); for (int index = 0; index < fieldNames.size(); index++) { if (!fieldNames.get(index).equalsIgnoreCase(SamzaSqlRelMessage.KEY_NAME)) { - record.put(fieldNames.get(index), values.get(index)); + Object relObj = values.get(index); + String fieldName = fieldNames.get(index); + Schema fieldSchema = schema.getField(fieldName).schema(); + record.put(fieldName, convertToAvroObject(relObj, fieldSchema)); } } + return record; + } - return new KV<>(relMessage.getKey(), record); + private Object convertToAvroObject(Object relObj, Schema schema) { + if (relObj == null) { + return null; + } + switch(schema.getType()) { + case RECORD: + return convertToGenericRecord((SamzaSqlRelRecord) relObj, getNonNullUnionSchema(schema)); + case ARRAY: + List<Object> avroList = ((List<Object>) relObj).stream() + .map(o -> convertToAvroObject(o, getNonNullUnionSchema(schema).getElementType())) + .collect(Collectors.toList()); + return avroList; + case MAP: + return ((Map<String, ?>) relObj).entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> convertToAvroObject(e.getValue(), + getNonNullUnionSchema(schema).getValueType()))); + case UNION: + return convertToAvroObject(relObj, 
getNonNullUnionSchema(schema)); + default: + return relObj; + } } - private Object getRelField(RelDataType relType, Object avroObj) { - return AvroToRelObjConverter.valueOf(relType.getClass().getSimpleName()).convert(avroObj); + // Not doing any validations of data types with Avro schema considering the resource cost per message. + // Casting would fail if the data types are not in sync with the schema. + private Object convertToJavaObject(Object avroObj, Schema schema) { + switch(schema.getType()) { + case RECORD: + if (avroObj == null) { + return null; + } + return convertToRelRecord((IndexedRecord) avroObj); + case ARRAY: { + ArrayList<Object> retVal = new ArrayList<>(); + if (avroObj != null) { + List<Object> avroArray = null; + if (avroObj instanceof GenericData.Array) { + avroArray = (GenericData.Array) avroObj; + } else if (avroObj instanceof List) { + avroArray = (List) avroObj; + } + + if (avroArray != null) { + retVal.addAll( + avroArray.stream() + .map(v -> convertToJavaObject(v, getNonNullUnionSchema(schema).getElementType())) + .collect(Collectors.toList())); + } + } + return retVal; + } + case MAP: { + Map<String, Object> retVal = new HashMap<>(); + if (avroObj != null) { + retVal.putAll(((Map<String, ?>) avroObj).entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> convertToJavaObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())))); + } + return retVal; + } + case UNION: + if (avroObj == null) { + return null; + } + return convertToJavaObject(avroObj, getNonNullUnionSchema(schema)); + default: + return avroObj; + } + } + + // Two non-nullable types in a union is not yet supported. 
+ private Schema getNonNullUnionSchema(Schema schema) { + if (schema.getType().equals(Schema.Type.UNION)) { + if (schema.getTypes().get(0).getType() != Schema.Type.NULL) { + return schema.getTypes().get(0); + } + if (schema.getTypes().get(1).getType() != Schema.Type.NULL) { + return schema.getTypes().get(1); + } + } + return schema; } } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java index 74e15e9..288fdc4 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java @@ -28,11 +28,9 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.RelDataTypeFieldImpl; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rel.type.RelRecordType; -import org.apache.calcite.sql.type.ArraySqlType; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.samza.SamzaException; -import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +55,10 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl { throw new SamzaException(msg); } + return convertRecordType(schema); + } + + private RelDataType convertRecordType(Schema schema) { List<RelDataTypeField> relFields = getRelFields(schema.getFields()); return new RelRecordType(relFields); } @@ -101,8 +103,7 @@ public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl { case LONG: return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), true); case RECORD: -// List<RelDataTypeField> relFields = 
getRelFields(fieldSchema); -// return new RelRecordType(relFields); + // return createTypeWithNullability(convertRecordType(fieldSchema), true); // TODO Calcite execution engine doesn't support record type yet. return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true); case MAP: http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java index f646d9a..54c8391 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java @@ -74,7 +74,7 @@ public class SamzaSqlCompositeKey implements Serializable { public static SamzaSqlCompositeKey createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) { ArrayList<Object> keyParts = new ArrayList<>(); for (int idx : relIdx) { - keyParts.add(message.getFieldValues().get(idx)); + keyParts.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx)); } return new SamzaSqlCompositeKey(keyParts); } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java index b54634f..9bf1870 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java @@ -23,17 +23,20 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import 
java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang.Validate; +import org.apache.samza.SamzaException; import org.codehaus.jackson.annotate.JsonProperty; /** * Samza sql relational message. Each Samza sql relational message represents a relational row in a table. - * Each row of the relational table and hence SamzaSqlRelMessage consists of list of column values and - * their associated column names. Right now we donot store any other metadata other than the column name in the - * SamzaSqlRelationalMessage, In future if we find a need, we could add additional column ddl metadata around - * primary Key, nullability, etc. - * TODO: SAMZA-1619 Support serialization of nested SamzaSqlRelMessage. + * Each row of the relational table consists of a primary key and {@link SamzaSqlRelRecord}, which consists of a list + * of column values and the associated column names. */ public class SamzaSqlRelMessage implements Serializable { @@ -41,36 +44,29 @@ public class SamzaSqlRelMessage implements Serializable { private final Object key; - @JsonProperty("fieldNames") - private final List<String> fieldNames; - @JsonProperty("fieldValues") - private final List<Object> fieldValues; + @JsonProperty("samzaSqlRelRecord") + private final SamzaSqlRelRecord samzaSqlRelRecord; /** * Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values. - * If the field list contains KEY, then it extracts the key out of the fields to creates a - * RelMessage with key and values otherwise creates a Relmessage without the key. + * If the field list contains KEY, then it extracts the key out of the fields to create a + * {@link SamzaSqlRelRecord} along with key, otherwise creates a {@link SamzaSqlRelRecord} + * without the key. * @param fieldNames Ordered list of field names in the row. 
- * @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be result of - * delete change capture event in the stream or because of the result of the outer join or the fields - * themselves are null in the original stream. + * @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be + * result of delete change capture event in the stream or because of the result of the outer join + * or the fields themselves are null in the original stream. */ - public SamzaSqlRelMessage(@JsonProperty("fieldNames") List<String> fieldNames, - @JsonProperty("fieldValues") List<Object> fieldValues) { + public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) { Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length."); - this.fieldNames = new ArrayList<>(); - this.fieldValues = new ArrayList<>(); - int keyIndex = fieldNames.indexOf(KEY_NAME); Object key = null; if (keyIndex != -1) { key = fieldValues.get(keyIndex); } this.key = key; - - this.fieldNames.addAll(fieldNames); - this.fieldValues.addAll(fieldValues); + this.samzaSqlRelRecord = new SamzaSqlRelRecord(fieldNames, fieldValues); } /** @@ -85,29 +81,29 @@ public class SamzaSqlRelMessage implements Serializable { public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues) { Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length."); - this.fieldNames = new ArrayList<>(); - this.fieldValues = new ArrayList<>(); + List<String> tmpFieldNames = new ArrayList<>(); + List<Object> tmpFieldValues = new ArrayList<>(); this.key = key; - this.fieldNames.add(KEY_NAME); - this.fieldValues.add(key); + tmpFieldNames.add(KEY_NAME); + tmpFieldValues.add(key); - this.fieldNames.addAll(fieldNames); - this.fieldValues.addAll(fieldValues); + tmpFieldNames.addAll(fieldNames); + 
tmpFieldValues.addAll(fieldValues); + + this.samzaSqlRelRecord = new SamzaSqlRelRecord(tmpFieldNames, tmpFieldValues); } /** - * Get the field names of all the columns in the relational message. - * @return the field names of all columns. + * Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}. */ - @JsonProperty("fieldNames") - public List<String> getFieldNames() { - return fieldNames; + public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord) { + this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues()); } - @JsonProperty("fieldValues") - public List<Object> getFieldValues() { - return this.fieldValues; + @JsonProperty("samzaSqlRelRecord") + public SamzaSqlRelRecord getSamzaSqlRelRecord() { + return samzaSqlRelRecord; } public Object getKey() { @@ -115,17 +111,66 @@ public class SamzaSqlRelMessage implements Serializable { } /** - * Get the value of the field corresponding to the field name. - * @param name Name of the field. - * @return returns the value of the field. + * Samza sql relational record. A record consists of list of column values and the associated column names. + * A column value could be nested, meaning, it could be another SamzaSqlRelRecord. + * Right now we do not store any metadata (like nullability, etc) other than the column name in the SamzaSqlRelRecord. */ - public Optional<Object> getField(String name) { - for (int index = 0; index < fieldNames.size(); index++) { - if (fieldNames.get(index).equals(name)) { - return Optional.ofNullable(fieldValues.get(index)); - } + public static class SamzaSqlRelRecord implements Serializable { + + @JsonProperty("fieldNames") + private final List<String> fieldNames; + @JsonProperty("fieldValues") + private final List<Object> fieldValues; + + /** + * Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values. + * @param fieldNames Ordered list of field names in the row. 
+ * @param fieldValues Ordered list of all the values in the row. Some of the fields can be null. This could be + * result of delete change capture event in the stream or because of the result of the outer + * join or the fields themselves are null in the original stream. + */ + public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> fieldNames, + @JsonProperty("fieldValues") List<Object> fieldValues) { + Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length."); + + this.fieldNames = new ArrayList<>(); + this.fieldValues = new ArrayList<>(); + + this.fieldNames.addAll(fieldNames); + this.fieldValues.addAll(fieldValues); } - return Optional.empty(); + /** + * Get the field names of all the columns in the relational message. + * @return the field names of all columns. + */ + @JsonProperty("fieldNames") + public List<String> getFieldNames() { + return this.fieldNames; + } + + /** + * Get the field values of all the columns in the relational message. + * @return the field values of all columns. + */ + @JsonProperty("fieldValues") + public List<Object> getFieldValues() { + return this.fieldValues; + } + + /** + * Get the value of the field corresponding to the field name. + * @param name Name of the field. + * @return returns the value of the field. 
+ */ + public Optional<Object> getField(String name) { + for (int index = 0; index < fieldNames.size(); index++) { + if (fieldNames.get(index).equals(name)) { + return Optional.ofNullable(fieldValues.get(index)); + } + } + + return Optional.empty(); + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java new file mode 100644 index 0000000..45542ca --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java @@ -0,0 +1,67 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package org.apache.samza.sql.serializers; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + + +/** + * A serializer for {@link SamzaSqlRelMessage}. This serializer preserves the type information as + * {@link SamzaSqlRelMessage} contains nested {@link org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord} + * records. + */ +public final class SamzaSqlRelMessageSerdeFactory implements SerdeFactory<SamzaSqlRelMessage> { + public Serde<SamzaSqlRelMessage> getSerde(String name, Config config) { + return new SamzaSqlRelMessageSerde(); + } + + public final static class SamzaSqlRelMessageSerde implements Serde<SamzaSqlRelMessage> { + + @Override + public SamzaSqlRelMessage fromBytes(byte[] bytes) { + try { + ObjectMapper mapper = new ObjectMapper(); + // Enable object typing to handle nested records + mapper.enableDefaultTyping(); + return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<SamzaSqlRelMessage>() {}); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public byte[] toBytes(SamzaSqlRelMessage p) { + try { + ObjectMapper mapper = new ObjectMapper(); + // Enable object typing to handle nested records + mapper.enableDefaultTyping(); + return mapper.writeValueAsString(p).getBytes("UTF-8"); + } catch (Exception e) { + throw new SamzaException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java index 798f0b3..5832b21 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java @@ -55,11 +55,13 @@ class FilterTranslator { return inputStream.filter(message -> { Object[] result = new Object[1]; - expr.execute(context.getExecutionContext(), context.getDataContext(), message.getFieldValues().toArray(), result); + expr.execute(context.getExecutionContext(), context.getDataContext(), + message.getSamzaSqlRelRecord().getFieldValues().toArray(), result); if (result.length > 0 && result[0] instanceof Boolean) { boolean retVal = (Boolean) result[0]; log.debug( - String.format("return value for input %s is %s", Arrays.asList(message.getFieldValues()).toString(), retVal)); + String.format("return value for input %s is %s", + Arrays.asList(message.getSamzaSqlRelRecord().getFieldValues()).toString(), retVal)); return retVal; } else { log.error("return value is not boolean"); http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index 70c1968..899ca41 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -43,12 +43,14 @@ import org.apache.samza.serializers.Serde; import org.apache.samza.sql.data.SamzaSqlCompositeKey; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SourceResolver; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import 
org.apache.samza.storage.kv.RocksDbTableDescriptor; import org.apache.samza.table.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*; +import static org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey; +import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde; /** @@ -94,7 +96,8 @@ class JoinTranslator { tableKeyIds); JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); - JsonSerdeV2<SamzaSqlRelMessage> relMsgSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class); + SamzaSqlRelMessageSerde relMsgSerde = + (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, keySerde, relMsgSerde, join, context); http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java index 0f31fb6..a0bd45f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java @@ -66,7 +66,8 @@ class ProjectTranslator { MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(m -> { RelDataType type = project.getRowType(); Object[] output = new Object[type.getFieldCount()]; - expr.execute(context.getExecutionContext(), context.getDataContext(), m.getFieldValues().toArray(), output); + expr.execute(context.getExecutionContext(), context.getDataContext(), + m.getSamzaSqlRelRecord().getFieldValues().toArray(), output); List<String> names = new ArrayList<>(); for (int index = 0; index < output.length; 
index++) { names.add(index, project.getNamedProjects().get(index).getValue()); @@ -81,14 +82,14 @@ class ProjectTranslator { private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex, MessageStream<SamzaSqlRelMessage> inputStream) { return inputStream.flatMap(message -> { - Object field = message.getFieldValues().get(flattenIndex); + Object field = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex); if (field != null && field instanceof List) { List<SamzaSqlRelMessage> outMessages = new ArrayList<>(); for (Object fieldValue : (List) field) { - List<Object> newValues = new ArrayList<>(message.getFieldValues()); + List<Object> newValues = new ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues()); newValues.set(flattenIndex, Collections.singletonList(fieldValue)); - outMessages.add(new SamzaSqlRelMessage(message.getFieldNames(), newValues)); + outMessages.add(new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues)); } return outMessages; } else { http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java index 69e4e09..df88a7c 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java @@ -85,12 +85,12 @@ public class SamzaSqlRelMessageJoinFunction // If table position is on the right, add the stream message fields first if (isTablePosOnRight) { - outFieldValues.addAll(message.getFieldValues()); + outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues()); } // Add the table record fields. 
if (record != null) { - outFieldValues.addAll(record.getValue().getFieldValues()); + outFieldValues.addAll(record.getValue().getSamzaSqlRelRecord().getFieldValues()); } else { // Table record could be null as the record could not be found in the store. This can // happen for outer joins. Add nulls to all the field values in the output message. @@ -99,7 +99,7 @@ public class SamzaSqlRelMessageJoinFunction // If table position is on the left, add the stream message fields last if (!isTablePosOnRight) { - outFieldValues.addAll(message.getFieldValues()); + outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues()); } return new SamzaSqlRelMessage(outFieldNames, outFieldValues); http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java index e58563c..689af72 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java @@ -34,13 +34,13 @@ public class TestSamzaSqlRelMessage { @Test public void testGetField() { SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); - Assert.assertEquals(values.get(0), message.getField(names.get(0)).get()); - Assert.assertEquals(values.get(1), message.getField(names.get(1)).get()); + Assert.assertEquals(values.get(0), message.getSamzaSqlRelRecord().getField(names.get(0)).get()); + Assert.assertEquals(values.get(1), message.getSamzaSqlRelRecord().getField(names.get(1)).get()); } @Test public void testGetNonExistentField() { SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); - Assert.assertFalse(message.getField("field3").isPresent()); + 
Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java index 3da004a..90fce3b 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java @@ -53,12 +53,13 @@ public class TestSamzaSqlRelMessageJoinFunction { new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); - Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size()); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), + outMsg.getSamzaSqlRelRecord().getFieldNames().size()); List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); expectedFieldNames.addAll(tableFieldNames); List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); expectedFieldValues.addAll(tableFieldValues); - Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); } @Test @@ -75,12 +76,13 @@ public class TestSamzaSqlRelMessageJoinFunction { new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames); SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); - Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size()); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), + 
outMsg.getSamzaSqlRelRecord().getFieldNames().size()); List<String> expectedFieldNames = new ArrayList<>(tableFieldNames); expectedFieldNames.addAll(streamFieldNames); List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues); expectedFieldValues.addAll(streamFieldValues); - Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); } @Test @@ -106,11 +108,12 @@ public class TestSamzaSqlRelMessageJoinFunction { tableFieldNames); SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); - Assert.assertEquals(outMsg.getFieldValues().size(), outMsg.getFieldNames().size()); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), + outMsg.getSamzaSqlRelRecord().getFieldNames().size()); List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); expectedFieldNames.addAll(tableFieldNames); List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null ).collect(Collectors.toList())); - Assert.assertEquals(outMsg.getFieldValues(), expectedFieldValues); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java index 3416ee1..883abbf 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java @@ -20,12 +20,29 @@ package org.apache.samza.sql; import java.util.Arrays; +import java.util.HashMap; import 
java.util.List; -import org.apache.samza.serializers.JsonSerdeV2; +import java.util.Map; +import javafx.util.Pair; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.KV; +import org.apache.samza.sql.avro.AvroRelConverter; +import org.apache.samza.sql.avro.AvroRelSchemaProvider; +import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory; +import org.apache.samza.sql.avro.schemas.AddressRecord; +import org.apache.samza.sql.avro.schemas.Profile; +import org.apache.samza.sql.avro.schemas.StreetNumRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; +import org.apache.samza.system.SystemStream; import org.junit.Assert; import org.junit.Test; +import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde; + public class TestSamzaSqlRelMessageSerde { @@ -35,9 +52,52 @@ public class TestSamzaSqlRelMessageSerde { @Test public void testWithDifferentFields() { SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); - JsonSerdeV2<SamzaSqlRelMessage> serde = new JsonSerdeV2<>(SamzaSqlRelMessage.class); + SamzaSqlRelMessageSerde serde = + (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message)); - Assert.assertEquals(resultMsg.getFieldNames(), names); - Assert.assertEquals(resultMsg.getFieldValues(), values); + Assert.assertEquals(names, resultMsg.getSamzaSqlRelRecord().getFieldNames()); + Assert.assertEquals(values, resultMsg.getSamzaSqlRelRecord().getFieldValues()); } + + @Test + public void testNestedRecordConversion() { + Map<String, String> props = new HashMap<>(); + SystemStream ss1 = new SystemStream("test", "nestedRecord"); + props.put( + 
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()), + Profile.SCHEMA$.toString()); + ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory(); + AvroRelSchemaProvider nestedRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props)); + AvroRelConverter nestedRecordAvroRelConverter = new AvroRelConverter(ss1, nestedRecordSchemaProvider, new MapConfig()); + + Pair<SamzaSqlRelMessage, GenericData.Record> messageRecordPair = + createNestedSamzaSqlRelMessage(nestedRecordAvroRelConverter); + SamzaSqlRelMessageSerde serde = + (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); + SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(messageRecordPair.getKey())); + nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg); + KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(resultMsg); + GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue(); + + for (Schema.Field field : Profile.SCHEMA$.getFields()) { + // equals() on GenericRecord does the nested record equality check as well. 
+ Assert.assertEquals(recordPostConversion.get(field.name()), messageRecordPair.getValue().get(field.name())); + } + } + + private Pair<SamzaSqlRelMessage, GenericData.Record> createNestedSamzaSqlRelMessage( + AvroRelConverter nestedRecordAvroRelConverter) { + GenericData.Record record = new GenericData.Record(Profile.SCHEMA$); + record.put("id", 1); + record.put("name", "name1"); + record.put("companyId", 0); + GenericData.Record addressRecord = new GenericData.Record(AddressRecord.SCHEMA$); + addressRecord.put("zip", 90000); + record.put("address", addressRecord); + GenericData.Record streetNumRecord = new GenericData.Record(StreetNumRecord.SCHEMA$); + streetNumRecord.put("number", 1200); + addressRecord.put("streetnum", streetNumRecord); + return new Pair<>(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)), record); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java index 21c666b..61abdfc 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java @@ -48,8 +48,13 @@ import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.samza.config.MapConfig; import org.apache.samza.operators.KV; +import org.apache.samza.sql.avro.schemas.AddressRecord; import org.apache.samza.sql.avro.schemas.ComplexRecord; +import org.apache.samza.sql.avro.schemas.Kind; +import org.apache.samza.sql.avro.schemas.PhoneNumber; +import org.apache.samza.sql.avro.schemas.Profile; import org.apache.samza.sql.avro.schemas.SimpleRecord; +import 
org.apache.samza.sql.avro.schemas.StreetNumRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.system.SystemStream; import org.junit.Assert; @@ -63,8 +68,10 @@ public class TestAvroRelConversion { private static final Logger LOG = LoggerFactory.getLogger(TestAvroRelConversion.class); private final AvroRelConverter simpleRecordAvroRelConverter; private final AvroRelConverter complexRecordAvroRelConverter; + private final AvroRelConverter nestedRecordAvroRelConverter; private final AvroRelSchemaProvider simpleRecordSchemaProvider; - private final AvroRelSchemaProvider complexRecordSchemProvider; + private final AvroRelSchemaProvider complexRecordSchemaProvider; + private final AvroRelSchemaProvider nestedRecordSchemaProvider; private int id = 1; private boolean boolValue = true; @@ -85,19 +92,25 @@ public class TestAvroRelConversion { Map<String, String> props = new HashMap<>(); SystemStream ss1 = new SystemStream("test", "complexRecord"); SystemStream ss2 = new SystemStream("test", "simpleRecord"); + SystemStream ss3 = new SystemStream("test", "nestedRecord"); props.put( String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()), ComplexRecord.SCHEMA$.toString()); props.put( String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss2.getSystem(), ss2.getStream()), SimpleRecord.SCHEMA$.toString()); + props.put( + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss3.getSystem(), ss3.getStream()), + Profile.SCHEMA$.toString()); ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory(); - complexRecordSchemProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props)); + complexRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props)); simpleRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss2, new MapConfig(props)); - complexRecordAvroRelConverter = new 
AvroRelConverter(ss1, complexRecordSchemProvider, new MapConfig()); + nestedRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss3, new MapConfig(props)); + complexRecordAvroRelConverter = new AvroRelConverter(ss1, complexRecordSchemaProvider, new MapConfig()); simpleRecordAvroRelConverter = new AvroRelConverter(ss2, simpleRecordSchemaProvider, new MapConfig()); + nestedRecordAvroRelConverter = new AvroRelConverter(ss3, nestedRecordSchemaProvider, new MapConfig()); } @Test @@ -119,7 +132,14 @@ public class TestAvroRelConversion { @Test public void testComplexSchemaConversion() { - RelDataType relSchema = complexRecordSchemProvider.getRelationalSchema(); + RelDataType relSchema = complexRecordSchemaProvider.getRelationalSchema(); + + LOG.info("Relational schema " + relSchema); + } + + @Test + public void testNestedSchemaConversion() { + RelDataType relSchema = nestedRecordSchemaProvider.getRelationalSchema(); LOG.info("Relational schema " + relSchema); } @@ -132,21 +152,23 @@ public class TestAvroRelConversion { record.put("name", "name1"); SamzaSqlRelMessage message = simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)); - LOG.info(Joiner.on(",").join(message.getFieldValues())); - LOG.info(Joiner.on(",").join(message.getFieldNames())); + LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldValues())); + LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames())); } @Test public void testEmptyRecordConversion() { GenericData.Record record = new GenericData.Record(SimpleRecord.SCHEMA$); SamzaSqlRelMessage message = simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)); - Assert.assertEquals(message.getFieldNames().size(), message.getFieldValues().size()); + Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(), + message.getSamzaSqlRelRecord().getFieldValues().size()); } @Test public void testNullRecordConversion() { SamzaSqlRelMessage message = 
simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", null)); - Assert.assertEquals(message.getFieldNames().size(), message.getFieldValues().size()); + Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(), + message.getSamzaSqlRelRecord().getFieldValues().size()); } public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) throws IOException { @@ -192,6 +214,57 @@ public class TestAvroRelConversion { validateAvroSerializedData(serializedData); } + @Test + public void testNestedRecordConversion() throws IOException { + GenericData.Record record = new GenericData.Record(Profile.SCHEMA$); + record.put("id", 1); + record.put("name", "name1"); + record.put("companyId", 0); + GenericData.Record addressRecord = new GenericData.Record(AddressRecord.SCHEMA$); + addressRecord.put("zip", 90000); + GenericData.Record streetNumRecord = new GenericData.Record(StreetNumRecord.SCHEMA$); + streetNumRecord.put("number", 1200); + addressRecord.put("streetnum", streetNumRecord); + record.put("address", addressRecord); + record.put("selfEmployed", "True"); + + + GenericData.Record phoneNumberRecordH = new GenericData.Record(PhoneNumber.SCHEMA$); + phoneNumberRecordH.put("kind", Kind.Home); + phoneNumberRecordH.put("number", "111-111-1111"); + GenericData.Record phoneNumberRecordC = new GenericData.Record(PhoneNumber.SCHEMA$); + phoneNumberRecordC.put("kind", Kind.Cell); + phoneNumberRecordC.put("number", "111-111-1112"); + List<GenericData.Record> phoneNumbers = new ArrayList<>(); + phoneNumbers.add(phoneNumberRecordH); + phoneNumbers.add(phoneNumberRecordC); + record.put("phoneNumbers", phoneNumbers); + + GenericData.Record simpleRecord1 = new GenericData.Record(SimpleRecord.SCHEMA$); + simpleRecord1.put("id", 1); + simpleRecord1.put("name", "name1"); + GenericData.Record simpleRecord2 = new GenericData.Record(SimpleRecord.SCHEMA$); + simpleRecord2.put("id", 2); + simpleRecord2.put("name", "name2"); + HashMap<String, IndexedRecord> 
mapValues = new HashMap<>(); + mapValues.put("key1", simpleRecord1); + mapValues.put("key2", simpleRecord2); + record.put("mapValues", mapValues); + + SamzaSqlRelMessage relMessage = nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)); + + LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldValues())); + LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldNames())); + + KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(relMessage); + GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue(); + + for (Schema.Field field : Profile.SCHEMA$.getFields()) { + // equals() on GenericRecord does the nested record equality check as well. + Assert.assertEquals(record.get(field.name()), recordPostConversion.get(field.name())); + } + } + private static <T> T genericRecordFromBytes(byte[] bytes, Schema schema) throws IOException { BinaryDecoder binDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null); GenericDatumReader<T> reader = new GenericDatumReader<>(schema); @@ -212,29 +285,29 @@ public class TestAvroRelConversion { private void validateAvroSerializedData(byte[] serializedData) throws IOException { GenericRecord complexRecordValue = genericRecordFromBytes(serializedData, ComplexRecord.SCHEMA$); - String streamName = "stream"; - RelDataType dataType = complexRecordSchemProvider.getRelationalSchema(); - SamzaSqlRelMessage message = complexRecordAvroRelConverter.convertToRelMessage(new KV<>("key", complexRecordValue)); - Assert.assertEquals(message.getFieldNames().size(), ComplexRecord.SCHEMA$.getFields().size() + 1); - - Assert.assertEquals(message.getField("id").get(), id); - Assert.assertEquals(message.getField("bool_value").get(), boolValue); - Assert.assertEquals(message.getField("double_value").get(), doubleValue); - Assert.assertEquals(message.getField("string_value").get(), new Utf8(testStrValue)); - 
Assert.assertEquals(message.getField("float_value").get(), floatValue); - Assert.assertEquals(message.getField("long_value").get(), longValue); + Assert.assertEquals(message.getSamzaSqlRelRecord().getFieldNames().size(), ComplexRecord.SCHEMA$.getFields().size() + 1); + + Assert.assertEquals(message.getSamzaSqlRelRecord().getField("id").get(), id); + Assert.assertEquals(message.getSamzaSqlRelRecord().getField("bool_value").get(), boolValue); + Assert.assertEquals(message.getSamzaSqlRelRecord().getField("double_value").get(), doubleValue); + Assert.assertEquals(message.getSamzaSqlRelRecord().getField("string_value").get(), new Utf8(testStrValue)); + Assert.assertEquals(message.getSamzaSqlRelRecord().getField("float_value").get(), floatValue); + Assert.assertEquals(message.getSamzaSqlRelRecord().getField("long_value").get(), longValue); Assert.assertTrue( - arrayValue.stream().map(Utf8::new).collect(Collectors.toList()).equals(message.getField("array_values").get())); + arrayValue.stream() + .map(Utf8::new) + .collect(Collectors.toList()) + .equals(message.getSamzaSqlRelRecord().getField("array_values").get())); Assert.assertTrue(mapValue.entrySet() .stream() .collect(Collectors.toMap(x -> new Utf8(x.getKey()), y -> new Utf8(y.getValue()))) - .equals(message.getField("map_values").get())); + .equals(message.getSamzaSqlRelRecord().getField("map_values").get())); - Assert.assertTrue(message.getField("bytes_value").get().equals(testBytes)); + Assert.assertTrue(message.getSamzaSqlRelRecord().getField("bytes_value").get().equals(testBytes)); - LOG.info(Joiner.on(",").useForNull("null").join(message.getFieldValues())); - LOG.info(Joiner.on(",").join(message.getFieldNames())); + LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues())); + LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames())); KV<Object, Object> samzaMessage = complexRecordAvroRelConverter.convertToSamzaMessage(message); GenericRecord record = 
(GenericRecord) samzaMessage.getValue(); http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java new file mode 100644 index 0000000..f94abca --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/AddressRecord.java @@ -0,0 +1,52 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; + +@SuppressWarnings("all") +public class AddressRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"AddressRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"doc\":\"zip code.\",\"default\":null},{\"name\":\"streetnum\",\"type\":{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street number.\",\"default\":null}]},\"doc\":\"Street Number\",\"default\":null}]}"); + /** zip code. */ + public java.lang.Integer zip; + /** Street Number */ + public org.apache.samza.sql.avro.schemas.StreetNumRecord streetnum; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return zip; + case 1: return streetnum; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: zip = (java.lang.Integer)value$; break; + case 1: streetnum = (org.apache.samza.sql.avro.schemas.StreetNumRecord)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc index 5117a5e..5edb3ec 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc @@ -20,7 +20,7 @@ { "name": "EnrichedPageView", "version" : 1, - "namespace": "org.apache.samza.sql.system.avro", + "namespace": "org.apache.samza.sql.avro.schemas", "type": "record", "fields": [ { @@ -40,6 +40,42 @@ "doc" : "Profile name.", "type": ["null", "string"], "default":null + }, + { + "name": "profileAddress", + "doc": "Profile Address", + "default":null, + "type": { + "name": "AddressRecord", + "namespace": "org.apache.samza.sql.avro.schemas", + "type": "record", + "fields": [ + { + "name": "zip", + "doc" : "zip code.", + "type": ["null", "int"], + "default":null + }, + { + "name": "streetnum", + "doc": "Street Number", + "default":null, + "type": { + "name": "StreetNumRecord", + "namespace": "org.apache.samza.sql.avro.schemas", + "type": "record", + "fields": [ + { + "name": "number", + "doc": "street number.", + "type": ["null", "int"], + "default": null + } + ] + } + } + ] + } } ] } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java index cf3f62d..820002b 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java @@ -26,13 +26,15 @@ package org.apache.samza.sql.avro.schemas; @SuppressWarnings("all") public class EnrichedPageView extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"EnrichedPageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"companyName\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null},{\"name\":\"profileName\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null}]}"); + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"EnrichedPageView\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"companyName\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null},{\"name\":\"profileName\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null},{\"name\":\"profileAddress\",\"type\":{\"type\":\"record\",\"name\":\"AddressRecord\",\"fields\":[{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"doc\":\"zip 
code.\",\"default\":null},{\"name\":\"streetnum\",\"type\":{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street number.\",\"default\":null}]},\"doc\":\"Street Number\",\"default\":null}]},\"doc\":\"Profile Address\",\"default\":null}]}"); /** Page key. */ public java.lang.CharSequence pageKey; /** Company name. */ public java.lang.CharSequence companyName; /** Profile name. */ public java.lang.CharSequence profileName; + /** Profile Address */ + public org.apache.samza.sql.avro.schemas.AddressRecord profileAddress; public org.apache.avro.Schema getSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. public java.lang.Object get(int field$) { @@ -40,6 +42,7 @@ public class EnrichedPageView extends org.apache.avro.specific.SpecificRecordBas case 0: return pageKey; case 1: return companyName; case 2: return profileName; + case 3: return profileAddress; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -50,6 +53,7 @@ public class EnrichedPageView extends org.apache.avro.specific.SpecificRecordBas case 0: pageKey = (java.lang.CharSequence)value$; break; case 1: companyName = (java.lang.CharSequence)value$; break; case 2: profileName = (java.lang.CharSequence)value$; break; + case 3: profileAddress = (org.apache.samza.sql.avro.schemas.AddressRecord)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java new file mode 100644 index 0000000..8cfcce2 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Kind.java @@ -0,0 +1,30 @@ +/* +* Licensed to 
the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; + +@SuppressWarnings("all") +public enum Kind { + Home, Work, Cell +} http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java new file mode 100644 index 0000000..e9ced19 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PhoneNumber.java @@ -0,0 +1,50 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; + +@SuppressWarnings("all") +public class PhoneNumber extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"PhoneNumber\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"kind\",\"type\":{\"type\":\"enum\",\"name\":\"Kind\",\"symbols\":[\"Home\",\"Work\",\"Cell\"]}},{\"name\":\"number\",\"type\":\"string\"}]}"); + public org.apache.samza.sql.avro.schemas.Kind kind; + public java.lang.CharSequence number; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return kind; + case 1: return number; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: kind = (org.apache.samza.sql.avro.schemas.Kind)value$; break; + case 1: number = (java.lang.CharSequence)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc index 4e5e7dc..f07dd75 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc @@ -20,7 +20,7 @@ { "name": "Profile", "version" : 1, - "namespace": "org.apache.samza.sql.system.avro", + "namespace": "org.apache.samza.sql.avro.schemas", "type": "record", "fields": [ { @@ -40,6 +40,110 @@ "doc" : "Company id.", "type": ["null", "int"], "default":null + }, + { + "name": "address", + "doc": "Profile Address", + "default":null, + "type": { + "name": "AddressRecord", + "namespace": "org.apache.samza.sql.avro.schemas", + "type": "record", + "fields": [ + { + "name": "zip", + "doc" : "zip code.", + "type": ["null", "int"], + "default":null + }, + { + "name": "streetnum", + "doc": "Street Number", + "default":null, + "type": { + "name": "StreetNumRecord", + "namespace": "org.apache.samza.sql.avro.schemas", + "type": "record", + "fields": [ + { + "name": "number", + "doc": "street number.", + "type": ["null", "int"], + "default": null + } + ] + } + } + ] + } + }, + { + "name": "selfEmployed", + "doc": "Boolean Value.", + "type": ["null", "boolean"], + "default":null + }, + { + "name": "phoneNumbers", + "doc" : "array values in the record.", + "default": null, + "type": [ "null", + { + "type": 
"array", + "items":{ + "name":"PhoneNumber", + "type":"record", + "fields":[ + { + "name": "kind", + "type": { + "name": "Kind", + "type": "enum", + "symbols" : ["Home", "Work", "Cell"] + } + }, + {"name":"number", "type":"string"} + ] + } + } + ] + }, + { + "name": "mapValues", + "doc" : "map values in the record.", + "default": null, + "type": [ "null", + { + "type": "map", + "values":[ + { + "name": "SimpleRecord", + "version" : 1, + "type": "record", + "fields": [ + { + "name": "id", + "doc": "Record id.", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "name", + "doc": "Some name.", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + ] + } + ] } ] } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java index b5c1828..5c72930 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java @@ -26,13 +26,21 @@ package org.apache.samza.sql.avro.schemas; @SuppressWarnings("all") public class Profile extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Profile\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null},{\"name\":\"companyId\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null}]}"); + public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Profile\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null},{\"name\":\"companyId\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null},{\"name\":\"address\",\"type\":{\"type\":\"record\",\"name\":\"AddressRecord\",\"fields\":[{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"doc\":\"zip code.\",\"default\":null},{\"name\":\"streetnum\",\"type\":{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street number.\",\"default\":null}]},\"doc\":\"Street Number\",\"default\":null}]},\"doc\":\"Profile Address\",\"default\":null},{\"name\":\"selfEmployed\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"phoneNumbers\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"PhoneNumber\",\"fields\":[{\"name\":\"kind\",\"type\":{\"type\":\"enum\",\"name\":\"Kind\",\"symbols\":[\"Home\",\"Work\",\"Cell\"]}},{\"name\":\"number\",\"type\":\"string\"}]}}],\"doc\":\"array values in the record.\",\"default\":null},{\"name\":\"mapValues\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[{\"type\":\"record\",\"name\":\"SimpleRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Some name.\",\"default\":null}]}]}],\"doc\":\"map values in the record.\",\"default\":null}]}"); /** Profile id. */ public java.lang.Integer id; /** Profile name. */ public java.lang.CharSequence name; /** Company id. */ public java.lang.Integer companyId; + /** Profile Address */ + public org.apache.samza.sql.avro.schemas.AddressRecord address; + /** Boolean Value.
*/ + public java.lang.Boolean selfEmployed; + /** array values in the record. */ + public java.util.List<org.apache.samza.sql.avro.schemas.PhoneNumber> phoneNumbers; + /** map values in the record. */ + public java.util.Map<java.lang.CharSequence,java.lang.Object> mapValues; public org.apache.avro.Schema getSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. public java.lang.Object get(int field$) { @@ -40,6 +48,10 @@ public class Profile extends org.apache.avro.specific.SpecificRecordBase impleme case 0: return id; case 1: return name; case 2: return companyId; + case 3: return address; + case 4: return selfEmployed; + case 5: return phoneNumbers; + case 6: return mapValues; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -50,6 +62,10 @@ public class Profile extends org.apache.avro.specific.SpecificRecordBase impleme case 0: id = (java.lang.Integer)value$; break; case 1: name = (java.lang.CharSequence)value$; break; case 2: companyId = (java.lang.Integer)value$; break; + case 3: address = (org.apache.samza.sql.avro.schemas.AddressRecord)value$; break; + case 4: selfEmployed = (java.lang.Boolean)value$; break; + case 5: phoneNumbers = (java.util.List<org.apache.samza.sql.avro.schemas.PhoneNumber>)value$; break; + case 6: mapValues = (java.util.Map<java.lang.CharSequence,java.lang.Object>)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc index 6b010a4..2316246 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc +++ 
b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.avsc @@ -20,7 +20,7 @@ { "name": "SimpleRecord", "version" : 1, - "namespace": "org.apache.samza.sql.system.avro", + "namespace": "org.apache.samza.sql.avro.schemas", "type": "record", "fields": [ { http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java index baf3812..23cfb7d 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/SimpleRecord.java @@ -26,7 +26,7 @@ package org.apache.samza.sql.avro.schemas; @SuppressWarnings("all") public class SimpleRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Some name.\",\"default\":null}]}"); + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Some name.\",\"default\":null}]}"); /** Record id. */ public java.lang.Integer id; /** Some name. 
*/ http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java new file mode 100644 index 0000000..aca20e1 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/StreetNumRecord.java @@ -0,0 +1,48 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; + +@SuppressWarnings("all") +public class StreetNumRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"StreetNumRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"doc\":\"street number.\",\"default\":null}]}"); + /** street number. 
*/ + public java.lang.Integer number; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return number; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: number = (java.lang.Integer)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } +}