JingsongLi commented on code in PR #547: URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1116731188
########## flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java: ########## @@ -214,6 +221,53 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig catalogTable = catalogTable.copy(options); } + if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) { Review Comment: Can this logic be moved into `SchemaManager.createTable`? Spark does not have this code to take care of partitions. ########## flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java: ########## @@ -131,6 +131,137 @@ public void testCreateTable() { + "{\"id\":2,\"name\":\"c\",\"type\":\"CHAR(10)\"}]]]"); } + @Test + public void testCreateTableAs() { + spark.sql( + "CREATE TABLE default.testCreateTable(\n" + + "a BIGINT,\n" + + "b VARCHAR(10),\n" + + "c CHAR(10))"); + spark.sql("INSERT INTO default.testCreateTable VALUES(1,'a','b')"); + spark.sql( + "CREATE TABLE default.testCreateTableAs AS SELECT * FROM default.testCreateTable"); + List<Row> result = spark.sql("SELECT * FROM default.testCreateTableAs").collectAsList(); + assertThat(result.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,a,b]"); + + // partitioned table + spark.sql( + "CREATE TABLE default.partitionedTable (\n" + + "a BIGINT,\n" + + "b STRING," + + "c STRING) USING tablestore\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a,b)"); + spark.sql("INSERT INTO default.partitionedTable VALUES(1,'aaa','bbb')"); + spark.sql( + "CREATE TABLE default.partitionedTableAs TBLPROPERTIES ('partition' = 'a') AS SELECT * FROM default.partitionedTable"); + assertThat( + spark.sql("SHOW CREATE TABLE default.partitionedTableAs") + .collectAsList() + .toString()) + .isEqualTo( + String.format( + "[[CREATE TABLE partitionedTableAs (\n" + + " `a` BIGINT,\n" + + " `b` STRING,\n" + + " `c` STRING)\n" + + "TBLPROPERTIES(\n" + + " 'partition' = 'a',\n" Review Comment: This is wrong: there is no `PARTITIONED` 
definition. ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java: ########## @@ -474,6 +474,18 @@ public class CoreOptions implements Serializable { + "$hour:00:00'.")) .build()); + public static final ConfigOption<String> PRIMARY_KEY = + key("primary-key") + .stringType() + .noDefaultValue() + .withDescription("Define primary key for `CREATE TABLE AS SELECT` statement."); Review Comment: `Define primary key by table options, cannot define primary key on DDL and table options at the same time.`. ########## flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java: ########## @@ -214,6 +221,53 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig catalogTable = catalogTable.copy(options); } + if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) { + ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) catalogTable; + ResolvedSchema oldResolvedSchema = resolvedCatalogTable.getResolvedSchema(); + if (oldResolvedSchema.getPrimaryKey().isPresent()) { + throw new CatalogException( + "Table with primary key cannot contain `primary-key` option."); + } + + String primaryKeys = options.get(CoreOptions.PRIMARY_KEY.key()); Review Comment: Should we remove this key? Same for partition. 
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java: ########## @@ -474,6 +474,18 @@ public class CoreOptions implements Serializable { + "$hour:00:00'.")) .build()); + public static final ConfigOption<String> PRIMARY_KEY = Review Comment: immutable ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java: ########## @@ -474,6 +474,18 @@ public class CoreOptions implements Serializable { + "$hour:00:00'.")) .build()); + public static final ConfigOption<String> PRIMARY_KEY = + key("primary-key") + .stringType() + .noDefaultValue() + .withDescription("Define primary key for `CREATE TABLE AS SELECT` statement."); + + public static final ConfigOption<String> PARTITION = Review Comment: immutable ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java: ########## @@ -474,6 +474,18 @@ public class CoreOptions implements Serializable { + "$hour:00:00'.")) .build()); + public static final ConfigOption<String> PRIMARY_KEY = + key("primary-key") + .stringType() + .noDefaultValue() + .withDescription("Define primary key for `CREATE TABLE AS SELECT` statement."); + + public static final ConfigOption<String> PARTITION = + key("partition") + .stringType() + .noDefaultValue() + .withDescription("Define partition for `CREATE TABLE AS SELECT` statement."); Review Comment: Same as for the primary key -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org