This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4044b9ea42a [FLINK-34917][table] Support `CREATE CATALOG IF NOT EXISTS` with comment
4044b9ea42a is described below

commit 4044b9ea42aa6cf4a638b4ed46219fed94ce84bf
Author: Yubin Li <lixin58...@163.com>
AuthorDate: Fri Jun 14 20:47:22 2024 +0800

    [FLINK-34917][table] Support `CREATE CATALOG IF NOT EXISTS` with comment
    
    This closes #24934
---
 docs/content.zh/docs/dev/table/sql/create.md       | 15 ++++--
 docs/content/docs/dev/table/sql/create.md          |  7 ++-
 .../src/test/resources/sql/catalog_database.q      | 50 ++++++++++++++++++
 .../service/session/SessionManagerImplTest.java    |  2 +-
 .../src/test/resources/sql/catalog_database.q      | 59 ++++++++++++++++++++++
 .../src/main/codegen/includes/parserImpls.ftl      | 16 ++++--
 .../flink/sql/parser/ddl/SqlCreateCatalog.java     | 33 ++++++++++--
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 21 ++++++--
 .../flink/table/api/internal/ShowCreateUtil.java   |  6 ++-
 .../apache/flink/table/catalog/CatalogManager.java | 34 ++++++++-----
 .../table/operations/DescribeCatalogOperation.java |  4 +-
 .../operations/ddl/CreateCatalogOperation.java     | 24 ++++++++-
 .../flink/table/catalog/CatalogManagerTest.java    | 33 ++++++++++--
 .../flink/table/catalog/CatalogDescriptor.java     | 23 ++++++++-
 .../converters/SqlCreateCatalogConverter.java      | 12 ++++-
 15 files changed, 298 insertions(+), 41 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/create.md 
