This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d169038784 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax
9d169038784 is described below

commit 9d1690387849303b27050bb0cefaa1bad6e3fb98
Author: Yubin Li <lixin58...@163.com>
AuthorDate: Thu Jun 13 19:19:47 2024 +0800

    [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax
    
    This closes #24763
---
 docs/content.zh/docs/dev/table/sql/alter.md        |  20 +-
 docs/content/docs/dev/table/sql/alter.md           |  20 +-
 .../src/test/resources/sql/catalog_database.q      | 187 ++++++++++--------
 .../src/test/resources/sql/catalog_database.q      | 218 ++++++++++++---------
 .../src/main/codegen/data/Parser.tdd               |   1 +
 .../src/main/codegen/includes/parserImpls.ftl      |  24 ++-
 .../flink/sql/parser/ddl/SqlAlterCatalogReset.java |  76 +++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |   3 +-
 .../apache/flink/table/catalog/CatalogManager.java |  12 +-
 .../ddl/AlterCatalogOptionsOperation.java          |   4 +-
 ...ration.java => AlterCatalogResetOperation.java} |  30 ++-
 .../converters/SqlAlterCatalogResetConverter.java  |  48 +++++
 .../operations/converters/SqlNodeConverters.java   |   1 +
 .../operations/SqlDdlToOperationConverterTest.java |  21 ++
 14 files changed, 448 insertions(+), 217 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/alter.md 
b/docs/content.zh/docs/dev/table/sql/alter.md
index 808ee893023..39de6985c9d 100644
--- a/docs/content.zh/docs/dev/table/sql/alter.md
+++ b/docs/content.zh/docs/dev/table/sql/alter.md
@@ -28,7 +28,7 @@ under the License.
 
 
 
-ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 
中注册的表、视图或函数定义。
+ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 
中注册的表、视图或函数定义,或 catalog 本身的定义。
 
 Flink SQL 目前支持以下 ALTER 语句:
 
@@ -36,6 +36,7 @@ Flink SQL 目前支持以下 ALTER 语句:
 - ALTER VIEW
 - ALTER DATABASE
 - ALTER FUNCTION
+- ALTER CATALOG
 
 ## 执行 ALTER 语句
 
@@ -538,10 +539,14 @@ ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
 
 Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 
JAVA。
 
+{{< top >}}
+
 ## ALTER CATALOG
 
 ```sql
-ALTER CATALOG catalog_name SET (key1=val1, ...)
+ALTER CATALOG catalog_name 
+    SET (key1=val1, ...)
+  | RESET (key1, ...)
 ```
 
 ### SET
@@ -555,4 +560,15 @@ ALTER CATALOG catalog_name SET (key1=val1, ...)
 ALTER CATALOG cat2 SET ('default-database'='db');
 ```
 
+### RESET
+
+为指定的 catalog 重置一个或多个属性。
+
+`RESET` 语句示例如下。
+
+```sql
+-- reset 'default-database'
+ALTER CATALOG cat2 RESET ('default-database');
+```
+
 {{< top >}}
diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md
index 54fdf8f15b0..e842d137ce0 100644
--- a/docs/content/docs/dev/table/sql/alter.md
+++ b/docs/content/docs/dev/table/sql/alter.md
@@ -28,7 +28,7 @@ under the License.
 
 
 
-ALTER statements are used to modified a registered table/view/function definition in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}).
+ALTER statements are used to modify the definition of a table, view or function that has already been registered in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}), or the definition of a catalog itself.
 
 Flink SQL supports the following ALTER statements for now:
 
@@ -36,6 +36,7 @@ Flink SQL supports the following ALTER statements for now:
 - ALTER VIEW
 - ALTER DATABASE
 - ALTER FUNCTION
+- ALTER CATALOG
 
 ## Run an ALTER statement
 
@@ -540,10 +541,14 @@ If the function doesn't exist, nothing happens.
 
 Language tag to instruct flink runtime how to execute the function. Currently only JAVA, SCALA and PYTHON are supported, the default language for a function is JAVA.
 
+{{< top >}}
+
 ## ALTER CATALOG
 
 ```sql
