SAMZA-1671: sql: initial insert support for table destination

Table is the other main IO abstraction in Samza; it supports reads and,
optionally, writes. For tables that do support writes, Samza SQL users
should be able to write a query that inserts into them, for example to
insert into a database. The current code only supports inserting into a
stream. This change adds initial support for the table insert
operation.

Author: Peng Du <p...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>, Srini P <spun...@linkedin.com>

Closes #465 from pdu-mn1/sql-insert-table


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2f4d0b95
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2f4d0b95
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2f4d0b95

Branch: refs/heads/master
Commit: 2f4d0b953c635e8676b83cc1a4a28213dc21a49d
Parents: 3cc2a05
Author: Peng Du <p...@linkedin.com>
Authored: Fri Apr 20 09:18:31 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Apr 20 09:18:31 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/config/JavaTableConfig.java    |   2 +-
 .../sql/impl/ConfigBasedIOResolverFactory.java  | 122 ++++++++++++
 .../impl/ConfigBasedSourceResolverFactory.java  | 109 -----------
 .../samza/sql/impl/ConfigBasedUdfResolver.java  |   1 -
 .../samza/sql/interfaces/SourceResolver.java    |  44 -----
 .../sql/interfaces/SourceResolverFactory.java   |  36 ----
 .../samza/sql/interfaces/SqlIOConfig.java       | 136 +++++++++++++
 .../samza/sql/interfaces/SqlIOResolver.java     |  45 +++++
 .../sql/interfaces/SqlIOResolverFactory.java    |  36 ++++
 .../sql/interfaces/SqlSystemSourceConfig.java   | 129 ------------
 .../apache/samza/sql/planner/QueryPlanner.java  |  10 +-
 .../sql/runner/SamzaSqlApplicationConfig.java   |  50 ++---
 .../sql/runner/SamzaSqlApplicationRunner.java   |  12 +-
 .../samza/sql/testutil/SamzaSqlQueryParser.java |  62 +++---
 .../samza/sql/translator/JoinTranslator.java    |  66 ++++---
 .../samza/sql/translator/QueryTranslator.java   |  39 +++-
 .../samza/sql/translator/ScanTranslator.java    |   8 +-
 .../apache/samza/sql/TestQueryTranslator.java   |   8 +-
 .../sql/TestSamzaSqlApplicationConfig.java      |  14 +-
 .../samza/sql/TestSamzaSqlQueryParser.java      |  14 +-
 .../apache/samza/sql/e2e/TestSamzaSqlTable.java |  69 +++++++
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |  34 +++-
 .../sql/testutil/TestIOResolverFactory.java     | 195 +++++++++++++++++++
 .../sql/testutil/TestSourceResolverFactory.java |  66 -------
 .../org/apache/samza/tools/SamzaSqlConsole.java |  28 +--
 25 files changed, 799 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
