Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r200521222
  
    --- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
    @@ -56,23 +58,44 @@ public Environment() {
                return tables;
        }
     
    +   private static TableDescriptor create(String name, Map<String, Object> 
config) {
    +           if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) 
{
    +                   throw new SqlClientException("The 'type' attribute of a 
table is missing.");
    +           }
    +           final String tableType = (String) 
config.get(TableDescriptorValidator.TABLE_TYPE());
    +           if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
    +                   return new Source(name, 
ConfigUtil.normalizeYaml(config));
    +           } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
    +                   return new Sink(name, ConfigUtil.normalizeYaml(config));
    +           } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
    +                   return new SourceSink(name, 
ConfigUtil.normalizeYaml(config));
    +           }
    +           return null;
    +   }
    +
        public void setTables(List<Map<String, Object>> tables) {
                this.tables = new HashMap<>(tables.size());
                tables.forEach(config -> {
    -                   if 
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
    -                           throw new SqlClientException("The 'type' 
attribute of a table is missing.");
    +                   if (!config.containsKey(NAME)) {
    +                           throw new SqlClientException("The 'name' 
attribute of a table is missing.");
                        }
    -                   if 
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
 {
    -                           
config.remove(TableDescriptorValidator.TABLE_TYPE());
    -                           final Source s = Source.create(config);
    -                           if (this.tables.containsKey(s.getName())) {
    -                                   throw new SqlClientException("Duplicate 
source name '" + s + "'.");
    -                           }
    -                           this.tables.put(s.getName(), s);
    -                   } else {
    +                   final Object name = config.get(NAME);
    +                   if (name == null || !(name instanceof String) || 
((String) name).length() <= 0) {
    +                           throw new SqlClientException("Invalid table 
name '" + name + "'.");
    +                   }
    +                   final String tableName = (String) name;
    +                   final Map<String, Object> properties = new 
HashMap<>(config);
    +                   properties.remove(NAME);
    +
    +                   TableDescriptor tableDescriptor = create(tableName, 
properties);
    +                   if (null == tableDescriptor) {
                                throw new SqlClientException(
    -                                           "Invalid table 'type' attribute 
value, only 'source' is supported");
    +                                           "Invalid table 'type' attribute 
value, only 'source' or 'sink' is supported");
    +                   }
    +                   if (this.tables.containsKey(tableName)) {
    +                           throw new SqlClientException("Duplicate table 
name '" + tableName + "'.");
    --- End diff --
    
    if only `"source"` and `"sink"` is allowed, should we allow the same name 
but different type. e.g. `{"name": "t1", "type": "source"}` and `{"name": "t1", 
"type": "sink"}` co-exist? this is actually following up with the previous 
comment. I think we just need one, either should work.


---

Reply via email to