-ALTER CATALOG catalog_name SET (key1=val1, ...)
+ALTER CATALOG catalog_name 
+    SET (key1=val1, ...)
+  | RESET (key1, ...)
 ```
 
 ### SET
@@ -557,4 +562,15 @@ The following examples illustrate the usage of the `SET` statements.
 ALTER CATALOG cat2 SET ('default-database'='db');
 ```
 
+### RESET
+
+Reset one or more properties to their default values in the specified catalog.
+
+The following examples illustrate the usage of the `RESET` statements.
+
+```sql
+-- reset 'default-database'
+ALTER CATALOG cat2 RESET ('default-database');
+```
+
 {{< top >}}
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index c9a5b02116a..b99af7344f9 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -92,6 +92,110 @@ drop catalog c1;
 org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a 
catalog which is currently in use.
 !error
 
+create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
+[INFO] Execute statement succeeded.
+!info
+
+show create catalog cat2;
++---------------------------------------------------------------------------------------------+
+|                                                                              
        result |
++---------------------------------------------------------------------------------------------+
+| CREATE CATALOG `cat2` WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+ |
++---------------------------------------------------------------------------------------------+
+1 row in set
+!ok
+
+describe catalog cat2;
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |              cat2 |
+|      type | generic_in_memory |
+|   comment |                   |
++-----------+-------------------+
+3 rows in set
+!ok
+
+describe catalog extended cat2;
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |              cat2 |
+|                    type | generic_in_memory |
+|                 comment |                   |
+| option:default-database |                db |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+desc catalog cat2;
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |              cat2 |
+|      type | generic_in_memory |
+|   comment |                   |
++-----------+-------------------+
+3 rows in set
+!ok
+
+desc catalog extended cat2;
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |              cat2 |
+|                    type | generic_in_memory |
+|                 comment |                   |
+| option:default-database |                db |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+alter catalog cat2 set ('default-database'='db_new');
+[INFO] Execute statement succeeded.
+!info
+
+desc catalog extended cat2;
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |              cat2 |
+|                    type | generic_in_memory |
+|                 comment |                   |
+| option:default-database |            db_new |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+alter catalog cat2 reset ('default-database', 'k1');
+[INFO] Execute statement succeeded.
+!info
+
+desc catalog extended cat2;
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |              cat2 |
+|      type | generic_in_memory |
+|   comment |                   |
++-----------+-------------------+
+3 rows in set
+!ok
+
+alter catalog cat2 reset ('type');
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type'
+!error
+
+alter catalog cat2 reset ();
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key
+!error
+
 # ==========================================================================
 # test database
 # ==========================================================================
@@ -686,86 +790,3 @@ show tables from db1 like 'p_r%';
 +------------+
 1 row in set
 !ok
-
-# ==========================================================================
-# test catalog
-# ==========================================================================
-
-create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
-[INFO] Execute statement succeeded.
-!info
-
-show create catalog cat2;
-+---------------------------------------------------------------------------------------------+
-|                                                                              
        result |