index 6cc3986..ed013c4 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -53,7 +53,7 @@ public class JavaTableConfig extends MapConfig {
     Config subConfig = subset(TABLES_PREFIX, true);
     Set<String> tableNames = subConfig.keySet().stream()
         .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
-        .map(k -> k.substring(0, k.indexOf(".")))
+        .map(k -> k.replace(TABLE_PROVIDER_FACTORY_SUFFIX, ""))
         .collect(Collectors.toSet());
     return new LinkedList<>(tableNames);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
new file mode 100644
index 0000000..0887dc4
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -0,0 +1,122 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Source Resolver implementation that uses static config to return a config 
corresponding to a system stream.
+ * This Source resolver implementation supports sources of type 
{systemName}.{streamName}[.$table]
+ * {systemName}.{streamName} indicates a stream
+ * {systemName}.{streamName}.$table indicates a table
+ */
+public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConfigBasedIOResolverFactory.class);
+
+  public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
+
+  @Override
+  public SqlIOResolver create(Config config) {
+    return new ConfigBasedIOResolver(config);
+  }
+
+  private class ConfigBasedIOResolver implements SqlIOResolver {
+    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+
+    public ConfigBasedIOResolver(Config config) {
+      this.config = config;
+    }
+
+    @Override
+    public SqlIOConfig fetchSourceInfo(String source) {
+      String[] sourceComponents = source.split("\\.");
+      boolean isTable = isTable(sourceComponents);
+
+      // This source resolver expects sources of format 
{systemName}.{streamName}[.$table]
+      //  * First source part is always system name.
+      //  * The last source part could be either a "$table" keyword or stream 
name. If it is "$table", then stream name
+      //    should be the one before the last source part.
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = isTable ? endIdx - 1 : endIdx;
+      boolean invalidQuery = false;
+
+      if (sourceComponents.length != 2) {
+        if (sourceComponents.length != 3 ||
+            
!sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+          invalidQuery = true;
+        }
+      } else {
+        if 
(sourceComponents[0].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD) ||
+            
sourceComponents[1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+          invalidQuery = true;
+        }
+      }
+
+      if (invalidQuery) {
+        String msg = String.format("Source %s is not of the format 
{systemName}.{streamName}[.%s]", source,
+            SAMZA_SQL_QUERY_TABLE_KEYWORD);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      String systemName = sourceComponents[0];
+      String streamName = sourceComponents[streamIdx];
+
+      TableDescriptor tableDescriptor = null;
+      if (isTable) {
+        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + source)
+            .withSerde(KVSerde.of(
+                new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+                new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+      }
+
+      return new SqlIOConfig(systemName, streamName, 
fetchSystemConfigs(systemName), tableDescriptor);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sink) {
+      throw new NotImplementedException("No sink support in 
ConfigBasedIOResolver.");
+    }
+
+    private boolean isTable(String[] sourceComponents) {
+      return sourceComponents[sourceComponents.length - 
1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
+    }
+
+    private Config fetchSystemConfigs(String systemName) {
+      return config.subset(systemName + ".");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
deleted file mode 100644
index 5348d3d..0000000
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.impl;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Source Resolver implementation that uses static config to return a config 
corresponding to a system stream.
- * This Source resolver implementation supports sources of type 
{systemName}.{streamName}[.$table]
- * {systemName}.{streamName} indicates a stream
- * {systemName}.{streamName}.$table indicates a table
- */
-public class ConfigBasedSourceResolverFactory implements SourceResolverFactory 
{
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ConfigBasedSourceResolverFactory.class);
-
-  public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
-
-  @Override
-  public SourceResolver create(Config config) {
-    return new ConfigBasedSourceResolver(config);
-  }
-
-  private class ConfigBasedSourceResolver implements SourceResolver {
-    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
-    private final Config config;
-
-    public ConfigBasedSourceResolver(Config config) {
-      this.config = config;
-    }
-
-    @Override
-    public SqlSystemSourceConfig fetchSourceInfo(String source) {
-      String[] sourceComponents = source.split("\\.");
-      boolean isTable = false;
-
-      // This source resolver expects sources of format 
{systemName}.{streamName}[.$table]
-      //  * First source part is always system name.
-      //  * The last source part could be either a "$table" keyword or stream 
name. If it is "$table", then stream name
-      //    should be the one before the last source part.
-      int endIdx = sourceComponents.length - 1;
-      int streamIdx = endIdx;
-      boolean invalidQuery = false;
-
-      if (sourceComponents.length != 2) {
-        if (sourceComponents.length != 3 ||
-            
!sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-          invalidQuery = true;
-        }
-      } else {
-        if 
(sourceComponents[0].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD) ||
-            
sourceComponents[1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-          invalidQuery = true;
-        }
-      }
-
-      if (invalidQuery) {
-        String msg = String.format("Source %s is not of the format 
{systemName}.{streamName}[.%s]", source,
-            SAMZA_SQL_QUERY_TABLE_KEYWORD);
-        LOG.error(msg);
-        throw new SamzaException(msg);
-      }
-
-      if 
(sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-        isTable = true;
-        streamIdx = endIdx - 1;
-      }
-
-      String systemName = sourceComponents[0];
-      String streamName = sourceComponents[streamIdx];
-
-      return new SqlSystemSourceConfig(systemName, streamName, 
fetchSystemConfigs(systemName), isTable);
-    }
-
-    @Override
-    public boolean isTable(String sourceName) {
-      String[] sourceComponents = sourceName.split("\\.");
-      return sourceComponents[sourceComponents.length - 
1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
-    }
-
-    private Config fetchSystemConfigs(String systemName) {
-      return config.subset(systemName + ".");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
index 412ff3b..a7eed84 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.StringUtils;

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
deleted file mode 100644
index c161a0d..0000000
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-/**
- * Source Resolvers are used by Samza Sql application to fetch the {@link 
SqlSystemSourceConfig} corresponding to the source.
- */
-public interface SourceResolver {
-  /**
-   * Returns the SystemStream config corresponding to the source name
-   * @param sourceName
-   *  source whose systemstreamconfig needs to be fetched.
-   * @return
-   *  System stream config corresponding to the source.
-   */
-  SqlSystemSourceConfig fetchSourceInfo(String sourceName);
-
-  /**
-   * Returns if a given source is a table. Different source resolvers could 
have different notations in the source
-   * name for denoting a table. Eg: system.stream.$table
-   * @param sourceName
-   *  source that needs to be checked if it is a table.
-   * @return
-   *  true if the source is a table, else false.
-   */
-  boolean isTable(String sourceName);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
deleted file mode 100644
index 274a6b1..0000000
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-import org.apache.samza.config.Config;
-
-
-/**
- * Factory that is used to create {@link SourceResolver}.
- */
-public interface SourceResolverFactory {
-
-  /**
-   * Create the {@link SourceResolver}. This is called during the application 
initialization.
-   * @param config config for the SourceResolver
-   * @return Returns the created {@link SourceResolver}
-   */
-  SourceResolver create(Config config);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
new file mode 100644
index 0000000..3a73e09
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Configs associated with an IO resource. Both stream and table resources are 
supported.
+ */
+public class SqlIOConfig {
+
+  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
+
+  private final String systemName;
+
+  private final String streamName;
+
+  private final String samzaRelConverterName;
+  private final SystemStream systemStream;
+
+  private final String source;
+  private final String relSchemaProviderName;
+
+  private final Config config;
+
+  private final List<String> sourceParts;
+
+  private final Optional<TableDescriptor> tableDescriptor;
+
+  public SqlIOConfig(String systemName, String streamName, Config 
systemConfig) {
+    this(systemName, streamName, Arrays.asList(systemName, streamName), 
systemConfig, null);
+  }
+
+  public SqlIOConfig(String systemName, String streamName, Config 
systemConfig, TableDescriptor tableDescriptor) {
+    this(systemName, streamName, Arrays.asList(systemName, streamName), 
systemConfig, tableDescriptor);
+  }
+
+  public SqlIOConfig(String systemName, String streamName, List<String> 
sourceParts,
+      Config systemConfig, TableDescriptor tableDescriptor) {
+    HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
+    this.systemName = systemName;
+    this.streamName = streamName;
+    this.source = getSourceFromSourceParts(sourceParts);
+    this.sourceParts = sourceParts;
+    this.systemStream = new SystemStream(systemName, streamName);
+    this.tableDescriptor = Optional.ofNullable(tableDescriptor);
+
+    samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
+    Validate.notEmpty(samzaRelConverterName,
+        String.format("%s is not set or empty for system %s", 
CFG_SAMZA_REL_CONVERTER, systemName));
+
+    relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
+
+    // Removing the Samza SQL specific configs to get the remaining Samza 
configs.
+    streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
+    streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
+
+    // Currently, only local table is supported. And it is assumed that all 
tables are local tables.
+    if (tableDescriptor != null) {
+      streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), 
streamName), "true");
+      
streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(),
 streamName), "oldest");
+    }
+
+    config = new MapConfig(streamConfigs);
+  }
+
+  public static String getSourceFromSourceParts(List<String> sourceParts) {
+    return Joiner.on(".").join(sourceParts);
+  }
+
+  public List<String> getSourceParts() {
+    return sourceParts;
+  }
+
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getSamzaRelConverterName() {
+    return samzaRelConverterName;
+  }
+
+  public String getRelSchemaProviderName() {
+    return relSchemaProviderName;
+  }
+
+  public SystemStream getSystemStream() {
+    return systemStream;
+  }
+
+  public Config getConfig() {
+    return config;
+  }
+
+  public String getSource() {
+    return source;
+  }
+
+  public Optional<TableDescriptor> getTableDescriptor() {
+    return tableDescriptor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java
new file mode 100644
index 0000000..b2c3cd7
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolver.java
@@ -0,0 +1,45 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+
+/**
+ * IO Resolvers are used by Samza SQL application to fetch the {@link 
SqlIOConfig} corresponding
+ * to the input and output system, including both Samza stream and table 
systems.
+ */
+public interface SqlIOResolver {
+  /**
+   * Returns the SQL IO config corresponding to the source name
+   * @param sourceName
+   *  source whose IOConfig needs to be fetched.
+   * @return
+   *  IOConfig corresponding to the source.
+   */
+  SqlIOConfig fetchSourceInfo(String sourceName);
+
+  /**
+   * Returns the SQL IO config corresponding to the sink name
+   * @param sinkName
+   *  sink whose IOConfig needs to be fetched.
+   * @return
+   *  IOConfig corresponding to the sink.
+   */
+  SqlIOConfig fetchSinkInfo(String sinkName);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
new file mode 100644
index 0000000..6efa57c
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOResolverFactory.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory that is used to create {@link SqlIOResolver}.
+ */
+public interface SqlIOResolverFactory {
+
+  /**
+   * Create the {@link SqlIOResolver}. This is called during the application 
initialization.
+   * @param config config for the SqlIOResolver
+   * @return Returns the created {@link SqlIOResolver}
+   */
+  SqlIOResolver create(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
deleted file mode 100644
index 02ec18a..0000000
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * Configs associated with a system source. Both streams and table sources are 
supported.
- * For now, only local tables are supported.
- */
-public class SqlSystemSourceConfig {
-
-  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
-  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
-
-  private final String systemName;
-
-  private final String streamName;
-
-  private final String samzaRelConverterName;
-  private final SystemStream systemStream;
-
-  private final String source;
-  private String relSchemaProviderName;
-
-  private Config config;
-
-  private List<String> sourceParts;
-
-  public SqlSystemSourceConfig(String systemName, String streamName, Config 
systemConfig) {
-    this(systemName, streamName, Arrays.asList(systemName, streamName), 
systemConfig, false);
-  }
-
-  public SqlSystemSourceConfig(String systemName, String streamName, Config 
systemConfig, boolean isTable) {
-    this(systemName, streamName, Arrays.asList(systemName, streamName), 
systemConfig, isTable);
-  }
-
-  public SqlSystemSourceConfig(String systemName, String streamName, 
List<String> sourceParts,
-      Config systemConfig, boolean isTable) {
-
-
-    HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
-    this.systemName = systemName;
-    this.streamName = streamName;
-    this.source = getSourceFromSourceParts(sourceParts);
-    this.sourceParts = sourceParts;
-    this.systemStream = new SystemStream(systemName, streamName);
-
-    samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
-    Validate.notEmpty(samzaRelConverterName,
-        String.format("%s is not set or empty for system %s", 
CFG_SAMZA_REL_CONVERTER, systemName));
-
-    relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
-
-    // Removing the Samza SQL specific configs to get the remaining Samza 
configs.
-    streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
-    streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
-
-    // Currently, only local table is supported. And it is assumed that all 
tables are local tables.
-    if (isTable) {
-      streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), 
streamName), "true");
-      
streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(),
 streamName), "oldest");
-    }
-
-    config = new MapConfig(streamConfigs);
-  }
-
-  public static String getSourceFromSourceParts(List<String> sourceParts) {
-    return Joiner.on(".").join(sourceParts);
-  }
-
-  public List<String> getSourceParts() {
-    return sourceParts;
-  }
-
-  public String getSystemName() {
-    return systemName;
-  }
-
-  public String getStreamName() {
-    return streamName;
-  }
-
-  public String getSamzaRelConverterName() {
-    return samzaRelConverterName;
-  }
-
-  public String getRelSchemaProviderName() {
-    return relSchemaProviderName;
-  }
-
-  public SystemStream getSystemStream() {
-    return systemStream;
-  }
-
-  public Config getConfig() {
-    return config;
-  }
-
-  public String getSource() {
-    return source;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index f21eccf..8ebb885 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -55,7 +55,7 @@ import org.apache.calcite.tools.Planner;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,11 +72,11 @@ public class QueryPlanner {
   // Mapping between the source to the RelSchemaProvider corresponding to the 
source.
   private final Map<String, RelSchemaProvider> relSchemaProviders;
 
-  // Mapping between the source to the SqlSystemSourceConfig corresponding to 
the source.
-  private final Map<String, SqlSystemSourceConfig> systemStreamConfigBySource;
+  // Mapping between the source to the SqlIOConfig corresponding to the source.
+  private final Map<String, SqlIOConfig> systemStreamConfigBySource;
 
   public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
-      Map<String, SqlSystemSourceConfig> systemStreamConfigBySource, 
Collection<UdfMetadata> udfMetadata) {
+      Map<String, SqlIOConfig> systemStreamConfigBySource, 
Collection<UdfMetadata> udfMetadata) {
     this.relSchemaProviders = relSchemaProviders;
     this.systemStreamConfigBySource = systemStreamConfigBySource;
     this.udfMetadata = udfMetadata;
@@ -88,7 +88,7 @@ public class QueryPlanner {
       CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
       SchemaPlus rootSchema = calciteConnection.getRootSchema();
 
-      for (SqlSystemSourceConfig ssc : systemStreamConfigBySource.values()) {
+      for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
         SchemaPlus previousLevelSchema = rootSchema;
         List<String> sourceParts = ssc.getSourceParts();
         RelSchemaProvider relSchemaProvider = 
relSchemaProviders.get(ssc.getSource());

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index bcefae2..b7d9a59 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -40,9 +40,9 @@ import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.interfaces.UdfResolver;
 import org.apache.samza.sql.testutil.JsonUtil;
@@ -72,8 +72,8 @@ public class SamzaSqlApplicationConfig {
   public static final String CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN = 
"samza.sql.relSchemaProvider.%s.";
   public static final String CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN = 
"samza.sql.relConverter.%s.";
 
-  public static final String CFG_SOURCE_RESOLVER = "samza.sql.sourceResolver";
-  public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = 
"samza.sql.sourceResolver.%s.";
+  public static final String CFG_IO_RESOLVER = "samza.sql.ioResolver";
+  public static final String CFG_FMT_SOURCE_RESOLVER_DOMAIN = 
"samza.sql.ioResolver.%s.";
 
   public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
   public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = 
"samza.sql.udfResolver.%s.";
@@ -85,13 +85,13 @@ public class SamzaSqlApplicationConfig {
   private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
   private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
 
-  private SourceResolver sourceResolver;
+  private SqlIOResolver ioResolver;
   private UdfResolver udfResolver;
 
   private final Collection<UdfMetadata> udfMetadata;
 
-  private final Map<String, SqlSystemSourceConfig> 
inputSystemStreamConfigBySource;
-  private final Map<String, SqlSystemSourceConfig> 
outputSystemStreamConfigsBySource;
+  private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource;
+  private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
 
   private final List<String> sql;
 
@@ -105,31 +105,31 @@ public class SamzaSqlApplicationConfig {
 
     queryInfo = fetchQueryInfo(sql);
 
-    sourceResolver = createSourceResolver(staticConfig);
+    ioResolver = createIOResolver(staticConfig);
 
     udfResolver = createUdfResolver(staticConfig);
     udfMetadata = udfResolver.getUdfs();
 
     inputSystemStreamConfigBySource = queryInfo.stream()
-        .map(QueryInfo::getInputSources)
+        .map(QueryInfo::getSources)
         .flatMap(Collection::stream)
-        .collect(Collectors.toMap(Function.identity(), 
sourceResolver::fetchSourceInfo));
+        .collect(Collectors.toMap(Function.identity(), src -> 
ioResolver.fetchSourceInfo(src)));
 
-    Set<SqlSystemSourceConfig> systemStreamConfigs = new 
HashSet<>(inputSystemStreamConfigBySource.values());
+    Set<SqlIOConfig> systemStreamConfigs = new 
HashSet<>(inputSystemStreamConfigBySource.values());
 
     outputSystemStreamConfigsBySource = queryInfo.stream()
-        .map(QueryInfo::getOutputSource)
-        .collect(Collectors.toMap(Function.identity(), x -> 
sourceResolver.fetchSourceInfo(x)));
+        .map(QueryInfo::getSink)
+        .collect(Collectors.toMap(Function.identity(), x -> 
ioResolver.fetchSinkInfo(x)));
     systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
 
     relSchemaProvidersBySource = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
+        .collect(Collectors.toMap(SqlIOConfig::getSource,
             x -> initializePlugin("RelSchemaProvider", 
x.getRelSchemaProviderName(), staticConfig,
                 CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
                 (o, c) -> ((RelSchemaProviderFactory) 
o).create(x.getSystemStream(), c))));
 
     samzaRelConvertersBySource = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
+        .collect(Collectors.toMap(SqlIOConfig::getSource,
             x -> initializePlugin("SamzaRelConverter", 
x.getSamzaRelConverterName(), staticConfig,
                 CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> 
((SamzaRelConverterFactory) o).create(x.getSystemStream(),
                     relSchemaProvidersBySource.get(x.getSource()), c))));
@@ -183,11 +183,11 @@ public class SamzaSqlApplicationConfig {
     return JsonUtil.toJson(sqlStmts);
   }
 
-  public static SourceResolver createSourceResolver(Config config) {
-    String sourceResolveValue = config.get(CFG_SOURCE_RESOLVER);
-    Validate.notEmpty(sourceResolveValue, "sourceResolver config is not set or 
empty");
-    return initializePlugin("SourceResolver", sourceResolveValue, config, 
CFG_FMT_SOURCE_RESOLVER_DOMAIN,
-        (o, c) -> ((SourceResolverFactory) o).create(c));
+  public static SqlIOResolver createIOResolver(Config config) {
+    String sourceResolveValue = config.get(CFG_IO_RESOLVER);
+    Validate.notEmpty(sourceResolveValue, "ioResolver config is not set or 
empty");
+    return initializePlugin("SqlIOResolver", sourceResolveValue, config, 
CFG_FMT_SOURCE_RESOLVER_DOMAIN,
+        (o, c) -> ((SqlIOResolverFactory) o).create(c));
   }
 
   private UdfResolver createUdfResolver(Map<String, String> config) {
@@ -234,11 +234,11 @@ public class SamzaSqlApplicationConfig {
     return udfMetadata;
   }
 
-  public Map<String, SqlSystemSourceConfig> 
getInputSystemStreamConfigBySource() {
+  public Map<String, SqlIOConfig> getInputSystemStreamConfigBySource() {
     return inputSystemStreamConfigBySource;
   }
 
-  public Map<String, SqlSystemSourceConfig> 
getOutputSystemStreamConfigsBySource() {
+  public Map<String, SqlIOConfig> getOutputSystemStreamConfigsBySource() {
     return outputSystemStreamConfigsBySource;
   }
 
@@ -250,8 +250,8 @@ public class SamzaSqlApplicationConfig {
     return relSchemaProvidersBySource;
   }
 
-  public SourceResolver getSourceResolver() {
-    return sourceResolver;
+  public SqlIOResolver getIoResolver() {
+    return ioResolver;
   }
 
   public long getWindowDurationMs() {

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index f54ca42..4497a7c 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -31,8 +31,8 @@ import org.apache.samza.runtime.AbstractApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class SamzaSqlApplicationRunner extends 
AbstractApplicationRunner {
   public static Config computeSamzaConfigs(Boolean localRunner, Config config) 
{
     Map<String, String> newConfig = new HashMap<>();
 
-    SourceResolver sourceResolver = 
SamzaSqlApplicationConfig.createSourceResolver(config);
+    SqlIOResolver ioResolver = 
SamzaSqlApplicationConfig.createIOResolver(config);
     // Parse the sql and find the input stream streams
     List<String> sqlStmts = 
SamzaSqlApplicationConfig.fetchSqlFromConfig(config);
 
@@ -81,14 +81,14 @@ public class SamzaSqlApplicationRunner extends 
AbstractApplicationRunner {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = 
SamzaSqlApplicationConfig.fetchQueryInfo(sqlStmts);
     for (SamzaSqlQueryParser.QueryInfo query : queryInfo) {
       // Populate stream to system mapping config for input and output system 
streams
-      for (String inputSource : query.getInputSources()) {
-        SqlSystemSourceConfig inputSystemStreamConfig = 
sourceResolver.fetchSourceInfo(inputSource);
+      for (String inputSource : query.getSources()) {
+        SqlIOConfig inputSystemStreamConfig = 
ioResolver.fetchSourceInfo(inputSource);
         newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, 
inputSystemStreamConfig.getStreamName()),
             inputSystemStreamConfig.getSystemName());
         newConfig.putAll(inputSystemStreamConfig.getConfig());
       }
 
-      SqlSystemSourceConfig outputSystemStreamConfig = 
sourceResolver.fetchSourceInfo(query.getOutputSource());
+      SqlIOConfig outputSystemStreamConfig = 
ioResolver.fetchSinkInfo(query.getSink());
       newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, 
outputSystemStreamConfig.getStreamName()),
           outputSystemStreamConfig.getSystemName());
       newConfig.putAll(outputSystemStreamConfig.getConfig());

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
index faf903a..39ea092 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
@@ -52,7 +52,7 @@ import org.apache.samza.SamzaException;
 
 
 /**
- * Utility class that is used to parse the Samza sql query to figure out the 
inputs, outputs etc..
+ * Utility class that is used to parse the Samza sql query to figure out the 
sources, sink etc..
  */
 public class SamzaSqlQueryParser {
 
@@ -60,26 +60,26 @@ public class SamzaSqlQueryParser {
   }
 
   public static class QueryInfo {
-    private final List<String> inputSources;
+    private final List<String> sources;
     private String selectQuery;
-    private String outputSource;
+    private String sink;
 
-    public QueryInfo(String selectQuery, List<String> inputSources, String 
outputSource) {
+    public QueryInfo(String selectQuery, List<String> sources, String sink) {
       this.selectQuery = selectQuery;
-      this.outputSource = outputSource;
-      this.inputSources = inputSources;
+      this.sink = sink;
+      this.sources = sources;
     }
 
-    public List<String> getInputSources() {
-      return inputSources;
+    public List<String> getSources() {
+      return sources;
     }
 
     public String getSelectQuery() {
       return selectQuery;
     }
 
-    public String getOutputSource() {
-      return outputSource;
+    public String getSink() {
+      return sink;
     }
   }
 
@@ -99,16 +99,16 @@ public class SamzaSqlQueryParser {
       throw new SamzaException(e);
     }
 
-    String outputSource;
+    String sink;
     String selectQuery;
-    ArrayList<String> inputSources;
+    ArrayList<String> sources;
     if (sqlNode instanceof SqlInsert) {
       SqlInsert sqlInsert = ((SqlInsert) sqlNode);
-      outputSource = sqlInsert.getTargetTable().toString();
+      sink = sqlInsert.getTargetTable().toString();
       if (sqlInsert.getSource() instanceof SqlSelect) {
         SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
         selectQuery = m.group(2);
-        inputSources = getInputsFromSelectQuery(sqlSelect);
+        sources = getSourcesFromSelectQuery(sqlSelect);
       } else {
         throw new SamzaException("Sql query is not of the expected format");
       }
@@ -116,7 +116,7 @@ public class SamzaSqlQueryParser {
       throw new SamzaException("Sql query is not of the expected format");
     }
 
-    return new QueryInfo(selectQuery, inputSources, outputSource);
+    return new QueryInfo(selectQuery, sources, sink);
   }
 
   private static Planner createPlanner() {
@@ -146,38 +146,38 @@ public class SamzaSqlQueryParser {
     return Frameworks.getPlanner(frameworkConfig);
   }
 
-  private static ArrayList<String> getInputsFromSelectQuery(SqlSelect 
sqlSelect) {
-    ArrayList<String> input = new ArrayList<>();
-    getInput(sqlSelect.getFrom(), input);
-    if (input.size() < 1) {
+  private static ArrayList<String> getSourcesFromSelectQuery(SqlSelect 
sqlSelect) {
+    ArrayList<String> sources = new ArrayList<>();
+    getSource(sqlSelect.getFrom(), sources);
+    if (sources.size() < 1) {
       throw new SamzaException("Unsupported query " + sqlSelect);
     }
 
-    return input;
+    return sources;
   }
 
-  private static void getInput(SqlNode node, ArrayList<String> 
inputSourceList) {
+  private static void getSource(SqlNode node, ArrayList<String> sourceList) {
     if (node instanceof SqlJoin) {
       SqlJoin joinNode = (SqlJoin) node;
-      ArrayList<String> inputsLeft = new ArrayList<>();
-      ArrayList<String> inputsRight = new ArrayList<>();
-      getInput(joinNode.getLeft(), inputsLeft);
-      getInput(joinNode.getRight(), inputsRight);
+      ArrayList<String> sourcesLeft = new ArrayList<>();
+      ArrayList<String> sourcesRight = new ArrayList<>();
+      getSource(joinNode.getLeft(), sourcesLeft);
+      getSource(joinNode.getRight(), sourcesRight);
 
-      inputSourceList.addAll(inputsLeft);
-      inputSourceList.addAll(inputsRight);
+      sourceList.addAll(sourcesLeft);
+      sourceList.addAll(sourcesRight);
     } else if (node instanceof SqlIdentifier) {
-      inputSourceList.add(node.toString());
+      sourceList.add(node.toString());
     } else if (node instanceof SqlBasicCall) {
       SqlBasicCall basicCall = ((SqlBasicCall) node);
       if (basicCall.getOperator() instanceof SqlAsOperator) {
-        getInput(basicCall.operand(0), inputSourceList);
+        getSource(basicCall.operand(0), sourceList);
       } else if (basicCall.getOperator() instanceof SqlUnnestOperator && 
basicCall.operand(0) instanceof SqlSelect) {
-        inputSourceList.addAll(getInputsFromSelectQuery(basicCall.operand(0)));
+        sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
         return;
       }
     } else if (node instanceof SqlSelect) {
-      getInput(((SqlSelect) node).getFrom(), inputSourceList);
+      getSource(((SqlSelect) node).getFrom(), sourceList);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 899ca41..6df3421 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -21,6 +21,7 @@ package org.apache.samza.sql.translator;
 
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
@@ -42,15 +43,13 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static 
org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey;
-import static 
org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde;
 
 
 /**
@@ -75,11 +74,11 @@ class JoinTranslator {
 
   private static final Logger log = 
LoggerFactory.getLogger(JoinTranslator.class);
   private int joinId;
-  private SourceResolver sourceResolver;
+  private SqlIOResolver ioResolver;
 
-  JoinTranslator(int joinId, SourceResolver sourceResolver) {
+  JoinTranslator(int joinId, SqlIOResolver ioResolver) {
     this.joinId = joinId;
-    this.sourceResolver = sourceResolver;
+    this.ioResolver = ioResolver;
   }
 
   void translate(final LogicalJoin join, final TranslatorContext context) {
@@ -95,11 +94,7 @@ class JoinTranslator {
     populateStreamAndTableKeyIds(((RexCall) 
join.getCondition()).getOperands(), join, isTablePosOnRight, streamKeyIds,
         tableKeyIds);
 
-    JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new 
JsonSerdeV2<>(SamzaSqlCompositeKey.class);
-    SamzaSqlRelMessageSerde relMsgSerde =
-        (SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
-
-    Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, keySerde, 
relMsgSerde, join, context);
+    Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, join, 
context);
 
     MessageStream<SamzaSqlRelMessage> inputStream =
         isTablePosOnRight ?
@@ -117,13 +112,16 @@ class JoinTranslator {
         new SamzaSqlRelMessageJoinFunction(join.getJoinType(), 
isTablePosOnRight, streamKeyIds, streamFieldNames,
             tableFieldNames);
 
+    Serde<SamzaSqlCompositeKey> keySerde = new 
JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+    Serde<SamzaSqlRelMessage> valueSerde = new 
JsonSerdeV2<>(SamzaSqlRelMessage.class);
+
     // Always re-partition the messages from the input stream by the composite 
key and then join the messages
     // with the table.
     MessageStream<SamzaSqlRelMessage> outputStream =
         inputStream
             .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
                 m -> m,
-                KVSerde.of(keySerde, relMsgSerde),
+                KVSerde.of(keySerde, valueSerde),
                 "stream_" + joinId)
             .map(KV::getValue)
             .join(table, joinFn);
@@ -249,34 +247,48 @@ class JoinTranslator {
         SqlExplainLevel.EXPPLAN_ATTRIBUTES);
   }
 
+  private SqlIOConfig resolveSourceConfig(RelNode relNode) {
+    String sourceName = String.join(".", 
relNode.getTable().getQualifiedName());
+    SqlIOConfig sourceConfig = ioResolver.fetchSourceInfo(sourceName);
+    if (sourceConfig == null) {
+      throw new SamzaException("Unsupported source found in join statement: " 
+ sourceName);
+    }
+    return sourceConfig;
+  }
+
   private boolean isTable(RelNode relNode) {
     // NOTE: Any intermediate form of a join is always a stream. Eg: For the 
second level join of
     // stream-table-table join, the left side of the join is join output, 
which we always
     // assume to be a stream. The intermediate stream won't be an instance of 
EnumerableTableScan.
-    return relNode instanceof EnumerableTableScan &&
-        sourceResolver.isTable(String.join(".", 
relNode.getTable().getQualifiedName()));
+    if (relNode instanceof EnumerableTableScan) {
+      return resolveSourceConfig(relNode).getTableDescriptor().isPresent();
+    } else {
+      return false;
+    }
   }
 
-  private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> 
tableKeyIds, Serde keySerde, Serde relMsgSerde,
-      LogicalJoin join, TranslatorContext context) {
-    MessageStream<SamzaSqlRelMessage> inputTable =
-        isTablePosOnRight ?
-            context.getMessageStream(join.getRight().getId()) : 
context.getMessageStream(join.getLeft().getId());
+  private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> 
tableKeyIds, LogicalJoin join, TranslatorContext context) {
+    RelNode relNode = isTablePosOnRight ? join.getRight() : join.getLeft();
+
+    MessageStream<SamzaSqlRelMessage> relOutputStream = 
context.getMessageStream(relNode.getId());
+
+    SqlIOConfig sourceConfig = resolveSourceConfig(relNode);
+
+    if (!sourceConfig.getTableDescriptor().isPresent()) {
+      String errMsg = "Failed to resolve table source in join operation: 
node=" + relNode;
+      log.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
 
     // Create a table backed by RocksDb store with the fields in the join 
condition as composite key and relational
     // message as the value. Send the messages from the input stream denoted 
as 'table' to the created table store.
     Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
-        context.getStreamGraph()
-            .getTable(new RocksDbTableDescriptor("table_" + joinId)
-                .withSerde(KVSerde.of(keySerde, relMsgSerde)));
+        
context.getStreamGraph().getTable(sourceConfig.getTableDescriptor().get());
 
-    inputTable
+    relOutputStream
         .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
         .sendTo(table);
 
     return table;
   }
-
-  private void logStringAndTableJoinKeys(List<String> fieldNames, 
List<Integer> fieldIds) {
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index c8d55e8..eda73a7 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.sql.translator;
 
+import java.util.Optional;
+
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelShuttleImpl;
@@ -27,18 +29,23 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.table.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -47,6 +54,7 @@ import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
  * It then walks the relational graph and then populates the Samza's {@link 
StreamGraph} accordingly.
  */
 public class QueryTranslator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(QueryTranslator.class);
 
   private final ScanTranslator scanTranslator;
   private final SamzaSqlApplicationConfig sqlConfig;
@@ -65,6 +73,7 @@ public class QueryTranslator {
     final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
     final TranslatorContext context = new TranslatorContext(streamGraph, 
relRoot, executionContext);
     final RelNode node = relRoot.project();
+    final SqlIOResolver ioResolver = 
context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
 
     node.accept(new RelShuttleImpl() {
       int windowId = 0;
@@ -95,8 +104,7 @@ public class QueryTranslator {
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
         joinId++;
-        SourceResolver sourceResolver = 
context.getExecutionContext().getSamzaSqlApplicationConfig().getSourceResolver();
-        new JoinTranslator(joinId, sourceResolver).translate(join, context);
+        new JoinTranslator(joinId, ioResolver).translate(join, context);
         return node;
       }
 
@@ -109,12 +117,23 @@ public class QueryTranslator {
       }
     });
 
-    SqlSystemSourceConfig outputSystemConfig =
-        
sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource());
-    SamzaRelConverter samzaMsgConverter = 
sqlConfig.getSamzaRelConverters().get(queryInfo.getOutputSource());
-    MessageStreamImpl<SamzaSqlRelMessage> stream =
-        (MessageStreamImpl<SamzaSqlRelMessage>) 
context.getMessageStream(node.getId());
+    String sink = queryInfo.getSink();
+    SqlIOConfig sinkConfig = 
sqlConfig.getOutputSystemStreamConfigsBySource().get(sink);
+    SamzaRelConverter samzaMsgConverter = 
sqlConfig.getSamzaRelConverters().get(queryInfo.getSink());
+    MessageStreamImpl<SamzaSqlRelMessage> stream = 
(MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
     MessageStream<KV<Object, Object>> outputStream = 
stream.map(samzaMsgConverter::convertToSamzaMessage);
-    
outputStream.sendTo(streamGraph.getOutputStream(outputSystemConfig.getStreamName()));
+
+    Optional<TableDescriptor> tableDescriptor = 
sinkConfig.getTableDescriptor();
+    if (!tableDescriptor.isPresent()) {
+      
outputStream.sendTo(streamGraph.getOutputStream(sinkConfig.getStreamName()));
+    } else {
+      Table outputTable = streamGraph.getTable(tableDescriptor.get());
+      if (outputTable == null) {
+        String msg = "Failed to obtain table descriptor of " + 
sinkConfig.getSource();
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+      outputStream.sendTo(outputTable);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 13300f7..1f9ed52 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 
 
 /**
@@ -38,9 +38,9 @@ import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
 class ScanTranslator {
 
   private final Map<String, SamzaRelConverter> relMsgConverters;
-  private final Map<String, SqlSystemSourceConfig> systemStreamConfig;
+  private final Map<String, SqlIOConfig> systemStreamConfig;
 
-  ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, 
SqlSystemSourceConfig> ssc) {
+  ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, 
SqlIOConfig> ssc) {
     relMsgConverters = converters;
     this.systemStreamConfig = ssc;
   }
@@ -48,7 +48,7 @@ class ScanTranslator {
   void translate(final TableScan tableScan, final TranslatorContext context) {
     StreamGraph streamGraph = context.getStreamGraph();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
-    String sourceName = 
SqlSystemSourceConfig.getSourceFromSourceParts(tableNameParts);
+    String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
 
     Validate.isTrue(relMsgConverters.containsKey(sourceName), 
String.format("Unknown source %s", sourceName));
     SamzaRelConverter converter = relMsgConverters.get(sourceName);

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
index 3365923..de0ecf1 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -28,7 +28,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
@@ -313,10 +313,10 @@ public class TestQueryTranslator {
   @Test (expected = SamzaException.class)
   public void testTranslateStreamTableInnerJoinWithMissingStream() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String configSourceResolverDomain =
+    String configIOResolverDomain =
         
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, 
"config");
-    config.put(configSourceResolverDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
-        ConfigBasedSourceResolverFactory.class.getName());
+    config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedIOResolverFactory.class.getName());
     String sql =
         "Insert into testavro.enrichedPageViewTopic"
             + " select p.name as profileName, pv.pageKey"

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
index 0804a6d..0d48c56 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
@@ -62,18 +62,18 @@ public class TestSamzaSqlApplicationConfig {
     // Pass
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into 
testavro.COMPLEX1 select * from testavro.SIMPLE1");
     new SamzaSqlApplicationConfig(new MapConfig(config));
-    testWithoutConfigShouldFail(config, 
SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER);
+    testWithoutConfigShouldFail(config, 
SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
     testWithoutConfigShouldFail(config, 
SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
 
-    String configSourceResolverDomain =
+    String configIOResolverDomain =
         
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, 
"config");
-    String avroSamzaSqlConfigPrefix = configSourceResolverDomain + 
String.format("%s.", "testavro");
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", "testavro");
 
-    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + 
SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
+    testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
 
     // Configs for the unused system "log" is not mandatory.
-    String logSamzaSqlConfigPrefix = configSourceResolverDomain + 
String.format("%s.", "log");
-    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + 
SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER);
+    String logSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", "log");
+    testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
   }
 
   private void testWithoutConfigShouldPass(Map<String, String> config, String 
configKey) {

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
index 0bfd721..24faf4b 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java
@@ -31,10 +31,10 @@ public class TestSamzaSqlQueryParser {
   @Test
   public void testParseQuery() {
     QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo 
select * from tracking.bar");
-    Assert.assertEquals("log.foo", queryInfo.getOutputSource());
+    Assert.assertEquals("log.foo", queryInfo.getSink());
     Assert.assertEquals(queryInfo.getSelectQuery(), "select * from 
tracking.bar", queryInfo.getSelectQuery());
-    Assert.assertEquals(1, queryInfo.getInputSources().size());
-    Assert.assertEquals("tracking.bar", queryInfo.getInputSources().get(0));
+    Assert.assertEquals(1, queryInfo.getSources().size());
+    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
   }
 
   @Test
@@ -46,10 +46,10 @@ public class TestSamzaSqlQueryParser {
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
-    Assert.assertEquals("testavro.enrichedPageViewTopic", 
queryInfo.getOutputSource());
-    Assert.assertEquals(2, queryInfo.getInputSources().size());
-    Assert.assertEquals("testavro.PAGEVIEW", 
queryInfo.getInputSources().get(0));
-    Assert.assertEquals("testavro.PROFILE.$table", 
queryInfo.getInputSources().get(1));
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+    Assert.assertEquals(2, queryInfo.getSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", 
queryInfo.getSources().get(1));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java 
b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
new file mode 100644
index 0000000..c4cacbd
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -0,0 +1,69 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.e2e;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.testutil.TestIOResolverFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * End-to-end tests for Samza SQL insert statements whose destination is a table.
+ *
+ * <p>Each test runs one insert query reading from {@code testavro.SIMPLE1} and then checks how
+ * many records ended up in {@link TestIOResolverFactory.TestTable#records}.
+ */
+public class TestSamzaSqlTable {
+  /** Number of input messages requested from the test config, and expected in the table. */
+  private static final int NUM_MESSAGES = 20;
+
+  /** Inserts {@code id} and {@code name} without designating a key column. */
+  @Test
+  public void testEndToEnd() throws Exception {
+    runInsertAndVerifyRecordCount(
+        "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1");
+  }
+
+  /** Same as {@link #testEndToEnd()} but aliases {@code id} to {@code __key__}. */
+  @Test
+  public void testEndToEndWithKey() throws Exception {
+    runInsertAndVerifyRecordCount(
+        "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1");
+  }
+
+  /**
+   * Runs a single SQL statement to completion and asserts that the test table holds exactly
+   * {@link #NUM_MESSAGES} records afterwards.
+   *
+   * @param sql the insert statement to execute
+   * @throws Exception if building the config or running the application fails
+   */
+  private void runInsertAndVerifyRecordCount(String sql) throws Exception {
+    // The record map is static, so leftovers from a previous test must be dropped first.
+    TestIOResolverFactory.TestTable.records.clear();
+
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(NUM_MESSAGES);
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+
+    SamzaSqlApplicationRunner runner =
+        new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    Assert.assertEquals(NUM_MESSAGES, TestIOResolverFactory.TestTable.records.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 208625d..14e2243 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -37,9 +37,9 @@ import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.fn.FlattenUdf;
 import org.apache.samza.sql.fn.RegexMatchUdf;
-import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
@@ -52,6 +52,7 @@ import 
org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 public class SamzaSqlTestConfig {
 
   public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
+  public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
 
   public static Map<String, String> fetchStaticConfigsWithFactories(int 
numberOfMessages) {
     return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, 
false);
@@ -76,11 +77,11 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
PassthroughCoordinationUtilsFactory.class.getName());
     staticConfigs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
 
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config");
-    String configSourceResolverDomain =
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+    String configIOResolverDomain =
         
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, 
"config");
-    staticConfigs.put(configSourceResolverDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
-        TestSourceResolverFactory.class.getName());
+    staticConfigs.put(configIOResolverDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
+        TestIOResolverFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
     String configUdfResolverDomain = 
String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
@@ -91,8 +92,8 @@ public class SamzaSqlTestConfig {
             MyTestArrayUdf.class.getName()));
 
     String avroSystemConfigPrefix =
-        String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, 
SAMZA_SYSTEM_TEST_AVRO);
-    String avroSamzaSqlConfigPrefix = configSourceResolverDomain + 
String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, 
SAMZA_SYSTEM_TEST_AVRO);
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", SAMZA_SYSTEM_TEST_AVRO);
     staticConfigs.put(avroSystemConfigPrefix + "samza.factory", 
TestAvroSystemFactory.class.getName());
     staticConfigs.put(avroSystemConfigPrefix + 
TestAvroSystemFactory.CFG_NUM_MESSAGES,
         String.valueOf(numberOfMessages));
@@ -101,14 +102,23 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(avroSystemConfigPrefix + 
TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
         String.valueOf(windowDurationMs / 2));
     
staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, 
String.valueOf(windowDurationMs));
-    staticConfigs.put(avroSamzaSqlConfigPrefix + 
SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avroSamzaSqlConfigPrefix + 
SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String testDbSamzaSqlConfigPrefix = configIOResolverDomain + 
String.format("%s.", SAMZA_SYSTEM_TEST_DB);
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + 
SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String avroSamzaToRelMsgConverterDomain =
         
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, 
"avro");
     staticConfigs.put(avroSamzaToRelMsgConverterDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
         AvroRelConverterFactory.class.getName());
 
+    String testDbSamzaToRelMsgConverterDomain =
+        
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, 
TestIOResolverFactory.TEST_DB_SYSTEM);
+    staticConfigs.put(testDbSamzaToRelMsgConverterDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
+        AvroRelConverterFactory.class.getName());
+
     String configAvroRelSchemaProviderDomain =
         
String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, 
"config");
     staticConfigs.put(configAvroRelSchemaProviderDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -141,6 +151,10 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(configAvroRelSchemaProviderDomain + 
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
         "testavro", "pageViewCountTopic"), PageViewCount.SCHEMA$.toString());
 
+    staticConfigs.put(
+        configAvroRelSchemaProviderDomain + 
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+            TestIOResolverFactory.TEST_DB_SYSTEM, "testTable"), 
SimpleRecord.SCHEMA$.toString());
+
     staticConfigs.putAll(props);
 
     return staticConfigs;

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
new file mode 100644
index 0000000..bbe2a7e
--- /dev/null
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -0,0 +1,195 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Test-only {@link SqlIOResolverFactory} whose resolver understands the {@code $table} suffix
+ * and backs table sinks with the in-memory {@link TestTable}.
+ */
+public class TestIOResolverFactory implements SqlIOResolverFactory {
+  public static final String TEST_DB_SYSTEM = "testDb";
+  public static final String TEST_TABLE_ID = "testDbId";
+
+  @Override
+  public SqlIOResolver create(Config config) {
+    return new TestIOResolver(config);
+  }
+
+  /** Table descriptor whose spec names {@link TestTableProviderFactory} as the provider. */
+  static class TestTableDescriptor extends BaseTableDescriptor {
+    protected TestTableDescriptor(String tableId) {
+      super(tableId);
+    }
+
+    @Override
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public TableSpec getTableSpec() {
+      return new TableSpec(tableId, KVSerde.of(new NoOpSerde(), new NoOpSerde()),
+          TestTableProviderFactory.class.getName(), new HashMap<>());
+    }
+  }
+
+  /**
+   * Write-only table that stores everything in the static {@link #records} map so tests can
+   * inspect what was written. Read operations and {@code putAll} are not implemented.
+   */
+  public static class TestTable implements ReadWriteTable {
+    /** All records written so far; shared by every instance, so tests must clear it up front. */
+    public static Map<Object, Object> records = new HashMap<>();
+
+    @Override
+    public Object get(Object key) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public Map getAll(List keys) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Stores {@code value} under {@code key}. A null key is replaced by a fresh placeholder
+     * object so that every keyless record occupies its own map entry.
+     */
+    @Override
+    public void put(Object key, Object value) {
+      // A new Object is unique by identity; a timestamp such as System.nanoTime() may repeat
+      // across back-to-back calls and would silently overwrite an earlier record.
+      records.put(key == null ? new Object() : key, value);
+    }
+
+    @Override
+    public void delete(Object key) {
+      records.remove(key);
+    }
+
+    /** Removes only the entries for the given keys; other records are left untouched. */
+    @Override
+    public void deleteAll(List keys) {
+      for (Object key : keys) {
+        records.remove(key);
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void putAll(List entries) {
+      throw new NotImplementedException();
+    }
+  }
+
+  /** Factory referenced by class name from {@link TestTableDescriptor#getTableSpec()}. */
+  public static class TestTableProviderFactory implements TableProviderFactory {
+    @Override
+    public TableProvider getTableProvider(TableSpec tableSpec) {
+      return new TestTableProvider();
+    }
+  }
+
+  /** Provider that hands out {@link TestTable} instances and contributes no extra config. */
+  static class TestTableProvider implements TableProvider {
+    @Override
+    public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    }
+
+    @Override
+    public Table getTable() {
+      return new TestTable();
+    }
+
+    @Override
+    public Map<String, String> generateConfig(Map<String, String> config) {
+      return new HashMap<>();
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  /**
+   * Resolver that turns dotted source/sink names into {@link SqlIOConfig}s.
+   *
+   * <p>Declared static because it never touches the enclosing factory instance.
+   */
+  private static class TestIOResolver implements SqlIOResolver {
+    private static final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
+    private final Config config;
+    /** Table descriptors created so far, keyed by the full dotted IO name. */
+    private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+
+    public TestIOResolver(Config config) {
+      this.config = config;
+    }
+
+    /**
+     * Builds the IO config for a dotted name such as {@code system.stream} or
+     * {@code system.stream.$table}.
+     *
+     * <p>The first component is the system. The last component is the stream, unless it is the
+     * {@code $table} keyword (compared case-insensitively); then the component before it is
+     * the stream and a table descriptor is attached. Sinks get a {@link TestTableDescriptor},
+     * sources get a {@link RocksDbTableDescriptor}. Descriptors are created once per name and
+     * reused on later calls.
+     *
+     * @param ioName dotted source or sink name
+     * @param isSink true when resolving a sink, false for a source
+     * @return the IO config; its table descriptor is null when the name has no table suffix
+     */
+    private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
+      String[] sourceComponents = ioName.split("\\.");
+      int systemIdx = 0;
+      int endIdx = sourceComponents.length - 1;
+      int streamIdx = endIdx;
+      TableDescriptor tableDescriptor = null;
+
+      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+        streamIdx = endIdx - 1;
+
+        tableDescriptor = tableDescMap.get(ioName);
+
+        if (tableDescriptor == null) {
+          if (isSink) {
+            // The current map size is appended so each new sink table gets a distinct id.
+            tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
+          } else {
+            tableDescriptor = new RocksDbTableDescriptor("InputTable-" + ioName)
+                .withSerde(KVSerde.of(
+                    new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
+                    new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+          }
+          tableDescMap.put(ioName, tableDescriptor);
+        }
+      }
+
+      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
+      return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
+          Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
+    }
+
+    @Override
+    public SqlIOConfig fetchSourceInfo(String sourceName) {
+      return fetchIOInfo(sourceName, false);
+    }
+
+    @Override
+    public SqlIOConfig fetchSinkInfo(String sinkName) {
+      return fetchIOInfo(sinkName, true);
+    }
+  }
+}

Reply via email to