Repository: samza
Updated Branches:
  refs/heads/master db6996ed9 -> dec16392d


http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index c9f59e6..e5d3659 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -22,7 +22,9 @@ package org.apache.samza.sql.translator;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
@@ -43,6 +45,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+
+
 public class TestQueryTranslator {
 
   // Helper functions to validate the cloned copies of TranslatorContext and 
SamzaSqlExecutionContext
@@ -79,14 +84,20 @@ public class TestQueryTranslator {
   public void testTranslate() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select MyTest(id) from 
testavro.level1.level2.SIMPLE1 as s where s.id = 10");
+        "Insert into testavro.outputTopic(id) select MyTest(id) from 
testavro.level1.level2.SIMPLE1 as s where s.id = 10");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl appDesc = new 
StreamApplicationDescriptorImpl(descriptor -> { },samzaConfig);
+    StreamApplicationDescriptorImpl appDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
 
-    translator.translate(queryInfo, appDesc);
+    translator.translate(queryInfo.get(0), appDesc);
     OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -128,17 +139,20 @@ public class TestQueryTranslator {
   public void testTranslateComplex() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select Flatten(array_values) from 
testavro.COMPLEX1");
-//    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-//        "Insert into testavro.foo2 select string_value, SUM(id) from 
testavro.COMPLEX1 "
-//            + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), 
string_value");
+        "Insert into testavro.outputTopic(string_value) select 
Flatten(array_values) from testavro.COMPLEX1");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -163,14 +177,21 @@ public class TestQueryTranslator {
   public void testTranslateSubQuery() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select Flatten(a), id from (select 
id, array_values a, string_value s from testavro.COMPLEX1)");
+        "Insert into testavro.outputTopic(string_value, id) select Flatten(a), 
id "
+            + " from (select id, array_values a, string_value s from 
testavro.COMPLEX1)");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -195,115 +216,151 @@ public class TestQueryTranslator {
   public void testTranslateStreamTableJoinWithoutJoinOperator() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
             + " where p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithFullJoinOperator() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " full join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = IllegalStateException.class)
   public void testTranslateStreamTableJoinWithSelfJoinOperator() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName)"
             + " select p1.name as profileName"
             + " from testavro.PROFILE.`$table` as p1"
             + " join testavro.PROFILE.`$table` as p2"
             + " on p1.id = p2.id";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithThetaCondition() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id <> pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableCrossJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithAndLiteralCondition() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId and p.name = 'John'";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithSubQuery() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " where exists "
@@ -311,83 +368,113 @@ public class TestQueryTranslator {
             + " where p.id = pv.profileId)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateTableTableJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW.`$table` as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamStreamJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateJoinWithIncorrectLeftJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW.`$table` as pv"
             + " left join testavro.PROFILE as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateJoinWithIncorrectRightJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " right join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
@@ -398,55 +485,73 @@ public class TestQueryTranslator {
     config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
         ConfigBasedIOResolverFactory.class.getName());
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableInnerJoinWithUdf() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on MyTest(p.id) = MyTest(pv.profileId)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
   @Test
   public void testTranslateStreamTableInnerJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     StreamConfig streamConfig = new StreamConfig(samzaConfig);
@@ -487,19 +592,25 @@ public class TestQueryTranslator {
   public void testTranslateStreamTableLeftJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PAGEVIEW as pv"
             + " left join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
 
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
@@ -541,18 +652,24 @@ public class TestQueryTranslator {
   public void testTranslateStreamTableRightJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.enrichedPageViewTopic"
+        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
             + " select p.name as profileName, pv.pageKey"
             + " from testavro.PROFILE.`$table` as p"
             + " right join testavro.PAGEVIEW as pv"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
 
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
@@ -594,19 +711,25 @@ public class TestQueryTranslator {
   public void testTranslateGroupBy() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.pageViewCountTopic"
+        "Insert into testavro.pageViewCountTopic(jobName, pageKey, `count`)"
             + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
             + " from testavro.PAGEVIEW as pv"
             + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
             + " group by (pv.pageKey)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
 
-    translator.translate(queryInfo, streamAppDesc);
+    translator.translate(queryInfo.get(0), streamAppDesc);
     OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
 
     Assert.assertEquals(1, specGraph.getInputOperators().size());
@@ -619,16 +742,22 @@ public class TestQueryTranslator {
   public void testTranslateGroupByWithSumAggregator() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
     String sql =
-        "Insert into testavro.pageViewCountTopic"
+        "Insert into testavro.pageViewCountTopic(jobName, pageKey, `sum`)"
             + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) 
as `sum`"
             + " from testavro.PAGEVIEW as pv" + " where pv.pageKey = 'job' or 
pv.pageKey = 'inbox'"
             + " group by (pv.pageKey)";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config));
+
+    List<String> sqlStmts = fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toSet()),
+        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
-    SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig);
-    translator.translate(queryInfo, streamAppDesc);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
+    translator.translate(queryInfo.get(0), streamAppDesc);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index c211f03..919c91a 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -75,7 +75,28 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, 
TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as 
long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    Assert.assertTrue(IntStream.range(0, 
numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
+  }
+
+  @Test
+  public void testEndToEndWithProjection() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+        + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) 
+ MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
@@ -97,9 +118,9 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     LOG.info(" Class Path : " + 
RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
-        "Insert into testavro.outputTopic "
-            + "select Flatten(array_values) as string_value, id, bytes_value, 
fixed_value "
-            + "from testavro.COMPLEX1";
+        "Insert into testavro.outputTopic(string_value, id, bytes_value, 
fixed_value, float_value) "
+            + " select Flatten(array_values) as string_value, id, bytes_value, 
fixed_value, float_value "
+            + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
@@ -121,7 +142,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 =
-        "Insert into testavro.outputTopic select Flatten(a) as id from (select 
MyTestArray(id) a from testavro.SIMPLE1)";
+        "Insert into testavro.outputTopic(id) select Flatten(a) as id from 
(select MyTestArray(id) a from testavro.SIMPLE1)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
@@ -142,7 +163,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as 
long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+        + "select id, MyTest(id) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
@@ -166,7 +188,11 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id from 
testavro.SIMPLE1 where RegexMatch('.*4', Name)";
+    String sql1 =
+        "Insert into testavro.outputTopic(id) "
+            + "select id "
+            + "from testavro.SIMPLE1 "
+            + "where RegexMatch('.*4', name)";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
@@ -186,7 +212,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     staticConfigs.putAll(configs);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId";
@@ -215,7 +242,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     staticConfigs.putAll(configs);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName, p.address as 
profileAddress "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId";
@@ -249,7 +277,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     staticConfigs.putAll(configs);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId "
@@ -282,7 +311,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "join testavro.PROFILE.`$table` as p "
             + " on pv.profileId = p.id";
@@ -311,7 +341,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "left join testavro.PROFILE.`$table` as p "
             + " on pv.profileId = p.id";
@@ -340,7 +371,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "right join testavro.PAGEVIEW as pv "
             + " on p.id = pv.profileId";
@@ -369,7 +401,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName, c.name as companyName 
"
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as 
companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "join testavro.PROFILE.`$table` as p "
             + " on p.id = pv.profileId "
@@ -399,7 +432,8 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
-            + "select pv.pageKey, p.name as profileName, c.name as companyName 
"
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as 
companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "join testavro.PROFILE.`$table` as p "
             + " on p.id = pv.profileId "

Reply via email to