Repository: samza Updated Branches: refs/heads/master bf2a2f76d -> 58c39e371
http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java index 6dcad25..4bd9741 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java @@ -32,11 +32,15 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.samza.config.Config; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.sql.avro.schemas.AddressRecord; import org.apache.samza.sql.avro.schemas.Company; import org.apache.samza.sql.avro.schemas.ComplexRecord; +import org.apache.samza.sql.avro.schemas.Kind; import org.apache.samza.sql.avro.schemas.PageView; +import org.apache.samza.sql.avro.schemas.PhoneNumber; import org.apache.samza.sql.avro.schemas.Profile; import org.apache.samza.sql.avro.schemas.SimpleRecord; +import org.apache.samza.sql.avro.schemas.StreetNumRecord; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemAdmin; @@ -55,33 +59,44 @@ public class TestAvroSystemFactory implements SystemFactory { public static final String CFG_INCLUDE_NULL_FOREIGN_KEYS = "includeNullForeignKeys"; public static List<OutgoingMessageEnvelope> messages = new ArrayList<>(); - public static final String[] profiles = {"John", "Mike", "Mary", "Joe", "Brad", "Jennifer"}; + private static final String[] profileNames = {"John", "Mike", "Mary", "Joe", "Brad", "Jennifer"}; + private static final int[] profileZips = {94000, 94001, 94002, 94003, 94004, 94005}; + private static final int[] streetNums = {1234, 1235, 1236, 1237, 1238, 1239}; + private static final String[] phoneNumbers = {"000-000-0000", "111-111-1111", "222-222-2222", "333-333-3333", + "444-444-4444", "555-555-5555"}; public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"}; - public static final String[] pagekeys = {"inbox", "home", "search", "pymk", "group", "job"}; + private static final String[] pagekeys = {"inbox", "home", "search", "pymk", "group", "job"}; public static List<String> getPageKeyProfileNameJoin(int numMessages) { return IntStream.range(0, numMessages) - .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length]) + .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profileNames[i % profileNames.length]) .collect(Collectors.toList()); } + public static List<String> getPageKeyProfileNameAddressJoin(int numMessages) { + return IntStream.range(0, numMessages) + .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profileNames[i % profileNames.length] + "," + + profileZips[i % profileZips.length] + "," + streetNums[i % streetNums.length]) + .collect(Collectors.toList()); + } + public static List<String> getPageKeyProfileNameJoinWithNullForeignKeys(int numMessages) { // All even profileId foreign keys are null return IntStream.range(0, numMessages / 2) - .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + profiles[(i * 2 + 1) % profiles.length]) + .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + profileNames[(i * 2 + 1) % profileNames.length]) .collect(Collectors.toList()); } public static List<String> getPageKeyProfileNameOuterJoinWithNullForeignKeys(int numMessages) { // All even profileId foreign keys are null return IntStream.range(0, numMessages) - .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? "null" : profiles[i % profiles.length])) + .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? "null" : profileNames[i % profileNames.length])) .collect(Collectors.toList()); } public static List<String> getPageKeyProfileCompanyNameJoin(int numMessages) { return IntStream.range(0, numMessages) - .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length] + + .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profileNames[i % profileNames.length] + "," + companies[i % companies.length]) .collect(Collectors.toList()); } @@ -180,8 +195,53 @@ public class TestAvroSystemFactory implements SystemFactory { private Object createProfileRecord(int index) { GenericRecord record = new GenericData.Record(Profile.SCHEMA$); record.put("id", index); - record.put("name", profiles[index % profiles.length]); + record.put("name", profileNames[index % profileNames.length]); + record.put("address", createProfileAddressRecord(index)); record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? null : index % companies.length); + record.put("phoneNumbers", createProfilePhoneNumbers(index % phoneNumbers.length)); + return record; + } + + private Object createProfileAddressRecord(int index) { + GenericRecord record = new GenericData.Record(AddressRecord.SCHEMA$); + record.put("streetnum", createProfileStreetNumRecord(index)); + record.put("zip", profileZips[index % profileNames.length]); + return record; + } + + private Object createProfileStreetNumRecord(int index) { + GenericRecord record = new GenericData.Record(StreetNumRecord.SCHEMA$); + record.put("number", streetNums[index % streetNums.length]); + return record; + } + + private List<Object> createProfilePhoneNumbers(int index) { + List<Object> phoneNums = new ArrayList<>(); + phoneNums.add(createPhoneNumberRecord(index, Kind.Home)); + phoneNums.add(createPhoneNumberRecord(index, Kind.Work)); + phoneNums.add(createPhoneNumberRecord(index, Kind.Cell)); + return phoneNums; + } + + private Object createPhoneNumberRecord(int index, Kind kind) { + GenericRecord record = new GenericData.Record(PhoneNumber.SCHEMA$); + StringBuilder number = new StringBuilder(phoneNumbers[index]); + int lastCharIdx = number.length() - 1; + String suffix = ""; + switch (kind) { + case Home: + suffix = "1"; + break; + case Work: + suffix = "2"; + break; + case Cell: + suffix = "3"; + break; + } + number.replace(lastCharIdx, lastCharIdx + 1, suffix); + record.put("number", number); + record.put("kind", kind); return record; } http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index 3fff0f3..a41463e 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -201,6 +201,40 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { } @Test + public void testEndToEndStreamTableInnerJoinWithNestedRecord() throws Exception { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + staticConfigs.putAll(configs); + String sql = + "Insert into testavro.enrichedPageViewTopic " + + "select pv.pageKey, p.name as profileName, p.address as profileAddress " + + "from testavro.PROFILE.`$table` as p " + + "join testavro.PAGEVIEW as pv " + + " on p.id = pv.profileId"; + + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.runAndWaitForFinish(); + + List<String> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> { + GenericRecord profileAddr = (GenericRecord) ((GenericRecord) x.getMessage()).get("profileAddress"); + GenericRecord streetNum = (GenericRecord) (profileAddr.get("streetnum")); + return ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : + ((GenericRecord) x.getMessage()).get("profileName").toString()) + "," + + profileAddr.get("zip") + "," + streetNum.get("number"); + }) + .collect(Collectors.toList()); + Assert.assertEquals(numMessages, outMessages.size()); + List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameAddressJoin(numMessages); + Assert.assertEquals(outMessages, expectedOutMessages); + } + + @Test public void testEndToEndStreamTableInnerJoinWithFilter() throws Exception { int numMessages = 20; http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java index 198b84b..6c7a8c2 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java @@ -58,8 +58,8 @@ public class AvroSchemaGenRelConverter extends AvroRelConverter { private Schema computeSchema(String streamName, SamzaSqlRelMessage relMessage) { List<Schema.Field> keyFields = new ArrayList<>(); - List<String> fieldNames = relMessage.getFieldNames(); - List<Object> values = relMessage.getFieldValues(); + List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames(); + List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues(); for (int index = 0; index < fieldNames.size(); index++) { if (fieldNames.get(index).equals(SamzaSqlRelMessage.KEY_NAME) || values.get(index) == null) { http://git-wip-us.apache.org/repos/asf/samza/blob/58c39e37/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java index de8dec8..ce257f1 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java @@ -59,8 +59,8 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory { String jsonValue; ObjectNode node = mapper.createObjectNode(); - List<String> fieldNames = relMessage.getFieldNames(); - List<Object> values = relMessage.getFieldValues(); + List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames(); + List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues(); for (int index = 0; index < fieldNames.size(); index++) { Object value = values.get(index);