-+---------------------------------------------------------------------------------------------+
-| CREATE CATALOG `cat2` WITH (
-  'default-database' = 'db',
-  'type' = 'generic_in_memory'
-)
- |
-+---------------------------------------------------------------------------------------------+
-1 row in set
-!ok
-
-describe catalog cat2;
-+-----------+-------------------+
-| info name |        info value |
-+-----------+-------------------+
-|      name |              cat2 |
-|      type | generic_in_memory |
-|   comment |                   |
-+-----------+-------------------+
-3 rows in set
-!ok
-
-describe catalog extended cat2;
-+-------------------------+-------------------+
-|               info name |        info value |
-+-------------------------+-------------------+
-|                    name |              cat2 |
-|                    type | generic_in_memory |
-|                 comment |                   |
-| option:default-database |                db |
-+-------------------------+-------------------+
-4 rows in set
-!ok
-
-desc catalog cat2;
-+-----------+-------------------+
-| info name |        info value |
-+-----------+-------------------+
-|      name |              cat2 |
-|      type | generic_in_memory |
-|   comment |                   |
-+-----------+-------------------+
-3 rows in set
-!ok
-
-desc catalog extended cat2;
-+-------------------------+-------------------+
-|               info name |        info value |
-+-------------------------+-------------------+
-|                    name |              cat2 |
-|                    type | generic_in_memory |
-|                 comment |                   |
-| option:default-database |                db |
-+-------------------------+-------------------+
-4 rows in set
-!ok
-
-alter catalog cat2 set ('default-database'='db_new');
-[INFO] Execute statement succeeded.
-!info
-
-desc catalog extended cat2;
-+-------------------------+-------------------+
-|               info name |        info value |
-+-------------------------+-------------------+
-|                    name |              cat2 |
-|                    type | generic_in_memory |
-|                 comment |                   |
-| option:default-database |            db_new |
-+-------------------------+-------------------+
-4 rows in set
-!ok
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index 063edbabd23..cd7658ec0fc 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -108,6 +108,129 @@ drop catalog default_catalog;
 1 row in set
 !ok
 
+create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show create catalog cat2;
+!output
+CREATE CATALOG `cat2` WITH (
+  'default-database' = 'db',
+  'type' = 'generic_in_memory'
+)
+!ok
+
+describe catalog cat2;
+!output
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |              cat2 |
+|      type | generic_in_memory |
+|   comment |                   |
++-----------+-------------------+
+3 rows in set
+!ok
+
+describe catalog extended cat2;
+!output
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |              cat2 |
+|                    type | generic_in_memory |
+|                 comment |                   |
+| option:default-database |                db |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+desc catalog cat2;
+!output
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |              cat2 |
+|      type | generic_in_memory |
+|   comment |                   |
++-----------+-------------------+
+3 rows in set
+!ok
+
+desc catalog extended cat2;
+!output
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |              cat2 |
+|                    type | generic_in_memory |
+|                 comment |                   |
+| option:default-database |                db |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+alter catalog cat2 set ('default-database'='db_new');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+desc catalog extended cat2;
+!output
++-------------------------+-------------------+
+|               info name |        info value |
++-------------------------+-------------------+
+|                    name |              cat2 |
+|                    type | generic_in_memory |
+|                 comment |                   |
+| option:default-database |            db_new |
++-------------------------+-------------------+
+4 rows in set
+!ok
+
+alter catalog cat2 reset ('default-database', 'k1');
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+desc catalog extended cat2;
+!output
++-----------+-------------------+
+| info name |        info value |
++-----------+-------------------+
+|      name |              cat2 |
+|      type | generic_in_memory |
+|   comment |                   |
++-----------+-------------------+
+3 rows in set
+!ok
+
+alter catalog cat2 reset ('type');
+!output
+org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type'
+!error
+
+alter catalog cat2 reset ();
+!output
+org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key
+!error
+
 # ==========================================================================
 # test database
 # ==========================================================================
@@ -816,98 +939,3 @@ show tables from db1 like 'p_r%';
 +------------+
 1 row in set
 !ok
