wuchong commented on a change in pull request #12275:
URL: https://github.com/apache/flink/pull/12275#discussion_r429746785



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
##########
@@ -241,6 +244,19 @@ public void putTableSchema(String key, TableSchema schema) {
                                Arrays.asList(WATERMARK_ROWTIME, WATERMARK_STRATEGY_EXPR, WATERMARK_STRATEGY_DATA_TYPE),
                                watermarkValues);
                }
+
+               if (schema.getPrimaryKey().isPresent()) {
+                       final UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+                       final List<List<String>> uniqueConstraintValues = new ArrayList<>();
+                       uniqueConstraintValues.add(Arrays.asList(
+                                       uniqueConstraint.getName(),
+                                       uniqueConstraint.getType().name(),
+                                       String.join(",", uniqueConstraint.getColumns())));
+                       putIndexedFixedProperties(
+                                       key + '.' + CONSTRAINT_UNIQUE,
+                                       Arrays.asList(NAME, TYPE, CONSTRAINT_UNIQUE_COLUMNS),
+                                       uniqueConstraintValues);
+               }

Review comment:
       Because we only support primary keys for now, I think we can have dedicated 
primary key properties, so that we don't need to handle the index. For example:
   
   ```java
   public static final String PRIMARY_KEY_NAME = "primary-key.name";
   public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
   
   schema.getPrimaryKey().ifPresent(pk -> {
       putString(key + "." + PRIMARY_KEY_NAME, pk.getName());
       putString(key + "." + PRIMARY_KEY_COLUMNS, String.join(",", pk.getColumns()));
   });
   ```
   
   This is also helpful for users who write yaml: 
   
   ```
   tables:
     - name: TableNumber1
       type: source-table
       schema:
         primary-key:
           name: constraint1
           columns: f1, f2
   ```
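
A minimal, self-contained sketch of how the dedicated properties could round-trip through a flat key/value map. It uses a plain `Map<String, String>` instead of `DescriptorProperties`, and the class and the `putPrimaryKey` / `getPrimaryKeyColumns` helpers are hypothetical names chosen for illustration; only the two property keys come from the suggestion above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the relevant part of DescriptorProperties.
public class PrimaryKeyPropertiesSketch {
    static final String PRIMARY_KEY_NAME = "primary-key.name";
    static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";

    // Writes the primary key as two flat, non-indexed properties under `key`.
    static void putPrimaryKey(
            Map<String, String> props, String key, String name, List<String> columns) {
        props.put(key + "." + PRIMARY_KEY_NAME, name);
        props.put(key + "." + PRIMARY_KEY_COLUMNS, String.join(",", columns));
    }

    // Reads the column list back; empty if no primary key was stored.
    // Assumption: column names contain no commas, since "," is the separator.
    static Optional<List<String>> getPrimaryKeyColumns(Map<String, String> props, String key) {
        String joined = props.get(key + "." + PRIMARY_KEY_COLUMNS);
        if (joined == null) {
            return Optional.empty();
        }
        return Optional.of(Arrays.asList(joined.split(",")));
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        putPrimaryKey(props, "schema", "constraint1", Arrays.asList("f1", "f2"));
        System.out.println(props);
        System.out.println(getPrimaryKeyColumns(props, "schema"));
    }
}
```

Because the keys carry no index, reading the primary key back is a direct lookup rather than a scan over indexed entries.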

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
##########
@@ -610,7 +626,9 @@ public DataType getDataType(String key) {
        public Optional<TableSchema> getOptionalTableSchema(String key) {
                // filter for number of fields
                final int fieldCount = properties.keySet().stream()
-                       .filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME))
+                       .filter((k) -> k.startsWith(key)
+                                       // "key." is the prefix.
+                                       && SCHEMA_COLUMN_NAME_SUFFIX.matcher(k.substring(key.length() + 1)).matches())

Review comment:
       We can simply exclude the primary key properties; then we don't need the 
regex matching. 
   
   ```
   .filter((k) -> k.startsWith(key)
           && !k.startsWith(key + "." + PRIMARY_KEY)
           && k.endsWith('.' + NAME))
   ```
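
To illustrate, here is a small standalone sketch of that filter counting schema fields in a flat property map. The class name, the `countFields` helper, and the assumption that `PRIMARY_KEY` equals `"primary-key"` and `NAME` equals `"name"` are illustrative choices, not the actual constants in `DescriptorProperties`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the field-count step in getOptionalTableSchema.
public class FieldCountSketch {
    static final String NAME = "name";               // assumed value
    static final String PRIMARY_KEY = "primary-key"; // assumed value

    // Counts "<key>.<index>.name" entries while skipping "<key>.primary-key.name".
    static int countFields(Map<String, String> props, String key) {
        return (int) props.keySet().stream()
                .filter(k -> k.startsWith(key)
                        && !k.startsWith(key + "." + PRIMARY_KEY)
                        && k.endsWith("." + NAME))
                .count();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("schema.0.name", "f1");
        props.put("schema.0.data-type", "INT");
        props.put("schema.1.name", "f2");
        props.put("schema.1.data-type", "STRING");
        props.put("schema.primary-key.name", "constraint1");
        props.put("schema.primary-key.columns", "f1,f2");
        System.out.println(countFields(props, "schema"));
    }
}
```

Without the exclusion, `schema.primary-key.name` would also end with `.name` and inflate the field count by one, which is the case the regex in the diff guards against.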



