Repository: samza Updated Branches: refs/heads/master db6996ed9 -> dec16392d
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index c9f59e6..e5d3659 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -22,7 +22,9 @@ package org.apache.samza.sql.translator; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; @@ -43,6 +45,9 @@ import org.junit.Before; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; +import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*; + + public class TestQueryTranslator { // Helper functions to validate the cloned copies of TranslatorContext and SamzaSqlExecutionContext @@ -79,14 +84,20 @@ public class TestQueryTranslator { public void testTranslate() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, - "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10"); + "Insert into testavro.outputTopic(id) select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10"); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(descriptor -> { },samzaConfig); + StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig); - translator.translate(queryInfo, appDesc); + translator.translate(queryInfo.get(0), appDesc); OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); @@ -128,17 +139,20 @@ public class TestQueryTranslator { public void testTranslateComplex() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, - "Insert into testavro.outputTopic select Flatten(array_values) from testavro.COMPLEX1"); -// config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, -// "Insert into testavro.foo2 select string_value, SUM(id) from testavro.COMPLEX1 " -// + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), string_value"); + "Insert into testavro.outputTopic(string_value) select Flatten(array_values) from testavro.COMPLEX1"); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); @@ -163,14 +177,21 @@ public class TestQueryTranslator { public void testTranslateSubQuery() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, - "Insert into testavro.outputTopic select Flatten(a), id from (select id, array_values a, string_value s from testavro.COMPLEX1)"); + "Insert into testavro.outputTopic(string_value, id) select Flatten(a), id " + + " from (select id, array_values a, string_value s from testavro.COMPLEX1)"); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); @@ -195,115 +216,151 @@ public class TestQueryTranslator { public void testTranslateStreamTableJoinWithoutJoinOperator() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p" + " where p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateStreamTableJoinWithFullJoinOperator() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " full join testavro.PROFILE.`$table` as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = IllegalStateException.class) public void testTranslateStreamTableJoinWithSelfJoinOperator() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName)" + " select p1.name as profileName" + " from testavro.PROFILE.`$table` as p1" + " join testavro.PROFILE.`$table` as p2" + " on p1.id = p2.id"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateStreamTableJoinWithThetaCondition() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " join testavro.PROFILE.`$table` as p" + " on p.id <> pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateStreamTableCrossJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateStreamTableJoinWithAndLiteralCondition() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " join testavro.PROFILE.`$table` as p" + " on p.id = pv.profileId and p.name = 'John'"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateStreamTableJoinWithSubQuery() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " where exists " @@ -311,83 +368,113 @@ public class TestQueryTranslator { + " where p.id = pv.profileId)"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateTableTableJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW.`$table` as pv" + " join testavro.PROFILE.`$table` as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateStreamStreamJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " join testavro.PROFILE as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateJoinWithIncorrectLeftJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW.`$table` as pv" + " left join testavro.PROFILE as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateJoinWithIncorrectRightJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " right join testavro.PROFILE.`$table` as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) @@ -398,55 +485,73 @@ public class TestQueryTranslator { config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, ConfigBasedIOResolverFactory.class.getName()); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " join testavro.`$table` as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test (expected = SamzaException.class) public void testTranslateStreamTableInnerJoinWithUdf() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " join testavro.PROFILE.`$table` as p" + " on MyTest(p.id) = MyTest(pv.profileId)"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } @Test public void testTranslateStreamTableInnerJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " join testavro.PROFILE.`$table` as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); @@ -487,19 +592,25 @@ public class TestQueryTranslator { public void testTranslateStreamTableLeftJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PAGEVIEW as pv" + " left join testavro.PROFILE.`$table` as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); @@ -541,18 +652,24 @@ public class TestQueryTranslator { public void testTranslateStreamTableRightJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); String sql = - "Insert into testavro.enrichedPageViewTopic" + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" + " select p.name as profileName, pv.pageKey" + " from testavro.PROFILE.`$table` as p" + " right join testavro.PAGEVIEW as pv" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); @@ -594,19 +711,25 @@ public class TestQueryTranslator { public void testTranslateGroupBy() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); String sql = - "Insert into testavro.pageViewCountTopic" + "Insert into testavro.pageViewCountTopic(jobName, pageKey, `count`)" + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`" + " from testavro.PAGEVIEW as pv" + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'" + " group by (pv.pageKey)"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + translator.translate(queryInfo.get(0), streamAppDesc); OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); Assert.assertEquals(1, specGraph.getInputOperators().size()); @@ -619,16 +742,22 @@ public class TestQueryTranslator { public void testTranslateGroupByWithSumAggregator() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); String sql = - "Insert into testavro.pageViewCountTopic" + "Insert into testavro.pageViewCountTopic(jobName, pageKey, `sum`)" + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) as `sum`" + " from testavro.PAGEVIEW as pv" + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'" + " group by (pv.pageKey)"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - translator.translate(queryInfo, streamAppDesc); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); + translator.translate(queryInfo.get(0), streamAppDesc); } } http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index c211f03..919c91a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -75,7 +75,28 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); - String sql1 = "Insert into testavro.outputTopic select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1"; + String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql1); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.runAndWaitForFinish(); + + List<Integer> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) + .sorted() + .collect(Collectors.toList()); + Assert.assertEquals(numMessages, outMessages.size()); + Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages)); + } + + @Test + public void testEndToEndWithProjection() throws Exception { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + String sql1 = "Insert into testavro.outputTopic(id, long_value) " + + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); @@ -97,9 +118,9 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath()); String sql1 = - "Insert into testavro.outputTopic " - + "select Flatten(array_values) as string_value, id, bytes_value, fixed_value " - + "from testavro.COMPLEX1"; + "Insert into testavro.outputTopic(string_value, id, bytes_value, fixed_value, float_value) " + + " select Flatten(array_values) as string_value, id, bytes_value, fixed_value, float_value " + + " from testavro.COMPLEX1"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); @@ -121,7 +142,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); String sql1 = - "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)"; + "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); @@ -142,7 +163,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); - String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1"; + String sql1 = "Insert into testavro.outputTopic(id, long_value) " + + "select id, MyTest(id) as long_value from testavro.SIMPLE1"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); @@ -166,7 +188,11 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); - String sql1 = "Insert into testavro.outputTopic select id from testavro.SIMPLE1 where RegexMatch('.*4', Name)"; + String sql1 = + "Insert into testavro.outputTopic(id) " + + "select id " + + "from testavro.SIMPLE1 " + + "where RegexMatch('.*4', name)"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); @@ -186,7 +212,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { staticConfigs.putAll(configs); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," + + " p.name as profileName, p.address as profileAddress " + "from testavro.PROFILE.`$table` as p " + "join testavro.PAGEVIEW as pv " + " on p.id = pv.profileId"; @@ -215,7 +242,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { staticConfigs.putAll(configs); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName, p.address as profileAddress " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," + + " p.address as profileAddress " + "from testavro.PROFILE.`$table` as p " + "join testavro.PAGEVIEW as pv " + " on p.id = pv.profileId"; @@ -249,7 +277,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { staticConfigs.putAll(configs); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," + + " p.address as profileAddress " + "from testavro.PROFILE.`$table` as p " + "join testavro.PAGEVIEW as pv " + " on p.id = pv.profileId " @@ -282,7 +311,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," + + " p.address as profileAddress " + "from testavro.PAGEVIEW as pv " + "join testavro.PROFILE.`$table` as p " + " on pv.profileId = p.id"; @@ -311,7 +341,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," + + " p.address as profileAddress " + "from testavro.PAGEVIEW as pv " + "left join testavro.PROFILE.`$table` as p " + " on pv.profileId = p.id"; @@ -340,7 +371,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName," + + " p.address as profileAddress " + "from testavro.PROFILE.`$table` as p " + "right join testavro.PAGEVIEW as pv " + " on p.id = pv.profileId"; @@ -369,7 +401,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName, c.name as companyName " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as companyName, p.name as profileName," + + " p.address as profileAddress " + "from testavro.PAGEVIEW as pv " + "join testavro.PROFILE.`$table` as p " + " on p.id = pv.profileId " @@ -399,7 +432,8 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); String sql = "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey, p.name as profileName, c.name as companyName " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as companyName, p.name as profileName," + + " p.address as profileAddress " + "from testavro.PAGEVIEW as pv " + "join testavro.PROFILE.`$table` as p " + " on p.id = pv.profileId "