[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546407#comment-16546407 ]

ASF GitHub Bot commented on FLINK-9852:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202978881
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    --- End diff ---
    
    I also thought about that, but descriptors don't actually "build" anything. The only final representation would be the properties, but we don't expose those to the user.
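    
    For context: a descriptor chain like the one in the example above ultimately flattens into string properties. The keys below are an illustrative sketch of that property format, not an exact dump:
    
    {code}
    connector.type=<system-xyz>
    connector.version=0.11
    format.type=json
    format.fail-on-missing-field=false
    schema.0.name=user-name
    schema.0.from=u_name
    schema.0.type=VARCHAR
    schema.1.name=count
    schema.1.type=DECIMAL
    {code}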


> Expose descriptor-based sink creation in table environments
> -----------------------------------------------------------
>
>                 Key: FLINK-9852
>                 URL: https://issues.apache.org/jira/browse/FLINK-9852
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.
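> A fuller sketch of how such a chain could look end-to-end, reusing the descriptor example from the pull request (the Kafka settings and the topic name are illustrative placeholders, and {{registerSourceAndSink}} is the method name suggested above, not an existing API):
> {code}
> tableEnv
>   .connect(
>     new Kafka()
>       .version("0.11")
>       .topic("clicks"))
>   .withFormat(
>     new Json()
>       .jsonSchema("{...}")
>       .failOnMissingField(false))
>   .withSchema(
>     new Schema()
>       .field("user-name", "VARCHAR").from("u_name")
>       .field("count", "DECIMAL"))
>   .inAppendMode()
>   .registerSourceAndSink("clicks")
> {code}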



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
