LadyForest commented on code in PR #24934:
URL: https://github.com/apache/flink/pull/24934#discussion_r1639160703


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java:
##########
@@ -39,10 +41,18 @@
 public class CreateCatalogOperation implements CreateOperation {
     private final String catalogName;
     private final Map<String, String> properties;
+    private final @Nullable String comment;

Review Comment:
   please keep a consistent annotation style
   ```suggestion
       @Nullable private final  String comment;
   ```



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java:
##########
@@ -45,11 +48,18 @@ public class SqlCreateCatalog extends SqlCreate {
 
     private final SqlNodeList propertyList;
 
+    private final @Nullable SqlNode comment;

Review Comment:
   ```suggestion
       @Nullable private final SqlCharStringLiteral comment;
   ```



##########
flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java:
##########
@@ -428,7 +428,7 @@ void testConcurrentSplitAssignmentForMultipleHosts() throws 
InterruptedException
         assertThat(ia.getNextInputSplit("testhost", 0)).isNull();
 
         // at least one fraction of hosts needs be local, no matter how bad 
the thread races
-        assertThat(ia.getNumberOfRemoteAssignments())
+        assertThat(ia.getNumberOfLocalAssignments())

Review Comment:
   Do not include commits that don't belong to this PR.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -85,6 +87,13 @@ public static List<TableChange> buildModifyColumnChange(
         }
     }
 
+    public static @Nullable String getCatalogComment(Optional<SqlNode> 
catalogComment) {
+        return catalogComment
+                .map(SqlCharStringLiteral.class::cast)
+                .map(c -> c.getValueAs(NlsString.class).getValue())
+                .orElse(null);
+    }
+

Review Comment:
   it can be inlined, so remove this



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java:
##########
@@ -53,11 +63,23 @@ public Map<String, String> getProperties() {
         return properties;
     }
 
+    public @Nullable String getComment() {
+        return comment;
+    }

Review Comment:
   Remove this getter if there is no use.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java:
##########
@@ -70,11 +80,28 @@ public SqlNodeList getPropertyList() {
         return propertyList;
     }
 
+    public Optional<SqlNode> getComment() {

Review Comment:
   ```suggestion
       public Optional<SqlCharStringLiteral> getComment() {
   ```



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java:
##########
@@ -45,11 +48,18 @@ public class SqlCreateCatalog extends SqlCreate {
 
     private final SqlNodeList propertyList;
 
+    private final @Nullable SqlNode comment;
+
     public SqlCreateCatalog(
-            SqlParserPos position, SqlIdentifier catalogName, SqlNodeList 
propertyList) {
-        super(OPERATOR, position, false, false);
+            SqlParserPos position,
+            SqlIdentifier catalogName,
+            SqlNodeList propertyList,
+            @Nullable SqlNode comment,

Review Comment:
   ```suggestion
               @Nullable SqlCharStringLiteral comment,
   ```



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -134,6 +134,28 @@ void testCreateCatalog() {
                                 + "  'key1' = 'value1',\n"
                                 + "  'key2' = 'value2'\n"
                                 + ")");
+        sql("create catalog c1 comment 'hello'\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE CATALOG `C1`\n"
+                                + "COMMENT 'hello' WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")");
+        sql("create catalog if not exists c1 comment 'hello'\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE CATALOG IF NOT EXISTS `C1`\n"
+                                + "COMMENT 'hello' WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")");

Review Comment:
   Add one more test for `CREATE CATALOG IF NOT EXISTS` without comment



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java:
##########
@@ -48,18 +55,34 @@ public Configuration getConfiguration() {
         return configuration;
     }
 
-    private CatalogDescriptor(String catalogName, Configuration configuration) 
{
+    public Optional<String> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    public CatalogDescriptor setComment(String comment) {
+        return new CatalogDescriptor(catalogName, configuration, comment);
+    }

Review Comment:
   remove this method and add it back for the next pr



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##########
@@ -367,14 +368,28 @@ void testCatalogStore() throws Exception {
         catalogManager.createCatalog("cat1", CatalogDescriptor.of("cat1", 
configuration));
         catalogManager.createCatalog("cat2", CatalogDescriptor.of("cat2", 
configuration));
         catalogManager.createCatalog("cat3", CatalogDescriptor.of("cat3", 
configuration));
+        catalogManager.createCatalog(

Review Comment:
   Please add the exception test as well.
   ```java
       assertThatThrownBy(
                           () ->
                                   catalogManager.createCatalog(
                                           "cat_comment",
                                           CatalogDescriptor.of(
                                                   "cat_comment",
                                                   configuration.clone(),
                                                   "second comment for 
catalog"),
                                           false))
                   .isInstanceOf(CatalogException.class)
                   .hasMessage("Catalog cat_comment already exists in catalog 
store.");
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java:
##########
@@ -41,6 +43,10 @@ public Operation convertSqlNode(SqlCreateCatalog node, 
ConvertContext context) {
                                         ((SqlTableOption) p).getKeyString(),
                                         ((SqlTableOption) 
p).getValueString()));
 
-        return new CreateCatalogOperation(node.catalogName(), properties);
+        return new CreateCatalogOperation(
+                node.catalogName(),
+                properties,
+                getCatalogComment(node.getComment()),

Review Comment:
   ```suggestion
                   node.getComment().map(c -> 
c.getValueAs(String.class)).orElse(null),
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -295,31 +295,42 @@ public DataTypeFactory getDataTypeFactory() {
      *
      * @param catalogName the given catalog name under which to create the 
given catalog
      * @param catalogDescriptor catalog descriptor for creating catalog
+     * @param ignoreIfExists if false exception will be thrown if a catalog 
exists.
      * @throws CatalogException If the catalog already exists in the catalog 
store or initialized
      *     catalogs, or if an error occurs while creating the catalog or 
storing the {@link
      *     CatalogDescriptor}
      */
-    public void createCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor)
+    public void createCatalog(
+            String catalogName, CatalogDescriptor catalogDescriptor, boolean 
ignoreIfExists)
             throws CatalogException {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(catalogName),
                 "Catalog name cannot be null or empty.");
         checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
 
         if (catalogStoreHolder.catalogStore().contains(catalogName)) {
-            throw new CatalogException(
-                    format("Catalog %s already exists in catalog store.", 
catalogName));
+            if (!ignoreIfExists) {

Review Comment:
   The logic is a little bit cumbersome, what about
   
   ```java
   public void createCatalog(
           String catalogName, CatalogDescriptor catalogDescriptor, boolean 
ignoreIfExists)
           throws CatalogException {
       checkArgument(
               !StringUtils.isNullOrWhitespaceOnly(catalogName),
               "Catalog name cannot be null or empty.");
       checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
   
       boolean catalogExistsInStore = 
catalogStoreHolder.catalogStore().contains(catalogName);
       boolean catalogExistsInMemory = catalogs.containsKey(catalogName);
   
       if (catalogExistsInStore || catalogExistsInMemory) {
           if (!ignoreIfExists) {
               throw new CatalogException(
                       format("Catalog %s already exists.", catalogName));
           }
       } else {
           // Store the catalog in the catalog store
           catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
catalogDescriptor);
           
           // Initialize and store the catalog in memory
           Catalog catalog = initCatalog(catalogName, catalogDescriptor);
           catalog.open();
           catalogs.put(catalogName, catalog);
       }
   }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to