Repository: samza
Updated Branches:
  refs/heads/master 3cc2a05f7 -> 2f4d0b953


http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
deleted file mode 100644
index 4f1d08e..0000000
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.testutil;
-
-import java.util.Arrays;
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
-
-
-public class TestSourceResolverFactory implements SourceResolverFactory {
-  @Override
-  public SourceResolver create(Config config) {
-    return new TestSourceResolver(config);
-  }
-
-  private class TestSourceResolver implements SourceResolver {
-    private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
-    private final Config config;
-
-    public TestSourceResolver(Config config) {
-      this.config = config;
-    }
-
-    @Override
-    public SqlSystemSourceConfig fetchSourceInfo(String sourceName) {
-      String[] sourceComponents = sourceName.split("\\.");
-      boolean isTable = false;
-      int systemIdx = 0;
-      int endIdx = sourceComponents.length - 1;
-      int streamIdx = endIdx;
-
-      if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
-        isTable = true;
-        streamIdx = endIdx - 1;
-      }
-      Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
-      return new SqlSystemSourceConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
-          Arrays.asList(sourceComponents), systemConfigs, isTable);
-    }
-
-    @Override
-    public boolean isTable(String sourceName) {
-      String[] sourceComponents = sourceName.split("\\.");
-      return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
index 251ea16..bfd217c 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -38,9 +38,9 @@ import org.apache.samza.serializers.StringSerdeFactory;
 import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
 import org.apache.samza.sql.fn.FlattenUdf;
 import org.apache.samza.sql.fn.RegexMatchUdf;
-import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
-import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.JsonUtil;
@@ -119,11 +119,11 @@ public class SamzaSqlConsole {
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 
-    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config");
-    String configSourceResolverDomain =
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+    String configIOResolverDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
-    staticConfigs.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        ConfigBasedSourceResolverFactory.class.getName());
+    staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedIOResolverFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
     String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
@@ -137,8 +137,8 @@ public class SamzaSqlConsole {
     staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, ProfileChangeEvent.SCHEMA$.toString());
 
     String kafkaSystemConfigPrefix =
-        String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA);
-    String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA);
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA);
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA);
     staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", KafkaSystemFactory.class.getName());
     staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string");
     staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro");
@@ -148,15 +148,15 @@ public class SamzaSqlConsole {
     staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true");
     staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest");
 
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro");
-    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String logSystemConfigPrefix =
-        String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
-    String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
+    String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
     staticConfigs.put(logSystemConfigPrefix + "samza.factory", ConsoleLoggingSystemFactory.class.getName());
-    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "json");
-    staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+    staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json");
+    staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String avroSamzaToRelMsgConverterDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");

Reply via email to