-
-# ==========================================================================
-# test catalog
-# ==========================================================================
-
-create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
-show create catalog cat2;
-!output
-CREATE CATALOG `cat2` WITH (
-  'default-database' = 'db',
-  'type' = 'generic_in_memory'
-)
-!ok
-
-describe catalog cat2;
-!output
-+-----------+-------------------+
-| info name |        info value |
-+-----------+-------------------+
-|      name |              cat2 |
-|      type | generic_in_memory |
-|   comment |                   |
-+-----------+-------------------+
-3 rows in set
-!ok
-
-describe catalog extended cat2;
-!output
-+-------------------------+-------------------+
-|               info name |        info value |
-+-------------------------+-------------------+
-|                    name |              cat2 |
-|                    type | generic_in_memory |
-|                 comment |                   |
-| option:default-database |                db |
-+-------------------------+-------------------+
-4 rows in set
-!ok
-
-desc catalog cat2;
-!output
-+-----------+-------------------+
-| info name |        info value |
-+-----------+-------------------+
-|      name |              cat2 |
-|      type | generic_in_memory |
-|   comment |                   |
-+-----------+-------------------+
-3 rows in set
-!ok
-
-desc catalog extended cat2;
-!output
-+-------------------------+-------------------+
-|               info name |        info value |
-+-------------------------+-------------------+
-|                    name |              cat2 |
-|                    type | generic_in_memory |
-|                 comment |                   |
-| option:default-database |                db |
-+-------------------------+-------------------+
-4 rows in set
-!ok
-
-alter catalog cat2 set ('default-database'='db_new');
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
-desc catalog extended cat2;
-!output
-+-------------------------+-------------------+
-|               info name |        info value |
-+-------------------------+-------------------+
-|                    name |              cat2 |
-|                    type | generic_in_memory |
-|                 comment |                   |
-| option:default-database |            db_new |
-+-------------------------+-------------------+
-4 rows in set
-!ok
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index b9215460555..6f1365951f4 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -35,6 +35,7 @@
     
"org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext"
     "org.apache.flink.sql.parser.ddl.SqlAlterCatalog"
     "org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions"