b/docs/content.zh/docs/dev/table/sql/create.md
index f6c8674b6c8..69467cd4042 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -628,18 +628,23 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM 
source_table WHERE mod(id, 1
 ## CREATE CATALOG
 
 ```sql
-CREATE CATALOG catalog_name
+CREATE CATALOG [IF NOT EXISTS] catalog_name
+  [COMMENT catalog_comment]
   WITH (key1=val1, key2=val2, ...)
 ```
 
-Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown.
+根据给定的属性创建 catalog。若已存在同名 catalog,会抛出异常。
+
+**IF NOT EXISTS**
+
+若 catalog 已经存在,则不会进行任何操作。
 
 **WITH OPTIONS**
 
-Catalog properties used to store extra information related to this catalog.
-The key and value of expression `key1=val1` should both be string literal.
+catalog 属性一般用于存储关于这个 catalog 的额外的信息。
+表达式 `key1=val1` 中的键和值都需要是字符串文本常量。
 
-Check out more details at [Catalogs]({{< ref "docs/dev/table/catalogs" >}}).
+详情见 [Catalogs]({{< ref "docs/dev/table/catalogs" >}})。
 
 {{< top >}}
 
diff --git a/docs/content/docs/dev/table/sql/create.md 
b/docs/content/docs/dev/table/sql/create.md
index 3d94b111c53..136468190e4 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -629,12 +629,17 @@ If you want to enable atomicity for RTAS, then you should 
make sure:
 ## CREATE CATALOG
 
 ```sql
-CREATE CATALOG catalog_name
+CREATE CATALOG [IF NOT EXISTS] catalog_name
+  [COMMENT catalog_comment]
   WITH (key1=val1, key2=val2, ...)
 ```
 
 Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown.
 
+**IF NOT EXISTS**
+
+If the catalog already exists, nothing happens.
+
 **WITH OPTIONS**
 
 Catalog properties used to store extra information related to this catalog.
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index b99af7344f9..a2ae9a2fe1a 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -35,6 +35,7 @@ org.apache.flink.sql.parser.impl.ParseException: Encountered 
"." at line 1, colu
 Was expecting one of:
     <EOF>
     "WITH" ...
+    "COMMENT" ...
     ";" ...
 !error
 
@@ -92,6 +93,55 @@ drop catalog c1;
 org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a 
catalog which is currently in use.
 !error
 
+create catalog cat_comment comment 'hello ''catalog''' WITH 
('type'='generic_in_memory', 'default-database'='db');
+[INFO] Execute statement succeeded.
+!info
+
+show create catalog cat_comment;
++--------------------------------------------------------------------------------------------------------------------------------+
+|                                                                              
                                           result |
++--------------------------------------------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat_comment` COMMENT 'hello ''catalog''' WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+ |
++--------------------------------------------------------------------------------------------------------------------------------+
+1 row in set
+!ok
+
+describe catalog cat_comment;
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |       cat_comment |
+|      type | generic_in_memory |
+|   comment |   hello 'catalog' |
++-----------+-------------------+
+3 rows in set
+!ok
+
+describe catalog extended cat_comment;
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |       cat_comment |
+|                    type | generic_in_memory |
+|                 comment |   hello 'catalog' |
+| option:default-database |                db |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+create catalog if not exists cat_comment comment 'hello' with ('type' = 
'generic_in_memory');
+[INFO] Execute statement succeeded.
+!info
+
+create catalog cat_comment comment 'hello2' with ('type' = 
'generic_in_memory');
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.catalog.exceptions.CatalogException: Catalog 
cat_comment already exists.
+!error
+
 create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
 [INFO] Execute statement succeeded.
 !info
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java
index 32fa747894d..be580c2729f 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java
@@ -155,7 +155,7 @@ class SessionManagerImplTest {
                                                 "cat2",
                                                 CatalogDescriptor.of("cat2", 
configuration)))
                 .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Catalog cat2 already exists in catalog 
store.");
+                .hasMessageContaining("Catalog cat2 already exists.");
 
         sessionManager.stop();
     }
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index cd7658ec0fc..687330e4a50 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -35,6 +35,7 @@ org.apache.flink.sql.parser.impl.ParseException: Encountered 
"." at line 1, colu
 Was expecting one of:
     <EOF> 
     "WITH" ...
+    "COMMENT" ...
     ";" ...
 !error
 
@@ -108,6 +109,64 @@ drop catalog default_catalog;
 1 row in set
 !ok
 
+create catalog cat_comment comment 'hello ''catalog''' WITH 
('type'='generic_in_memory', 'default-database'='db');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show create catalog cat_comment;
+!output
+CREATE CATALOG `cat_comment` COMMENT 'hello ''catalog''' WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+!ok
+
+describe catalog cat_comment;
+!output
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |       cat_comment |
+|      type | generic_in_memory |
+|   comment |   hello 'catalog' |
++-----------+-------------------+
+3 rows in set
+!ok
+
+describe catalog extended cat_comment;
+!output
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |       cat_comment |
+|                    type | generic_in_memory |
+|                 comment |   hello 'catalog' |
+| option:default-database |                db |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+create catalog if not exists cat_comment comment 'hello' with ('type' = 
'generic_in_memory');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create catalog cat_comment comment 'hello2' with ('type' = 
'generic_in_memory');
+!output
+org.apache.flink.table.catalog.exceptions.CatalogException: Catalog 
cat_comment already exists.
+!error
+
 create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
 !output
 +--------+
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index b7e3db3942e..b4ea9664c4d 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -108,17 +108,26 @@ SqlUseCatalog SqlUseCatalog() :
 
 /**
 * Parses a create catalog statement.
-* CREATE CATALOG catalog_name [WITH (property_name=property_value, ...)];
+* CREATE CATALOG [IF NOT EXISTS] catalog_name [COMMENT 'comment_value'] [WITH 
(property_name=property_value, ...)];
 */
 SqlCreate SqlCreateCatalog(Span s, boolean replace) :
 {
     SqlParserPos startPos;
     SqlIdentifier catalogName;
     SqlNodeList propertyList = SqlNodeList.EMPTY;
+    SqlNode comment = null;
+    boolean ifNotExists = false;
 }
 {
     <CATALOG> { startPos = getPos(); }
+
+    ifNotExists = IfNotExistsOpt()
+
     catalogName = SimpleIdentifier()
+    [
+        <COMMENT>
+        comment = StringLiteral()
+    ]
     [
         <WITH>
         propertyList = Properties()
@@ -126,7 +135,9 @@ SqlCreate SqlCreateCatalog(Span s, boolean replace) :
     {
         return new SqlCreateCatalog(startPos.plus(getPos()),
             catalogName,
-            propertyList);
+            propertyList,
+            comment,
+            ifNotExists);
     }
 }
 
@@ -149,7 +160,6 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
 
 /**
 * Parses an alter catalog statement.
-* ALTER CATALOG catalog_name SET (key1=val1, key2=val2, ...);
 */
 SqlAlterCatalog SqlAlterCatalog() :
 {
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
index c80facc9ebc..d95427b9fdc 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
@@ -31,7 +31,10 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
+import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
 
@@ -45,11 +48,18 @@ public class SqlCreateCatalog extends SqlCreate {
 
     private final SqlNodeList propertyList;
 
+    @Nullable private final SqlNode comment;
+
     public SqlCreateCatalog(
-            SqlParserPos position, SqlIdentifier catalogName, SqlNodeList 
propertyList) {
-        super(OPERATOR, position, false, false);
+            SqlParserPos position,
+            SqlIdentifier catalogName,
+            SqlNodeList propertyList,
+            @Nullable SqlNode comment,
+            boolean ifNotExists) {
+        super(OPERATOR, position, false, ifNotExists);
         this.catalogName = requireNonNull(catalogName, "catalogName cannot be 
null");
         this.propertyList = requireNonNull(propertyList, "propertyList cannot 
be null");
+        this.comment = comment;
     }
 
     @Override
@@ -59,7 +69,7 @@ public class SqlCreateCatalog extends SqlCreate {
 
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(catalogName, propertyList);
+        return ImmutableNullableList.of(catalogName, propertyList, comment);
     }
 
     public SqlIdentifier getCatalogName() {
@@ -70,11 +80,28 @@ public class SqlCreateCatalog extends SqlCreate {
         return propertyList;
     }
 
+    public Optional<SqlNode> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    public boolean isIfNotExists() {
+        return ifNotExists;
+    }
+
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.keyword("CREATE CATALOG");
+        if (isIfNotExists()) {
+            writer.keyword("IF NOT EXISTS");
+        }
         catalogName.unparse(writer, leftPrec, rightPrec);
 
+        if (comment != null) {
+            writer.newlineAndIndent();
+            writer.keyword("COMMENT");
+            comment.unparse(writer, leftPrec, rightPrec);
+        }
+
         if (this.propertyList.size() > 0) {
             writer.keyword("WITH");
             SqlWriter.Frame withFrame = writer.startList("(", ")");
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 1cb44ce42c4..8f75d9f0ad6 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -26,12 +26,15 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParserFixture;
 import org.apache.calcite.sql.parser.SqlParserTest;
+import org.apache.commons.lang3.StringUtils;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -121,15 +124,25 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         sql("use catalog a").ok("USE CATALOG `A`");
     }
 
-    @Test
-    void testCreateCatalog() {
-        sql("create catalog c1\n"
+    @ParameterizedTest
+    @CsvSource({"true,true", "true,false", "false,true", "false,false"})
+    void testCreateCatalog(boolean ifNotExists, boolean comment) {
+        String ifNotExistsClause = ifNotExists ? "if not exists " : "";
+        String commentClause = comment ? "\ncomment 'HELLO' " : " ";
+
+        sql("create catalog "
+                        + ifNotExistsClause
+                        + "c1"
+                        + commentClause
                         + " WITH (\n"
                         + "  'key1'='value1',\n"
                         + "  'key2'='value2'\n"
                         + " )\n")
                 .ok(
-                        "CREATE CATALOG `C1` "
+                        "CREATE CATALOG "
+                                + StringUtils.upperCase(ifNotExistsClause)
+                                + "`C1`"
+                                + StringUtils.upperCase(commentClause)
                                 + "WITH (\n"
                                 + "  'key1' = 'value1',\n"
                                 + "  'key2' = 'value2'\n"
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index c11aca26046..b3a5d8e6f02 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -117,9 +117,13 @@ public class ShowCreateUtil {
 
     public static String buildShowCreateCatalogRow(CatalogDescriptor 
catalogDescriptor) {
         final String printIndent = "  ";
+        final String comment = catalogDescriptor.getComment().orElse(null);
         return String.format(
-                "CREATE CATALOG %s WITH (%s%s%s)%s",
+                "CREATE CATALOG %s %sWITH (%s%s%s)%s",
                 escapeIdentifier(catalogDescriptor.getCatalogName()),
+                StringUtils.isNotEmpty(comment)
+                        ? String.format("COMMENT '%s' ", 
EncodingUtils.escapeSingleQuotes(comment))
+                        : "",
                 System.lineSeparator(),
                 
extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), 
printIndent)
                         .orElse(""),
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index fc16f7ae057..f97a625a476 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -295,31 +295,39 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      *
      * @param catalogName the given catalog name under which to create the 
given catalog
      * @param catalogDescriptor catalog descriptor for creating catalog
+     * @param ignoreIfExists if false exception will be thrown if a catalog 
exists.
      * @throws CatalogException If the catalog already exists in the catalog 
store or initialized
      *     catalogs, or if an error occurs while creating the catalog or 
storing the {@link
      *     CatalogDescriptor}
      */
-    public void createCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor)
+    public void createCatalog(
+            String catalogName, CatalogDescriptor catalogDescriptor, boolean 
ignoreIfExists)
             throws CatalogException {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(catalogName),
                 "Catalog name cannot be null or empty.");
         checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
 
-        if (catalogStoreHolder.catalogStore().contains(catalogName)) {
-            throw new CatalogException(
-                    format("Catalog %s already exists in catalog store.", 
catalogName));
-        }
-        if (catalogs.containsKey(catalogName)) {
-            throw new CatalogException(
-                    format("Catalog %s already exists in initialized 
catalogs.", catalogName));
-        }
+        boolean catalogExistsInStore = 
catalogStoreHolder.catalogStore().contains(catalogName);
+        boolean catalogExistsInMemory = catalogs.containsKey(catalogName);
 
-        Catalog catalog = initCatalog(catalogName, catalogDescriptor);
-        catalog.open();
-        catalogs.put(catalogName, catalog);
+        if (catalogExistsInStore || catalogExistsInMemory) {
+            if (!ignoreIfExists) {
+                throw new CatalogException(format("Catalog %s already 
exists.", catalogName));
+            }
+        } else {
+            // Store the catalog in the catalog store
+            catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
catalogDescriptor);
+
+            // Initialize and store the catalog in memory
+            Catalog catalog = initCatalog(catalogName, catalogDescriptor);
+            catalog.open();
+            catalogs.put(catalogName, catalog);
+        }
+    }
 
-        catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
catalogDescriptor);
+    public void createCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor) {
+        createCatalog(catalogName, catalogDescriptor, false);
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
index d3910a673cc..08a97d10bec 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
@@ -84,8 +84,8 @@ public class DescribeCatalogOperation implements Operation, 
ExecutableOperation
                                         "type",
                                         properties.getOrDefault(
                                                 
CommonCatalogOptions.CATALOG_TYPE.key(), "")),
-                                // TODO: Show the catalog comment until 
FLINK-34918 is resolved
-                                Arrays.asList("comment", "")));
+                                Arrays.asList(
+                                        "comment", 
catalogDescriptor.getComment().orElse(null))));
         if (isExtended) {
             properties.entrySet().stream()
                     .filter(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
index 087386c7521..cdacafadefc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
@@ -28,6 +28,8 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -39,10 +41,18 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class CreateCatalogOperation implements CreateOperation {
     private final String catalogName;
     private final Map<String, String> properties;
+    @Nullable private final String comment;
+    private final boolean ignoreIfExists;
 
-    public CreateCatalogOperation(String catalogName, Map<String, String> 
properties) {
+    public CreateCatalogOperation(
+            String catalogName,
+            Map<String, String> properties,
+            @Nullable String comment,
+            boolean ignoreIfExists) {
         this.catalogName = checkNotNull(catalogName);
         this.properties = 
Collections.unmodifiableMap(checkNotNull(properties));
+        this.comment = comment;
+        this.ignoreIfExists = ignoreIfExists;
     }
 
     public String getCatalogName() {
@@ -53,11 +63,19 @@ public class CreateCatalogOperation implements 
CreateOperation {
         return properties;
     }
 
+    public boolean isIgnoreIfExists() {
+        return ignoreIfExists;
+    }
+
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
         params.put("catalogName", catalogName);
         params.put("properties", properties);
+        if (comment != null) {
+            params.put("comment", comment);
+        }
+        params.put("ignoreIfExists", ignoreIfExists);
 
         return OperationUtils.formatWithChildren(
                 "CREATE CATALOG", params, Collections.emptyList(), 
Operation::asSummaryString);
@@ -69,7 +87,9 @@ public class CreateCatalogOperation implements 
CreateOperation {
             ctx.getCatalogManager()
                     .createCatalog(
                             catalogName,
-                            CatalogDescriptor.of(catalogName, 
Configuration.fromMap(properties)));
+                            CatalogDescriptor.of(
+                                    catalogName, 
Configuration.fromMap(properties), comment),
+                            ignoreIfExists);
 
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (CatalogException e) {
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index b522ac1265b..632387a0ebc 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -367,14 +368,39 @@ class CatalogManagerTest {
         catalogManager.createCatalog("cat1", CatalogDescriptor.of("cat1", 
configuration));
         catalogManager.createCatalog("cat2", CatalogDescriptor.of("cat2", 
configuration));
         catalogManager.createCatalog("cat3", CatalogDescriptor.of("cat3", 
configuration));
+        catalogManager.createCatalog(
+                "cat_comment",
+                CatalogDescriptor.of("cat_comment", configuration.clone(), 
"comment for catalog"));
+        catalogManager.createCatalog(
+                "cat_comment",
+                CatalogDescriptor.of(
+                        "cat_comment", configuration.clone(), "second comment 
for catalog"),
+                true);
+        assertThatThrownBy(
+                        () ->
+                                catalogManager.createCatalog(
+                                        "cat_comment",
+                                        CatalogDescriptor.of(
+                                                "cat_comment",
+                                                configuration.clone(),
+                                                "third comment for catalog"),
+                                        false))
+                .isInstanceOf(CatalogException.class)
+                .hasMessage("Catalog cat_comment already exists.");
 
         assertTrue(catalogManager.getCatalog("cat1").isPresent());
         assertTrue(catalogManager.getCatalog("cat2").isPresent());
         assertTrue(catalogManager.getCatalog("cat3").isPresent());
+        assertTrue(catalogManager.getCatalog("cat_comment").isPresent());
+        
assertTrue(catalogManager.getCatalogDescriptor("cat_comment").isPresent());
+        assertEquals(
+                "comment for catalog",
+                
catalogManager.getCatalogDescriptor("cat_comment").get().getComment().get());
 
         assertTrue(catalogManager.listCatalogs().contains("cat1"));
         assertTrue(catalogManager.listCatalogs().contains("cat2"));
         assertTrue(catalogManager.listCatalogs().contains("cat3"));
+        assertTrue(catalogManager.listCatalogs().contains("cat_comment"));
 
         catalogManager.registerCatalog("cat4", new 
GenericInMemoryCatalog("cat4"));
 
@@ -383,14 +409,14 @@ class CatalogManagerTest {
                                 catalogManager.createCatalog(
                                         "cat1", CatalogDescriptor.of("cat1", 
configuration)))
                 .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Catalog cat1 already exists in catalog 
store.");
+                .hasMessageContaining("Catalog cat1 already exists.");
 
         assertThatThrownBy(
                         () ->
                                 catalogManager.createCatalog(
                                         "cat4", CatalogDescriptor.of("cat4", 
configuration)))
                 .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Catalog cat4 already exists in 
initialized catalogs.");
+                .hasMessageContaining("Catalog cat4 already exists.");
 
         catalogManager.createDatabase(
                 "exist_cat",
@@ -419,7 +445,8 @@ class CatalogManagerTest {
                                         "cat3",
                                         "cat4",
                                         "default_catalog",
-                                        "exist_cat")));
+                                        "exist_cat",
+                                        "cat_comment")));
         catalogManager.setCurrentDatabase("cat_db");
         
assertThat(catalogManager.listTables()).isEqualTo(Collections.singleton("test_table"));
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java
index 4daf5d1410e..f984203672d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java
@@ -21,6 +21,10 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /**
  * Describes a {@link Catalog} with the catalog name and configuration.
  *
@@ -40,6 +44,9 @@ public class CatalogDescriptor {
     /* The configuration used to discover and construct the catalog. */
     private final Configuration configuration;
 
+    /* Catalog comment. */
+    @Nullable private final String comment;
+
     public String getCatalogName() {
         return catalogName;
     }
@@ -48,9 +55,15 @@ public class CatalogDescriptor {
         return configuration;
     }
 
-    private CatalogDescriptor(String catalogName, Configuration configuration) 
{
+    public Optional<String> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    private CatalogDescriptor(
+            String catalogName, Configuration configuration, @Nullable String 
comment) {
         this.catalogName = catalogName;
         this.configuration = configuration;
+        this.comment = comment;
     }
 
     /**
@@ -58,8 +71,14 @@ public class CatalogDescriptor {
      *
      * @param catalogName the name of the catalog
      * @param configuration the configuration of the catalog
+     * @param comment the comment of the catalog
      */
+    public static CatalogDescriptor of(
+            String catalogName, Configuration configuration, String comment) {
+        return new CatalogDescriptor(catalogName, configuration, comment);
+    }
+
     public static CatalogDescriptor of(String catalogName, Configuration 
configuration) {
-        return new CatalogDescriptor(catalogName, configuration);
+        return new CatalogDescriptor(catalogName, configuration, null);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
index 8d2c4c4731e..ea65f8f4ed8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
@@ -23,6 +23,9 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
 
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.util.NlsString;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,6 +44,13 @@ public class SqlCreateCatalogConverter implements 
SqlNodeConverter<SqlCreateCata
                                         ((SqlTableOption) p).getKeyString(),
                                         ((SqlTableOption) 
p).getValueString()));
 
-        return new CreateCatalogOperation(node.catalogName(), properties);
+        return new CreateCatalogOperation(
+                node.catalogName(),
+                properties,
+                node.getComment()
+                        .map(SqlCharStringLiteral.class::cast)
+                        .map(c -> c.getValueAs(NlsString.class).getValue())
+                        .orElse(null),
+                node.isIfNotExists());
     }
 }

Reply via email to