[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334931#comment-16334931 ]
ASF GitHub Bot commented on FLINK-8240:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162990874

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections

-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable

 /**
   * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {

+  /**
+    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+    *
+    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+    * @return converted [[TableSourceTable]] instance from the input catalog table
+    */
+  def fromExternalCatalogTable(
+      tableEnv: TableEnvironment,
+      externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
+    // check for the legacy external catalog path
+    if (externalCatalogTable.isLegacyTableType) {
+      LOG.warn("External catalog tables based on TableType annotations are deprecated. " +
+        "Please consider updating them to TableSourceFactories.")
+      fromExternalCatalogTableType(externalCatalogTable)
+    }
+    // use the factory approach
+    else {
+      val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+      tableEnv match {
+        // check for a batch table source in this batch environment
+        case _: BatchTableEnvironment =>
+          source match {
+            case bts: BatchTableSource[_] =>
+              new BatchTableSourceTable(
+                bts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a batch environment.")
+          }
+        // check for a stream table source in this streaming environment
+        case _: StreamTableEnvironment =>
+          source match {
+            case sts: StreamTableSource[_] =>
+              new StreamTableSourceTable(
+                sts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a streaming environment.")
+          }
+        case _ => throw new TableException("Unsupported table environment.")
+      }
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // NOTE: the following line can be removed once we drop support for TableType
--- End diff --

I think
we can also remove the `org.reflections:reflections` dependency once we have removed this.

> Create unified interfaces to configure and instantiate TableSources
> -------------------------------------------------------------------
>
>                 Key: FLINK-8240
>                 URL: https://issues.apache.org/jira/browse/FLINK-8240
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>
> At the moment, every table source has its own way of configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general, a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is
> very welcome.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
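
The declaration parts listed in the issue description (source, encoding, time attributes) could be modeled roughly as below. This is only a minimal sketch for discussion: every class name, field name, and property key here is hypothetical and not part of Flink's API, and bucketization is left out for brevity. It assumes that a unified interface would flatten a declaration into string properties that a factory can match on during discovery.

```scala
// Hypothetical model of the declaration parts from the issue description.
// None of these names or property keys exist in Flink; they are illustration only.
final case class SourceSpec(sourceType: String, properties: Map[String, String])
final case class EncodingSpec(encodingType: String, schema: Map[String, String])
final case class TimeSpec(rowtimeField: Option[String], watermarkStrategy: Option[String])

final case class TableSourceDeclaration(
    source: SourceSpec,
    encoding: EncodingSpec,
    time: Option[TimeSpec]) {

  // Flatten the nested declaration into plain string properties.
  def toProperties: Map[String, String] = {
    val sourceProps = Map("source.type" -> source.sourceType) ++
      source.properties.map { case (k, v) => s"source.property.$k" -> v }
    val encodingProps = Map("encoding.type" -> encoding.encodingType) ++
      encoding.schema.map { case (k, v) => s"encoding.schema.$k" -> v }
    // Time attributes are optional; only the fields that are set are emitted.
    val timeProps = time.toSeq.flatMap { t =>
      t.rowtimeField.map("time.rowtime.field" -> _).toSeq ++
        t.watermarkStrategy.map("time.watermark.strategy" -> _).toSeq
    }.toMap
    sourceProps ++ encodingProps ++ timeProps
  }
}

object DeclarationExample {
  // Example declaration: a Kafka source with JSON encoding and a rowtime attribute.
  val kafkaJson: TableSourceDeclaration = TableSourceDeclaration(
    source = SourceSpec("kafka", Map("topic" -> "orders")),
    encoding = EncodingSpec("json", Map("amount" -> "DOUBLE")),
    time = Some(TimeSpec(Some("ts"), None)))
}
```

Calling `DeclarationExample.kafkaJson.toProperties` yields one flat map with a key per declared part. A flat map is used here because it is easy to serialize and to compare against whatever a factory declares as supported; the actual design is left to the design document the issue asks for.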