This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9d169038784 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax 9d169038784 is described below commit 9d1690387849303b27050bb0cefaa1bad6e3fb98 Author: Yubin Li <lixin58...@163.com> AuthorDate: Thu Jun 13 19:19:47 2024 +0800 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax This closes #24763 --- docs/content.zh/docs/dev/table/sql/alter.md | 20 +- docs/content/docs/dev/table/sql/alter.md | 20 +- .../src/test/resources/sql/catalog_database.q | 187 ++++++++++-------- .../src/test/resources/sql/catalog_database.q | 218 ++++++++++++--------- .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 24 ++- .../flink/sql/parser/ddl/SqlAlterCatalogReset.java | 76 +++++++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 3 +- .../apache/flink/table/catalog/CatalogManager.java | 12 +- .../ddl/AlterCatalogOptionsOperation.java | 4 +- ...ration.java => AlterCatalogResetOperation.java} | 30 ++- .../converters/SqlAlterCatalogResetConverter.java | 48 +++++ .../operations/converters/SqlNodeConverters.java | 1 + .../operations/SqlDdlToOperationConverterTest.java | 21 ++ 14 files changed, 448 insertions(+), 217 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/alter.md b/docs/content.zh/docs/dev/table/sql/alter.md index 808ee893023..39de6985c9d 100644 --- a/docs/content.zh/docs/dev/table/sql/alter.md +++ b/docs/content.zh/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义。 +ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义,或 catalog 本身的定义。 Flink SQL 目前支持以下 ALTER 语句: @@ -36,6 +36,7 @@ Flink SQL 目前支持以下 ALTER 语句: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## 执行 ALTER 语句 @@ -538,10 +539,14 @@ ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 JAVA。 +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name + SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -555,4 +560,15 @@ ALTER CATALOG catalog_name SET (key1=val1, ...) ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +为指定的 catalog 重置一个或多个属性。 + +`RESET` 语句示例如下。 + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md index 54fdf8f15b0..e842d137ce0 100644 --- a/docs/content/docs/dev/table/sql/alter.md +++ b/docs/content/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER statements are used to modified a registered table/view/function definition in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}). +ALTER statements are used to modify the definition of a table, view or function that has already been registered in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}), or the definition of a catalog itself. Flink SQL supports the following ALTER statements for now: @@ -36,6 +36,7 @@ Flink SQL supports the following ALTER statements for now: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## Run an ALTER statement @@ -540,10 +541,14 @@ If the function doesn't exist, nothing happens. Language tag to instruct flink runtime how to execute the function. Currently only JAVA, SCALA and PYTHON are supported, the default language for a function is JAVA. +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name + SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -557,4 +562,15 @@ The following examples illustrate the usage of the `SET` statements. ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +Reset one or more properties to its default value in the specified catalog. + +The following examples illustrate the usage of the `RESET` statements. + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index c9a5b02116a..b99af7344f9 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -92,6 +92,110 @@ drop catalog c1; org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. !error +create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); +[INFO] Execute statement succeeded. +!info + +show create catalog cat2; ++---------------------------------------------------------------------------------------------+ +| result | ++---------------------------------------------------------------------------------------------+ +| CREATE CATALOG `cat2` WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) + | ++---------------------------------------------------------------------------------------------+ +1 row in set +!ok + +describe catalog cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +desc catalog cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +desc catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 set ('default-database'='db_new'); +[INFO] Execute statement succeeded. +!info + +desc catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db_new | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 reset ('default-database', 'k1'); +[INFO] Execute statement succeeded. +!info + +desc catalog extended cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error + # ========================================================================== # test database # ========================================================================== @@ -686,86 +790,3 @@ show tables from db1 like 'p_r%'; +------------+ 1 row in set !ok - -# ========================================================================== -# test catalog -# ========================================================================== - -create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); -[INFO] Execute statement succeeded. -!info - -show create catalog cat2; -+---------------------------------------------------------------------------------------------+ -| result | -+---------------------------------------------------------------------------------------------+ -| CREATE CATALOG `cat2` WITH ( - 'default-database' = 'db', - 'type' = 'generic_in_memory' -) - | -+---------------------------------------------------------------------------------------------+ -1 row in set -!ok - -describe catalog cat2; -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -describe catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -desc catalog cat2; -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -desc catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 set ('default-database'='db_new'); -[INFO] Execute statement succeeded. -!info - -desc catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db_new | -+-------------------------+-------------------+ -4 rows in set -!ok diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q index 063edbabd23..cd7658ec0fc 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q @@ -108,6 +108,129 @@ drop catalog default_catalog; 1 row in set !ok +create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +show create catalog cat2; +!output +CREATE CATALOG `cat2` WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) +!ok + +describe catalog cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +desc catalog cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +desc catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 set ('default-database'='db_new'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +desc catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db_new | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 reset ('default-database', 'k1'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +desc catalog extended cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error + # ========================================================================== # test database # ========================================================================== @@ -816,98 +939,3 @@ show tables from db1 like 'p_r%'; +------------+ 1 row in set !ok - -# ========================================================================== -# test catalog -# ========================================================================== - -create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); -!output -+--------+ -| result | -+--------+ -| OK | -+--------+ -1 row in set -!ok - -show create catalog cat2; -!output -CREATE CATALOG `cat2` WITH ( - 'default-database' = 'db', - 'type' = 'generic_in_memory' -) -!ok - -describe catalog cat2; -!output -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -describe catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -desc catalog cat2; -!output -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -desc catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 set ('default-database'='db_new'); -!output -+--------+ -| result | -+--------+ -| OK | -+--------+ -1 row in set -!ok - -desc catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db_new | -+-------------------------+-------------------+ -4 rows in set -!ok diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index b9215460555..6f1365951f4 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -35,6 +35,7 @@ "org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext" "org.apache.flink.sql.parser.ddl.SqlAlterCatalog" "org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions" + "org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 6d524422625..4d7214325d4 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -160,13 +160,23 @@ SqlAlterCatalog SqlAlterCatalog() : { <ALTER> <CATALOG> { startPos = getPos(); } catalogName = SimpleIdentifier() - <SET> - propertyList = Properties() - { - return new SqlAlterCatalogOptions(startPos.plus(getPos()), - catalogName, - propertyList); - } + ( + <SET> + propertyList = Properties() + { + return new SqlAlterCatalogOptions(startPos.plus(getPos()), + catalogName, + propertyList); + } + | + <RESET> + propertyList = PropertyKeys() + { + return new SqlAlterCatalogReset(startPos.plus(getPos()), + catalogName, + propertyList); + } + ) } /** diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java new file mode 100644 index 00000000000..3f05a0e574d --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** ALTER CATALOG catalog_name RESET (key1, ...). */ +public class SqlAlterCatalogReset extends SqlAlterCatalog { + + private final SqlNodeList propertyKeyList; + + public SqlAlterCatalogReset( + SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyKeyList) { + super(position, catalogName); + this.propertyKeyList = requireNonNull(propertyKeyList, "propertyKeyList cannot be null"); + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(catalogName, propertyKeyList); + } + + public SqlNodeList getPropertyList() { + return propertyKeyList; + } + + public Set<String> getResetKeys() { + return propertyKeyList.getList().stream() + .map(key -> ((NlsString) SqlLiteral.value(key)).getValue()) + .collect(Collectors.toSet()); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("RESET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyKeyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a6c2bab9b80..8fe3c21a41f 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -67,8 +67,9 @@ class FlinkSqlParserImplTest extends SqlParserTest { @Test void testAlterCatalog() { - sql("alter catalog a set ('k1'='v1','k2'='v2')") + sql("alter catalog a set ('k1'='v1', 'k2'='v2')") .ok("ALTER CATALOG `A` SET (\n" + " 'k1' = 'v1',\n" + " 'k2' = 'v2'\n" + ")"); + sql("alter catalog a reset ('k1')").ok("ALTER CATALOG `A` RESET (\n" + " 'k1'\n" + ")"); } @Test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 5e8826e6c3a..fc16f7ae057 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -326,22 +326,24 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { * Alters a catalog under the given name. The catalog name must be unique. * * @param catalogName the given catalog name under which to alter the given catalog - * @param catalogDescriptor catalog descriptor for altering catalog + * @param catalogUpdater catalog configuration updater to alter catalog * @throws CatalogException If the catalog neither exists in the catalog store nor in the * initialized catalogs, or if an error occurs while creating the catalog or storing the * {@link CatalogDescriptor} */ - public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor) + public void alterCatalog(String catalogName, Consumer<Configuration> catalogUpdater) throws CatalogException { checkArgument( !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); - checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); + checkNotNull(catalogUpdater, "Catalog configuration updater cannot be null."); + CatalogStore catalogStore = catalogStoreHolder.catalogStore(); Optional<CatalogDescriptor> oldCatalogDescriptor = getCatalogDescriptor(catalogName); + if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) { Configuration conf = oldCatalogDescriptor.get().getConfiguration(); - conf.addAll(catalogDescriptor.getConfiguration()); + catalogUpdater.accept(conf); CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, conf); Catalog newCatalog = initCatalog(catalogName, newCatalogDescriptor); catalogStore.removeCatalog(catalogName, false); @@ -353,7 +355,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { catalogStoreHolder.catalogStore().storeCatalog(catalogName, newCatalogDescriptor); } else { throw new CatalogException( - format("Catalog %s not exists in the catalog store.", catalogName)); + String.format("Catalog %s does not exist in the catalog store.", catalogName)); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java index dd4d1433505..e523aa49eb6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java @@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.exceptions.CatalogException; import java.util.Collections; @@ -70,8 +69,7 @@ public class AlterCatalogOptionsOperation implements AlterOperation { try { ctx.getCatalogManager() .alterCatalog( - catalogName, - CatalogDescriptor.of(catalogName, Configuration.fromMap(properties))); + catalogName, conf -> conf.addAll(Configuration.fromMap(properties))); return TableResultImpl.TABLE_RESULT_OK; } catch (CatalogException e) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java similarity index 65% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java index dd4d1433505..b68387def74 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java @@ -19,36 +19,34 @@ package org.apache.flink.table.operations.ddl; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.exceptions.CatalogException; import java.util.Collections; -import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; -/** Operation to describe a ALTER CATALOG SET statement. */ +/** Operation to describe an ALTER CATALOG RESET statement. */ @Internal -public class AlterCatalogOptionsOperation implements AlterOperation { +public class AlterCatalogResetOperation implements AlterOperation { private final String catalogName; - private final Map<String, String> properties; + private final Set<String> resetKeys; - public AlterCatalogOptionsOperation(String catalogName, Map<String, String> properties) { + public AlterCatalogResetOperation(String catalogName, Set<String> resetKeys) { this.catalogName = checkNotNull(catalogName); - this.properties = Collections.unmodifiableMap(checkNotNull(properties)); + this.resetKeys = Collections.unmodifiableSet(checkNotNull(resetKeys)); } public String getCatalogName() { return catalogName; } - public Map<String, String> getProperties() { - return properties; + public Set<String> getResetKeys() { + return resetKeys; } @Override @@ -56,12 +54,8 @@ public class AlterCatalogOptionsOperation implements AlterOperation { return String.format( "ALTER CATALOG %s\n%s", catalogName, - properties.entrySet().stream() - .map( - entry -> - String.format( - " SET '%s' = '%s'", - entry.getKey(), entry.getValue())) + resetKeys.stream() + .map(key -> String.format(" RESET '%s'", key)) .collect(Collectors.joining(",\n"))); } @@ -69,9 +63,7 @@ public class AlterCatalogOptionsOperation implements AlterOperation { public TableResultInternal execute(Context ctx) { try { ctx.getCatalogManager() - .alterCatalog( - catalogName, - CatalogDescriptor.of(catalogName, Configuration.fromMap(properties))); + .alterCatalog(catalogName, conf -> resetKeys.forEach(conf::removeKey)); return TableResultImpl.TABLE_RESULT_OK; } catch (CatalogException e) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java new file mode 100644 index 00000000000..c352d43785b --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation; + +import java.util.Set; + +/** A converter for {@link SqlAlterCatalogReset}. */ +public class SqlAlterCatalogResetConverter implements SqlNodeConverter<SqlAlterCatalogReset> { + + @Override + public Operation convertSqlNode( + SqlAlterCatalogReset sqlAlterCatalogReset, ConvertContext context) { + String type = CommonCatalogOptions.CATALOG_TYPE.key(); + Set<String> resetKeys = sqlAlterCatalogReset.getResetKeys(); + if (resetKeys.isEmpty() || resetKeys.contains(type)) { + String exMsg = + resetKeys.isEmpty() + ? "ALTER CATALOG RESET does not support empty key" + : String.format( + "ALTER CATALOG RESET does not support changing '%s'", type); + throw new ValidationException(exMsg); + } + return new AlterCatalogResetOperation( + sqlAlterCatalogReset.catalogName(), sqlAlterCatalogReset.getResetKeys()); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index b3dca807899..5c6fbc8a220 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -40,6 +40,7 @@ public class SqlNodeConverters { // register all the converters here register(new SqlCreateCatalogConverter()); register(new SqlAlterCatalogOptionsConverter()); + register(new SqlAlterCatalogResetConverter()); register(new SqlCreateViewConverter()); register(new SqlAlterViewRenameConverter()); register(new SqlAlterViewPropertiesConverter()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 27139d1de7d..b10d9792bd3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -53,6 +53,7 @@ import org.apache.flink.table.operations.SinkModifyOperation; import org.apache.flink.table.operations.SourceQueryOperation; import org.apache.flink.table.operations.ddl.AddPartitionsOperation; import org.apache.flink.table.operations.ddl.AlterCatalogOptionsOperation; +import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; @@ -86,6 +87,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -124,6 +126,25 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion "cat2", "ALTER CATALOG cat2\n SET 'K1' = 'V1',\n SET 'k2' = 'v2_new'", expectedOptions); + + // test alter catalog reset + final Set<String> expectedResetKeys = Collections.singleton("K1"); + + operation = parse("ALTER CATALOG cat2 RESET ('K1')"); + assertThat(operation) + .isInstanceOf(AlterCatalogResetOperation.class) + .asInstanceOf(InstanceOfAssertFactories.type(AlterCatalogResetOperation.class)) + .extracting( + AlterCatalogResetOperation::getCatalogName, + AlterCatalogResetOperation::asSummaryString, + AlterCatalogResetOperation::getResetKeys) + .containsExactly("cat2", "ALTER CATALOG cat2\n RESET 'K1'", expectedResetKeys); + assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ('type')")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("ALTER CATALOG RESET does not support changing 'type'"); + assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ()")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("ALTER CATALOG RESET does not support empty key"); } @Test