Repository: samza
Updated Branches:
  refs/heads/master bf2a2f76d -> 58c39e371


http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 6dcad25..4bd9741 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -32,11 +32,15 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.sql.avro.schemas.AddressRecord;
 import org.apache.samza.sql.avro.schemas.Company;
 import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.Kind;
 import org.apache.samza.sql.avro.schemas.PageView;
+import org.apache.samza.sql.avro.schemas.PhoneNumber;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.avro.schemas.StreetNumRecord;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
@@ -55,33 +59,44 @@ public class TestAvroSystemFactory implements SystemFactory 
{
   public static final String CFG_INCLUDE_NULL_FOREIGN_KEYS = 
"includeNullForeignKeys";
   public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
 
-  public static final String[] profiles = {"John", "Mike", "Mary", "Joe", 
"Brad", "Jennifer"};
+  private static final String[] profileNames = {"John", "Mike", "Mary", "Joe", 
"Brad", "Jennifer"};
+  private static final int[] profileZips = {94000, 94001, 94002, 94003, 94004, 
94005};
+  private static final int[] streetNums = {1234, 1235, 1236, 1237, 1238, 1239};
+  private static final String[] phoneNumbers = {"000-000-0000", 
"111-111-1111", "222-222-2222", "333-333-3333",
+      "444-444-4444", "555-555-5555"};
   public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", 
"AMZN", "CSCO"};
-  public static final String[] pagekeys = {"inbox", "home", "search", "pymk", 
"group", "job"};
+  private static final String[] pagekeys = {"inbox", "home", "search", "pymk", 
"group", "job"};
 
   public static List<String> getPageKeyProfileNameJoin(int numMessages) {
     return IntStream.range(0, numMessages)
-                .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + 
profiles[i % profiles.length])
+                .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + 
profileNames[i % profileNames.length])
                 .collect(Collectors.toList());
   }
 
+  /**
+   * Builds the expected output rows for a join that projects the nested profile address.
+   * Row {@code i} is {@code pageKey,profileName,zip,streetNum}, with each value taken from
+   * its lookup array at position {@code i} modulo that array's length.
+   *
+   * @param numMessages number of rows to generate
+   * @return the expected rows, in message order
+   */
+  public static List<String> getPageKeyProfileNameAddressJoin(int numMessages) {
+    List<String> expectedRows = new ArrayList<>();
+    for (int i = 0; i < numMessages; i++) {
+      String pageKey = pagekeys[i % pagekeys.length];
+      String profileName = profileNames[i % profileNames.length];
+      int zip = profileZips[i % profileZips.length];
+      int streetNum = streetNums[i % streetNums.length];
+      expectedRows.add(pageKey + "," + profileName + "," + zip + "," + streetNum);
+    }
+    return expectedRows;
+  }
+
   public static List<String> getPageKeyProfileNameJoinWithNullForeignKeys(int 
numMessages) {
     // All even profileId foreign keys are null
     return IntStream.range(0, numMessages / 2)
-        .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + 
profiles[(i * 2 + 1) % profiles.length])
+        .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + 
profileNames[(i * 2 + 1) % profileNames.length])
         .collect(Collectors.toList());
   }
 
   public static List<String> 
getPageKeyProfileNameOuterJoinWithNullForeignKeys(int numMessages) {
     // All even profileId foreign keys are null
     return IntStream.range(0, numMessages)
-        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? 
"null" : profiles[i % profiles.length]))
+        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? 
"null" : profileNames[i % profileNames.length]))
         .collect(Collectors.toList());
   }
 
   public static List<String> getPageKeyProfileCompanyNameJoin(int numMessages) 
{
     return IntStream.range(0, numMessages)
-        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % 
profiles.length] +
+        .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profileNames[i % 
profileNames.length] +
             "," + companies[i % companies.length])
         .collect(Collectors.toList());
   }
