[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534328#comment-16534328 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r200521017
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -56,23 +58,44 @@ public Environment() {
                return tables;
        }
     
    +   private static TableDescriptor create(String name, Map<String, Object> config) {
    +           if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
    +                   throw new SqlClientException("The 'type' attribute of a table is missing.");
    +           }
    +           final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
    +           if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
    +                   return new Source(name, ConfigUtil.normalizeYaml(config));
    +           } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
    +                   return new Sink(name, ConfigUtil.normalizeYaml(config));
    +           } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
    +                   return new SourceSink(name, ConfigUtil.normalizeYaml(config));
    +           }
    +           return null;
    +   }
    +
        public void setTables(List<Map<String, Object>> tables) {
                this.tables = new HashMap<>(tables.size());
                tables.forEach(config -> {
    -                   if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
    -                           throw new SqlClientException("The 'type' attribute of a table is missing.");
    +                   if (!config.containsKey(NAME)) {
    +                           throw new SqlClientException("The 'name' attribute of a table is missing.");
                        }
    -                   if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
    -                           config.remove(TableDescriptorValidator.TABLE_TYPE());
    -                           final Source s = Source.create(config);
    -                           if (this.tables.containsKey(s.getName())) {
    -                                   throw new SqlClientException("Duplicate source name '" + s + "'.");
    -                           }
    -                           this.tables.put(s.getName(), s);
    -                   } else {
    +                   final Object name = config.get(NAME);
    +                   if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
    +                           throw new SqlClientException("Invalid table name '" + name + "'.");
    +                   }
    +                   final String tableName = (String) name;
    +                   final Map<String, Object> properties = new HashMap<>(config);
    +                   properties.remove(NAME);
    +
    +                   TableDescriptor tableDescriptor = create(tableName, properties);
    +                   if (null == tableDescriptor) {
                                throw new SqlClientException(
    -                                           "Invalid table 'type' attribute value, only 'source' is supported");
    +                                           "Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
    --- End diff --
    
    Is `both` missing from this error message?
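
One way to keep an error message like this in step with the supported table types is to build it from the list of types itself. The following is a minimal sketch only: `TableTypes`, `TableType` and `parse` are hypothetical names that do not exist in Flink, and the string values `source`, `sink` and `both` are assumed from the issue description rather than taken from `TableDescriptorValidator`.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of Flink: maps the 'type' attribute to a table kind. */
final class TableTypes {

    /** Supported kinds; the string values are assumed from the issue description. */
    enum TableType {
        SOURCE("source"),
        SINK("sink"),
        BOTH("both");

        final String value;

        TableType(String value) {
            this.value = value;
        }
    }

    private TableTypes() {
    }

    /** Returns the matching kind, or throws with a message listing every supported value. */
    static TableType parse(String raw) {
        for (TableType type : TableType.values()) {
            if (type.value.equals(raw)) {
                return type;
            }
        }
        // Derive the list from the enum so the message cannot drift from the supported types.
        String supported = Arrays.stream(TableType.values())
                .map(type -> "'" + type.value + "'")
                .collect(Collectors.joining(", "));
        throw new IllegalArgumentException(
                "Invalid table 'type' attribute value '" + raw + "', supported values: " + supported + ".");
    }
}
```

With this shape, adding a new kind to the enum updates both the dispatch and the error text, so a reviewer no longer has to check the message by hand.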


> Create unified interfaces to configure and instantiate TableSinks
> -----------------------------------------------------------------
>
>                 Key: FLINK-8866
>                 URL: https://issues.apache.org/jira/browse/FLINK-8866
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Shuyi Chen
>            Priority: Major
>              Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, so 
> that the sinks can be used in {{INSERT INTO}} statements.
> Below are the major changes planned:
> 1) Add TableSinkFactory/TableSinkFactoryService, similar to 
> TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to 
> identify whether an entry is a source or a sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
