Repository: flink Updated Branches: refs/heads/release-1.3 db4ac400b -> 5f53052f6
[FLINK-6780] [table] ExternalTableSource should add time attributes in the row type. This closes #4023. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f53052f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f53052f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f53052f Branch: refs/heads/release-1.3 Commit: 5f53052f676ebf37de2d64cd0809704da16fe246 Parents: db4ac40 Author: Haohui Mai <whe...@apache.org> Authored: Wed May 31 01:40:53 2017 -0700 Committer: twalthr <twal...@apache.org> Committed: Wed May 31 13:42:41 2017 +0200 ---------------------------------------------------------------------- .../table/catalog/ExternalTableSourceUtil.scala | 18 ++++-- .../catalog/ExternalTableSourceUtilTest.scala | 63 ++++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5f53052f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala index 0964bbc..ccc2e9e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala @@ -18,15 +18,15 @@ package org.apache.flink.table.catalog -import java.lang.reflect.Modifier import java.net.URL import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration} +import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.table.annotation.TableType import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException} -import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable} import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.TableSource +import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.util.InstantiationUtil import org.reflections.Reflections import org.slf4j.{Logger, LoggerFactory} @@ -83,6 +83,13 @@ object ExternalTableSourceUtil { registeredConverters } + @VisibleForTesting + private[flink] def injectTableSourceConverter( + tableType: String, + converterClazz: Class[_ <: TableSourceConverter[_]]) = { + tableTypeToTableSourceConvertersClazz.addBinding(tableType, converterClazz) + } + /** * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance * @@ -119,7 +126,10 @@ object ExternalTableSourceUtil { } else { FlinkStatistic.UNKNOWN } - new TableSourceTable(convertedTableSource, flinkStatistic) + convertedTableSource match { + case s : StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic) + case _ => new TableSourceTable(convertedTableSource, flinkStatistic) + } } case None => LOG.error(s"Cannot find any TableSourceConverter binded to table type [$tableType]. " + http://git-wip-us.apache.org/repos/asf/flink/blob/5f53052f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala new file mode 100644 index 0000000..82bfd8d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util.{Collections => JCollections, Set => JSet} + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.plan.schema.StreamTableSourceTable +import org.apache.flink.table.sources.StreamTableSource +import org.apache.flink.types.Row +import org.junit.Assert.assertTrue +import org.junit.{Before, Test} + +class ExternalTableSourceUtilTest { + + @Before + def setUp() : Unit = { + ExternalTableSourceUtil.injectTableSourceConverter("mock", classOf[MockTableSourceConverter]) + } + + @Test + def testExternalStreamTable() = { + val schema = new TableSchema(Array("foo"), Array(BasicTypeInfo.INT_TYPE_INFO)) + val table = ExternalCatalogTable("mock", schema) + val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(table) + assertTrue(tableSource.isInstanceOf[StreamTableSourceTable[_]]) + } +} + +class MockTableSourceConverter extends TableSourceConverter[StreamTableSource[Row]] { + override def requiredProperties: JSet[String] = JCollections.emptySet() + override def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable) + : StreamTableSource[Row] = { + new StreamTableSource[Row] { + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = + throw new UnsupportedOperationException + + override def getReturnType: TypeInformation[Row] = { + val schema = externalCatalogTable.schema + Types.ROW(schema.getColumnNames, schema.getTypes) + } + } + } +}