JingsongLi commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1116731188


##########
flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java:
##########
@@ -214,6 +221,53 @@ public void createTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
             catalogTable = catalogTable.copy(options);
         }
 
+        if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {

Review Comment:
   Can this logic be moved into `SchemaManager.createTable`? Spark does not have 
this code to take care of partitions.



##########
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java:
##########
@@ -131,6 +131,137 @@ public void testCreateTable() {
                                 + 
"{\"id\":2,\"name\":\"c\",\"type\":\"CHAR(10)\"}]]]");
     }
 
+    @Test
+    public void testCreateTableAs() {
+        spark.sql(
+                "CREATE TABLE default.testCreateTable(\n"
+                        + "a BIGINT,\n"
+                        + "b VARCHAR(10),\n"
+                        + "c CHAR(10))");
+        spark.sql("INSERT INTO default.testCreateTable VALUES(1,'a','b')");
+        spark.sql(
+                "CREATE TABLE default.testCreateTableAs AS SELECT * FROM 
default.testCreateTable");
+        List<Row> result = spark.sql("SELECT * FROM 
default.testCreateTableAs").collectAsList();
+        
assertThat(result.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,a,b]");
+
+        // partitioned table
+        spark.sql(
+                "CREATE TABLE default.partitionedTable (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING,"
+                        + "c STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a,b)");
+        spark.sql("INSERT INTO default.partitionedTable 
VALUES(1,'aaa','bbb')");
+        spark.sql(
+                "CREATE TABLE default.partitionedTableAs TBLPROPERTIES 
('partition' = 'a') AS SELECT * FROM default.partitionedTable");
+        assertThat(
+                        spark.sql("SHOW CREATE TABLE 
default.partitionedTableAs")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo(
+                        String.format(
+                                "[[CREATE TABLE partitionedTableAs (\n"
+                                        + "  `a` BIGINT,\n"
+                                        + "  `b` STRING,\n"
+                                        + "  `c` STRING)\n"
+                                        + "TBLPROPERTIES(\n"
+                                        + "  'partition' = 'a',\n"

Review Comment:
   This is wrong: there is no `PARTITIONED` definition.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define primary key for `CREATE TABLE AS 
SELECT` statement.");

Review Comment:
   `Define primary key by table options, cannot define primary key on DDL and 
table options at the same time.`.



##########
flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java:
##########
@@ -214,6 +221,53 @@ public void createTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
             catalogTable = catalogTable.copy(options);
         }
 
+        if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
+            ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) 
catalogTable;
+            ResolvedSchema oldResolvedSchema = 
resolvedCatalogTable.getResolvedSchema();
+            if (oldResolvedSchema.getPrimaryKey().isPresent()) {
+                throw new CatalogException(
+                        "Table with primary key cannot contain `primary-key` 
option.");
+            }
+
+            String primaryKeys = options.get(CoreOptions.PRIMARY_KEY.key());

Review Comment:
   Should we remove this key? The same applies to partition.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =

Review Comment:
   This option should be immutable.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define primary key for `CREATE TABLE AS 
SELECT` statement.");
+
+    public static final ConfigOption<String> PARTITION =

Review Comment:
   This option should be immutable.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define primary key for `CREATE TABLE AS 
SELECT` statement.");
+
+    public static final ConfigOption<String> PARTITION =
+            key("partition")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define partition for `CREATE TABLE AS 
SELECT` statement.");

Review Comment:
   Same as for the primary key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to