Repository: samza Updated Branches: refs/heads/master 3cc2a05f7 -> 2f4d0b953
http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java deleted file mode 100644 index 4f1d08e..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java +++ /dev/null @@ -1,66 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql.testutil; - -import java.util.Arrays; -import org.apache.samza.config.Config; -import org.apache.samza.sql.interfaces.SourceResolver; -import org.apache.samza.sql.interfaces.SourceResolverFactory; -import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; - - -public class TestSourceResolverFactory implements SourceResolverFactory { - @Override - public SourceResolver create(Config config) { - return new TestSourceResolver(config); - } - - private class TestSourceResolver implements SourceResolver { - private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table"; - private final Config config; - - public TestSourceResolver(Config config) { - this.config = config; - } - - @Override - public SqlSystemSourceConfig fetchSourceInfo(String sourceName) { - String[] sourceComponents = sourceName.split("\\."); - boolean isTable = false; - int systemIdx = 0; - int endIdx = sourceComponents.length - 1; - int streamIdx = endIdx; - - if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) { - isTable = true; - streamIdx = endIdx - 1; - } - Config systemConfigs = config.subset(sourceComponents[systemIdx] + "."); - return new SqlSystemSourceConfig(sourceComponents[systemIdx], sourceComponents[streamIdx], - Arrays.asList(sourceComponents), systemConfigs, isTable); - } - - @Override - public boolean isTable(String sourceName) { - String[] sourceComponents = sourceName.split("\\."); - return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2f4d0b95/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java index 251ea16..bfd217c 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java @@ -38,9 +38,9 @@ import org.apache.samza.serializers.StringSerdeFactory; import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory; import org.apache.samza.sql.fn.FlattenUdf; import org.apache.samza.sql.fn.RegexMatchUdf; -import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory; +import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; import org.apache.samza.sql.impl.ConfigBasedUdfResolver; -import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; +import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.testutil.JsonUtil; @@ -119,11 +119,11 @@ public class SamzaSqlConsole { staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config"); - String configSourceResolverDomain = + staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config"); + String configIOResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config"); - staticConfigs.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, - ConfigBasedSourceResolverFactory.class.getName()); + staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + ConfigBasedIOResolverFactory.class.getName()); staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config"); String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); @@ -137,8 +137,8 @@ public class SamzaSqlConsole { staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, ProfileChangeEvent.SCHEMA$.toString()); String kafkaSystemConfigPrefix = - String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA); - String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA); + String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA); + String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA); staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", KafkaSystemFactory.class.getName()); staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string"); staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro"); @@ -148,15 +148,15 @@ public class SamzaSqlConsole { staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true"); staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest"); - staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro"); - staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro"); + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config"); String logSystemConfigPrefix = - String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG); - String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG); + String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG); + String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG); staticConfigs.put(logSystemConfigPrefix + "samza.factory", ConsoleLoggingSystemFactory.class.getName()); - staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "json"); - staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json"); + staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config"); String avroSamzaToRelMsgConverterDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");