@@ -180,8 +195,53 @@ public class TestAvroSystemFactory implements 
SystemFactory {
     private Object createProfileRecord(int index) {
       GenericRecord record = new GenericData.Record(Profile.SCHEMA$);
       record.put("id", index);
-      record.put("name", profiles[index % profiles.length]);
+      record.put("name", profileNames[index % profileNames.length]);
+      record.put("address", createProfileAddressRecord(index));
       record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? 
null : index % companies.length);
+      record.put("phoneNumbers", createProfilePhoneNumbers(index % 
phoneNumbers.length));
+      return record;
+    }
+
+    /**
+     * Builds the nested Avro address record for the profile at {@code index}.
+     *
+     * @param index profile index; each lookup array is cycled using its own length
+     * @return a generic record with the {@code streetnum} and {@code zip} fields set
+     */
+    private Object createProfileAddressRecord(int index) {
+      GenericRecord record = new GenericData.Record(AddressRecord.SCHEMA$);
+      record.put("streetnum", createProfileStreetNumRecord(index));
+      // Wrap with the zip array's own length (not profileNames') so the lookup stays in
+      // bounds and matches getPageKeyProfileNameAddressJoin even if the arrays diverge in size.
+      record.put("zip", profileZips[index % profileZips.length]);
+      return record;
+    }
+
+    /**
+     * Builds the Avro street-number record that is nested inside a profile address.
+     *
+     * @param index profile index, wrapped around the {@code streetNums} array
+     * @return a generic record whose {@code number} field is set
+     */
+    private Object createProfileStreetNumRecord(int index) {
+      int streetNumber = streetNums[index % streetNums.length];
+      GenericRecord streetNumRecord = new GenericData.Record(StreetNumRecord.SCHEMA$);
+      streetNumRecord.put("number", streetNumber);
+      return streetNumRecord;
+    }
+
+    /**
+     * Builds the phone-number records for one profile: one record per kind, in the
+     * order Home, Work, Cell.
+     *
+     * @param index position in the {@code phoneNumbers} array, passed through unchanged
+     * @return a mutable list holding the three phone-number records
+     */
+    private List<Object> createProfilePhoneNumbers(int index) {
+      Kind[] kindsInOrder = {Kind.Home, Kind.Work, Kind.Cell};
+      List<Object> records = new ArrayList<>();
+      for (Kind kind : kindsInOrder) {
+        records.add(createPhoneNumberRecord(index, kind));
+      }
+      return records;
+    }
+
+    /**
+     * Builds one Avro phone-number record. The number is the base entry of
+     * {@code phoneNumbers} at {@code index} with its last character replaced by a
+     * kind-specific digit: 1 for Home, 2 for Work, 3 for Cell.
+     *
+     * @param index position in the {@code phoneNumbers} array (used as-is, not wrapped)
+     * @param kind  phone kind; selects the replacement digit and is stored in the record
+     * @return a generic record with the {@code number} and {@code kind} fields set
+     */
+    private Object createPhoneNumberRecord(int index, Kind kind) {
+      GenericRecord record = new GenericData.Record(PhoneNumber.SCHEMA$);
+      StringBuilder number = new StringBuilder(phoneNumbers[index]);
+      int lastCharIdx = number.length() - 1;
+      String suffix = "";
+      switch (kind) {
+        case Home:
+          suffix = "1";
+          break;
+        case Work:
+          suffix = "2";
+          break;
+        case Cell:
+          suffix = "3";
+          break;
+        default:
+          // Any other kind keeps the empty suffix, so the last character is simply dropped.
+          break;
+      }
+      number.replace(lastCharIdx, lastCharIdx + 1, suffix);
+      // Store an immutable String rather than the mutable builder so the record's
+      // value cannot change after construction and compares like any other string.
+      record.put("number", number.toString());
+      record.put("kind", kind);
       return record;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 3fff0f3..a41463e 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -201,6 +201,40 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
   }
 
   @Test
+  // Runs a stream-table inner join that projects the nested profile address record and
+  // checks every output row against the rows generated by TestAvroSystemFactory.
+  public void testEndToEndStreamTableInnerJoinWithNestedRecord() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    staticConfigs.putAll(configs);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey, p.name as profileName, p.address as profileAddress "
+            + "from testavro.PROFILE.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.id = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    // Flatten each output record to "pageKey,profileName,zip,streetNumber".
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> {
+            GenericRecord message = (GenericRecord) x.getMessage();
+            GenericRecord profileAddr = (GenericRecord) message.get("profileAddress");
+            GenericRecord streetNum = (GenericRecord) profileAddr.get("streetnum");
+            Object profileName = message.get("profileName");
+            return message.get("pageKey").toString() + ","
+                + (profileName == null ? "null" : profileName.toString()) + ","
+                + profileAddr.get("zip") + "," + streetNum.get("number");
+          })
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameAddressJoin(numMessages);
+    // JUnit's contract is assertEquals(expected, actual); keeping that order makes
+    // failure messages label the two lists correctly.
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
   public void testEndToEndStreamTableInnerJoinWithFilter() throws Exception {
     int numMessages = 20;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
index 198b84b..6c7a8c2 100644
--- 
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java
@@ -58,8 +58,8 @@ public class AvroSchemaGenRelConverter extends 
AvroRelConverter {
 
   private Schema computeSchema(String streamName, SamzaSqlRelMessage 
relMessage) {
     List<Schema.Field> keyFields = new ArrayList<>();
-    List<String> fieldNames = relMessage.getFieldNames();
-    List<Object> values = relMessage.getFieldValues();
+    List<String> fieldNames = 
relMessage.getSamzaSqlRelRecord().getFieldNames();
+    List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
 
     for (int index = 0; index < fieldNames.size(); index++) {
       if (fieldNames.get(index).equals(SamzaSqlRelMessage.KEY_NAME) || 
values.get(index) == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
index de8dec8..ce257f1 100644
--- 
a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
@@ -59,8 +59,8 @@ public class JsonRelConverterFactory implements 
SamzaRelConverterFactory {
       String jsonValue;
       ObjectNode node = mapper.createObjectNode();
 
-      List<String> fieldNames = relMessage.getFieldNames();
-      List<Object> values = relMessage.getFieldValues();
+      List<String> fieldNames = 
relMessage.getSamzaSqlRelRecord().getFieldNames();
+      List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
 
       for (int index = 0; index < fieldNames.size(); index++) {
         Object value = values.get(index);

Reply via email to