+    "org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset"
     "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
     "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 6d524422625..4d7214325d4 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -160,13 +160,23 @@ SqlAlterCatalog SqlAlterCatalog() :
 {
     <ALTER> <CATALOG> { startPos = getPos(); }
     catalogName = SimpleIdentifier()
-    <SET>
-    propertyList = Properties()
-    {
-        return new SqlAlterCatalogOptions(startPos.plus(getPos()),
-                    catalogName,
-                    propertyList);
-    }
+    (
+        <SET>
+        propertyList = Properties()
+        {
+            return new SqlAlterCatalogOptions(startPos.plus(getPos()),
+                        catalogName,
+                        propertyList);
+        }
+    |
+        <RESET>
+        propertyList = PropertyKeys()
+        {
+            return new SqlAlterCatalogReset(startPos.plus(getPos()),
+                           catalogName,
+                           propertyList);
+        }
+    )
 }
 
 /**
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java
new file mode 100644
index 00000000000..3f05a0e574d
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/** ALTER CATALOG catalog_name RESET (key1, ...). */
+public class SqlAlterCatalogReset extends SqlAlterCatalog {
+
+    private final SqlNodeList propertyKeyList;
+
+    public SqlAlterCatalogReset(
+            SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyKeyList) {
+        super(position, catalogName);
+        this.propertyKeyList = requireNonNull(propertyKeyList, "propertyKeyList cannot be null");
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(catalogName, propertyKeyList);
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyKeyList;
+    }
+
+    public Set<String> getResetKeys() {
+        return propertyKeyList.getList().stream()
+                .map(key -> ((NlsString) SqlLiteral.value(key)).getValue())
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("RESET");
+        SqlWriter.Frame withFrame = writer.startList("(", ")");
+        for (SqlNode property : propertyKeyList) {
+            SqlUnparseUtils.printIndent(writer);
+            property.unparse(writer, leftPrec, rightPrec);
+        }
+        writer.newlineAndIndent();
+        writer.endList(withFrame);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a6c2bab9b80..8fe3c21a41f 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -67,8 +67,9 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     void testAlterCatalog() {
-        sql("alter catalog a set ('k1'='v1','k2'='v2')")
+        sql("alter catalog a set ('k1'='v1', 'k2'='v2')")
                 .ok("ALTER CATALOG `A` SET (\n" + "  'k1' = 'v1',\n" + "  'k2' = 'v2'\n" + ")");
+        sql("alter catalog a reset ('k1')").ok("ALTER CATALOG `A` RESET (\n" + "  'k1'\n" + ")");
     }
 
     @Test
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 5e8826e6c3a..fc16f7ae057 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -326,22 +326,24 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      * Alters a catalog under the given name. The catalog name must be unique.
      *
      * @param catalogName the given catalog name under which to alter the 
given catalog
-     * @param catalogDescriptor catalog descriptor for altering catalog
+     * @param catalogUpdater catalog configuration updater to alter catalog
      * @throws CatalogException If the catalog neither exists in the catalog 
store nor in the
      *     initialized catalogs, or if an error occurs while creating the 
catalog or storing the
      *     {@link CatalogDescriptor}
      */
-    public void alterCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor)
+    public void alterCatalog(String catalogName, Consumer<Configuration> 
catalogUpdater)
             throws CatalogException {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(catalogName),
                 "Catalog name cannot be null or empty.");
-        checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
+        checkNotNull(catalogUpdater, "Catalog configuration updater cannot be 
null.");
+
         CatalogStore catalogStore = catalogStoreHolder.catalogStore();
         Optional<CatalogDescriptor> oldCatalogDescriptor = 
getCatalogDescriptor(catalogName);
+
         if (catalogStore.contains(catalogName) && 
oldCatalogDescriptor.isPresent()) {
             Configuration conf = oldCatalogDescriptor.get().getConfiguration();
-            conf.addAll(catalogDescriptor.getConfiguration());
+            catalogUpdater.accept(conf);
             CatalogDescriptor newCatalogDescriptor = 
CatalogDescriptor.of(catalogName, conf);
             Catalog newCatalog = initCatalog(catalogName, 
newCatalogDescriptor);
             catalogStore.removeCatalog(catalogName, false);
@@ -353,7 +355,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
             catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
newCatalogDescriptor);
         } else {
             throw new CatalogException(
-                    format("Catalog %s not exists in the catalog store.", 
catalogName));
+                    String.format("Catalog %s does not exist in the catalog 
store.", catalogName));
         }
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java
index dd4d1433505..e523aa49eb6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogDescriptor;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
 import java.util.Collections;
@@ -70,8 +69,7 @@ public class AlterCatalogOptionsOperation implements 
AlterOperation {
         try {
             ctx.getCatalogManager()
                     .alterCatalog(
-                            catalogName,
-                            CatalogDescriptor.of(catalogName, 
Configuration.fromMap(properties)));
+                            catalogName, conf -> 
conf.addAll(Configuration.fromMap(properties)));
 
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (CatalogException e) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java
similarity index 65%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java
copy to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java
index dd4d1433505..b68387def74 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java
@@ -19,36 +19,34 @@
 package org.apache.flink.table.operations.ddl;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogDescriptor;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
 import java.util.Collections;
-import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** Operation to describe a ALTER CATALOG SET statement. */
+/** Operation to describe an ALTER CATALOG RESET statement. */
 @Internal
-public class AlterCatalogOptionsOperation implements AlterOperation {
+public class AlterCatalogResetOperation implements AlterOperation {
     private final String catalogName;
-    private final Map<String, String> properties;
+    private final Set<String> resetKeys;
 
-    public AlterCatalogOptionsOperation(String catalogName, Map<String, 
String> properties) {
+    public AlterCatalogResetOperation(String catalogName, Set<String> 
resetKeys) {
         this.catalogName = checkNotNull(catalogName);
-        this.properties = 
Collections.unmodifiableMap(checkNotNull(properties));
+        this.resetKeys = Collections.unmodifiableSet(checkNotNull(resetKeys));
     }
 
     public String getCatalogName() {
         return catalogName;
     }
 
-    public Map<String, String> getProperties() {
-        return properties;
+    public Set<String> getResetKeys() {
+        return resetKeys;
     }
 
     @Override
@@ -56,12 +54,8 @@ public class AlterCatalogOptionsOperation implements 
AlterOperation {
         return String.format(
                 "ALTER CATALOG %s\n%s",
                 catalogName,
-                properties.entrySet().stream()
-                        .map(
-                                entry ->
-                                        String.format(
-                                                "  SET '%s' = '%s'",
-                                                entry.getKey(), 
entry.getValue()))
+                resetKeys.stream()
+                        .map(key -> String.format("  RESET '%s'", key))
                         .collect(Collectors.joining(",\n")));
     }
 
@@ -69,9 +63,7 @@ public class AlterCatalogOptionsOperation implements AlterOperation {
     public TableResultInternal execute(Context ctx) {
         try {
             ctx.getCatalogManager()
-                    .alterCatalog(
-                            catalogName,
-                            CatalogDescriptor.of(catalogName, Configuration.fromMap(properties)));
+                    .alterCatalog(catalogName, conf -> resetKeys.forEach(conf::removeKey));
 
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (CatalogException e) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java
new file mode 100644
index 00000000000..c352d43785b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation;
+
+import java.util.Set;
+
+/** A converter for {@link SqlAlterCatalogReset}. */
+public class SqlAlterCatalogResetConverter implements SqlNodeConverter<SqlAlterCatalogReset> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterCatalogReset sqlAlterCatalogReset, ConvertContext context) {
+        String type = CommonCatalogOptions.CATALOG_TYPE.key();
+        Set<String> resetKeys = sqlAlterCatalogReset.getResetKeys();
+        if (resetKeys.isEmpty() || resetKeys.contains(type)) {
+            String exMsg =
+                    resetKeys.isEmpty()
+                            ? "ALTER CATALOG RESET does not support empty key"
+                            : String.format(
+                                    "ALTER CATALOG RESET does not support changing '%s'", type);
+            throw new ValidationException(exMsg);
+        }
+        return new AlterCatalogResetOperation(
+                sqlAlterCatalogReset.catalogName(), sqlAlterCatalogReset.getResetKeys());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index b3dca807899..5c6fbc8a220 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -40,6 +40,7 @@ public class SqlNodeConverters {
         // register all the converters here
         register(new SqlCreateCatalogConverter());
         register(new SqlAlterCatalogOptionsConverter());
+        register(new SqlAlterCatalogResetConverter());
         register(new SqlCreateViewConverter());
         register(new SqlAlterViewRenameConverter());
         register(new SqlAlterViewPropertiesConverter());
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 27139d1de7d..b10d9792bd3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.operations.SinkModifyOperation;
 import org.apache.flink.table.operations.SourceQueryOperation;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogOptionsOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -86,6 +87,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -124,6 +126,25 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion
                         "cat2",
                         "ALTER CATALOG cat2\n  SET 'K1' = 'V1',\n  SET 'k2' = 'v2_new'",
                         expectedOptions);
+
+        // test alter catalog reset
+        final Set<String> expectedResetKeys = Collections.singleton("K1");
+
+        operation = parse("ALTER CATALOG cat2 RESET ('K1')");
+        assertThat(operation)
+                .isInstanceOf(AlterCatalogResetOperation.class)
+                .asInstanceOf(InstanceOfAssertFactories.type(AlterCatalogResetOperation.class))
+                .extracting(
+                        AlterCatalogResetOperation::getCatalogName,
+                        AlterCatalogResetOperation::asSummaryString,
+                        AlterCatalogResetOperation::getResetKeys)
+                .containsExactly("cat2", "ALTER CATALOG cat2\n  RESET 'K1'", expectedResetKeys);
+        assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ('type')"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("ALTER CATALOG RESET does not support changing 'type'");
+        assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ()"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("ALTER CATALOG RESET does not support empty key");
     }
 
     @Test


Reply via email to