wuchong commented on a change in pull request #12275: URL: https://github.com/apache/flink/pull/12275#discussion_r429746785
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java ########## @@ -241,6 +244,19 @@ public void putTableSchema(String key, TableSchema schema) { Arrays.asList(WATERMARK_ROWTIME, WATERMARK_STRATEGY_EXPR, WATERMARK_STRATEGY_DATA_TYPE), watermarkValues); } + + if (schema.getPrimaryKey().isPresent()) { + final UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get(); + final List<List<String>> uniqueConstraintValues = new ArrayList<>(); + uniqueConstraintValues.add(Arrays.asList( + uniqueConstraint.getName(), + uniqueConstraint.getType().name(), + String.join(",", uniqueConstraint.getColumns()))); + putIndexedFixedProperties( + key + '.' + CONSTRAINT_UNIQUE, + Arrays.asList(NAME, TYPE, CONSTRAINT_UNIQUE_COLUMNS), + uniqueConstraintValues); + } Review comment: Because we only support primary key now. I think we can have a dedicate primary key properties, so that we don't need to handle the index. For example: ```java public static final String PRIMARY_KEY_NAME = "primary-key.name"; public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns"; schema.getPrimaryKey().ifPresent(pk -> { putString(key + "." + PRIMARY_KEY_NAME, pk.getName()); putString(key + "." + PRIMARY_KEY_COLUMNS, String.join(",", pk.getColumns())); }); ``` This is also helpful for users who write yaml: ``` tables: - name: TableNumber1 type: source-table schema: primary-key name: constraint1 columns: f1, f2 ``` ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java ########## @@ -610,7 +626,9 @@ public DataType getDataType(String key) { public Optional<TableSchema> getOptionalTableSchema(String key) { // filter for number of fields final int fieldCount = properties.keySet().stream() - .filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME)) + .filter((k) -> k.startsWith(key) + // "key." is the prefix. + && SCHEMA_COLUMN_NAME_SUFFIX.matcher(k.substring(key.length() + 1)).matches()) Review comment: We can just to exclude the primary key, then don't need the regex matching. ``` .filter((k) -> k.startsWith(key) && !k.startsWith(key + "." + PRIMARY_KEY) && k.endsWith('.' + NAME)) ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org