This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4044b9ea42a [FLINK-34917][table] Support `CREATE CATALOG IF NOT EXISTS` with comment 4044b9ea42a is described below commit 4044b9ea42aa6cf4a638b4ed46219fed94ce84bf Author: Yubin Li <lixin58...@163.com> AuthorDate: Fri Jun 14 20:47:22 2024 +0800 [FLINK-34917][table] Support `CREATE CATALOG IF NOT EXISTS` with comment This closes #24934 --- docs/content.zh/docs/dev/table/sql/create.md | 15 ++++-- docs/content/docs/dev/table/sql/create.md | 7 ++- .../src/test/resources/sql/catalog_database.q | 50 ++++++++++++++++++ .../service/session/SessionManagerImplTest.java | 2 +- .../src/test/resources/sql/catalog_database.q | 59 ++++++++++++++++++++++ .../src/main/codegen/includes/parserImpls.ftl | 16 ++++-- .../flink/sql/parser/ddl/SqlCreateCatalog.java | 33 ++++++++++-- .../flink/sql/parser/FlinkSqlParserImplTest.java | 21 ++++++-- .../flink/table/api/internal/ShowCreateUtil.java | 6 ++- .../apache/flink/table/catalog/CatalogManager.java | 34 ++++++++----- .../table/operations/DescribeCatalogOperation.java | 4 +- .../operations/ddl/CreateCatalogOperation.java | 24 ++++++++- .../flink/table/catalog/CatalogManagerTest.java | 33 ++++++++++-- .../flink/table/catalog/CatalogDescriptor.java | 23 ++++++++- .../converters/SqlCreateCatalogConverter.java | 12 ++++- 15 files changed, 298 insertions(+), 41 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md index f6c8674b6c8..69467cd4042 100644 --- a/docs/content.zh/docs/dev/table/sql/create.md +++ b/docs/content.zh/docs/dev/table/sql/create.md @@ -628,18 +628,23 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM source_table WHERE mod(id, 1 ## CREATE CATALOG ```sql -CREATE CATALOG catalog_name +CREATE CATALOG [IF NOT EXISTS] catalog_name + [COMMENT catalog_comment] WITH (key1=val1, key2=val2, ...) ``` -Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown. +根据给定的属性创建 catalog。若已存在同名 catalog,会抛出异常。 + +**IF NOT EXISTS** + +若 catalog 已经存在,则不会进行任何操作。 **WITH OPTIONS** -Catalog properties used to store extra information related to this catalog. -The key and value of expression `key1=val1` should both be string literal. +catalog 属性一般用于存储关于这个 catalog 的额外的信息。 +表达式 `key1=val1` 中的键和值都需要是字符串文本常量。 -Check out more details at [Catalogs]({{< ref "docs/dev/table/catalogs" >}}). +详情见 [Catalogs]({{< ref "docs/dev/table/catalogs" >}})。 {{< top >}} diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md index 3d94b111c53..136468190e4 100644 --- a/docs/content/docs/dev/table/sql/create.md +++ b/docs/content/docs/dev/table/sql/create.md @@ -629,12 +629,17 @@ If you want to enable atomicity for RTAS, then you should make sure: ## CREATE CATALOG ```sql -CREATE CATALOG catalog_name +CREATE CATALOG [IF NOT EXISTS] catalog_name + [COMMENT catalog_comment] WITH (key1=val1, key2=val2, ...) ``` Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown. +**IF NOT EXISTS** + +If the catalog already exists, nothing happens. + **WITH OPTIONS** Catalog properties used to store extra information related to this catalog. diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index b99af7344f9..a2ae9a2fe1a 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -35,6 +35,7 @@ org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, colu Was expecting one of: <EOF> "WITH" ... + "COMMENT" ... ";" ... !error @@ -92,6 +93,55 @@ drop catalog c1; org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. !error +create catalog cat_comment comment 'hello ''catalog''' WITH ('type'='generic_in_memory', 'default-database'='db'); +[INFO] Execute statement succeeded. +!info + +show create catalog cat_comment; ++--------------------------------------------------------------------------------------------------------------------------------+ +| result | ++--------------------------------------------------------------------------------------------------------------------------------+ +| CREATE CATALOG `cat_comment` COMMENT 'hello ''catalog''' WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) + | ++--------------------------------------------------------------------------------------------------------------------------------+ +1 row in set +!ok + +describe catalog cat_comment; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat_comment | +| type | generic_in_memory | +| comment | hello 'catalog' | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat_comment; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat_comment | +| type | generic_in_memory | +| comment | hello 'catalog' | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +create catalog if not exists cat_comment comment 'hello' with ('type' = 'generic_in_memory'); +[INFO] Execute statement succeeded. +!info + +create catalog cat_comment comment 'hello2' with ('type' = 'generic_in_memory'); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.catalog.exceptions.CatalogException: Catalog cat_comment already exists. +!error + create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); [INFO] Execute statement succeeded. !info diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java index 32fa747894d..be580c2729f 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java @@ -155,7 +155,7 @@ class SessionManagerImplTest { "cat2", CatalogDescriptor.of("cat2", configuration))) .isInstanceOf(CatalogException.class) - .hasMessageContaining("Catalog cat2 already exists in catalog store."); + .hasMessageContaining("Catalog cat2 already exists."); sessionManager.stop(); } diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q index cd7658ec0fc..687330e4a50 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q @@ -35,6 +35,7 @@ org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, colu Was expecting one of: <EOF> "WITH" ... + "COMMENT" ... ";" ... !error @@ -108,6 +109,64 @@ drop catalog default_catalog; 1 row in set !ok +create catalog cat_comment comment 'hello ''catalog''' WITH ('type'='generic_in_memory', 'default-database'='db'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +show create catalog cat_comment; +!output +CREATE CATALOG `cat_comment` COMMENT 'hello ''catalog''' WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) +!ok + +describe catalog cat_comment; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat_comment | +| type | generic_in_memory | +| comment | hello 'catalog' | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat_comment; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat_comment | +| type | generic_in_memory | +| comment | hello 'catalog' | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +create catalog if not exists cat_comment comment 'hello' with ('type' = 'generic_in_memory'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +create catalog cat_comment comment 'hello2' with ('type' = 'generic_in_memory'); +!output +org.apache.flink.table.catalog.exceptions.CatalogException: Catalog cat_comment already exists. +!error + create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); !output +--------+ diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index b7e3db3942e..b4ea9664c4d 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -108,17 +108,26 @@ SqlUseCatalog SqlUseCatalog() : /** * Parses a create catalog statement. -* CREATE CATALOG catalog_name [WITH (property_name=property_value, ...)]; +* CREATE CATALOG [IF NOT EXISTS] catalog_name [COMMENT 'comment_value'] [WITH (property_name=property_value, ...)]; */ SqlCreate SqlCreateCatalog(Span s, boolean replace) : { SqlParserPos startPos; SqlIdentifier catalogName; SqlNodeList propertyList = SqlNodeList.EMPTY; + SqlNode comment = null; + boolean ifNotExists = false; } { <CATALOG> { startPos = getPos(); } + + ifNotExists = IfNotExistsOpt() + catalogName = SimpleIdentifier() + [ + <COMMENT> + comment = StringLiteral() + ] [ <WITH> propertyList = Properties() @@ -126,7 +135,9 @@ SqlCreate SqlCreateCatalog(Span s, boolean replace) : { return new SqlCreateCatalog(startPos.plus(getPos()), catalogName, - propertyList); + propertyList, + comment, + ifNotExists); } } @@ -149,7 +160,6 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) : /** * Parses an alter catalog statement. -* ALTER CATALOG catalog_name SET (key1=val1, key2=val2, ...); */ SqlAlterCatalog SqlAlterCatalog() : { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java index c80facc9ebc..d95427b9fdc 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java @@ -31,7 +31,10 @@ import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.util.ImmutableNullableList; +import javax.annotation.Nullable; + import java.util.List; +import java.util.Optional; import static java.util.Objects.requireNonNull; @@ -45,11 +48,18 @@ public class SqlCreateCatalog extends SqlCreate { private final SqlNodeList propertyList; + @Nullable private final SqlNode comment; + public SqlCreateCatalog( - SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyList) { - super(OPERATOR, position, false, false); + SqlParserPos position, + SqlIdentifier catalogName, + SqlNodeList propertyList, + @Nullable SqlNode comment, + boolean ifNotExists) { + super(OPERATOR, position, false, ifNotExists); this.catalogName = requireNonNull(catalogName, "catalogName cannot be null"); this.propertyList = requireNonNull(propertyList, "propertyList cannot be null"); + this.comment = comment; } @Override @@ -59,7 +69,7 @@ public class SqlCreateCatalog extends SqlCreate { @Override public List<SqlNode> getOperandList() { - return ImmutableNullableList.of(catalogName, propertyList); + return ImmutableNullableList.of(catalogName, propertyList, comment); } public SqlIdentifier getCatalogName() { @@ -70,11 +80,28 @@ public class SqlCreateCatalog extends SqlCreate { return propertyList; } + public Optional<SqlNode> getComment() { + return Optional.ofNullable(comment); + } + + public boolean isIfNotExists() { + return ifNotExists; + } + @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("CREATE CATALOG"); + if (isIfNotExists()) { + writer.keyword("IF NOT EXISTS"); + } catalogName.unparse(writer, leftPrec, rightPrec); + if (comment != null) { + writer.newlineAndIndent(); + writer.keyword("COMMENT"); + comment.unparse(writer, leftPrec, rightPrec); + } + if (this.propertyList.size() > 0) { writer.keyword("WITH"); SqlWriter.Frame withFrame = writer.startList("(", ")"); diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 1cb44ce42c4..8f75d9f0ad6 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -26,12 +26,15 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParserFixture; import org.apache.calcite.sql.parser.SqlParserTest; +import org.apache.commons.lang3.StringUtils; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -121,15 +124,25 @@ class FlinkSqlParserImplTest extends SqlParserTest { sql("use catalog a").ok("USE CATALOG `A`"); } - @Test - void testCreateCatalog() { - sql("create catalog c1\n" + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + void testCreateCatalog(boolean ifNotExists, boolean comment) { + String ifNotExistsClause = ifNotExists ? "if not exists " : ""; + String commentClause = comment ? "\ncomment 'HELLO' " : " "; + + sql("create catalog " + + ifNotExistsClause + + "c1" + + commentClause + " WITH (\n" + " 'key1'='value1',\n" + " 'key2'='value2'\n" + " )\n") .ok( - "CREATE CATALOG `C1` " + "CREATE CATALOG " + + StringUtils.upperCase(ifNotExistsClause) + + "`C1`" + + StringUtils.upperCase(commentClause) + "WITH (\n" + " 'key1' = 'value1',\n" + " 'key2' = 'value2'\n" diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java index c11aca26046..b3a5d8e6f02 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java @@ -117,9 +117,13 @@ public class ShowCreateUtil { public static String buildShowCreateCatalogRow(CatalogDescriptor catalogDescriptor) { final String printIndent = " "; + final String comment = catalogDescriptor.getComment().orElse(null); return String.format( - "CREATE CATALOG %s WITH (%s%s%s)%s", + "CREATE CATALOG %s %sWITH (%s%s%s)%s", escapeIdentifier(catalogDescriptor.getCatalogName()), + StringUtils.isNotEmpty(comment) + ? String.format("COMMENT '%s' ", EncodingUtils.escapeSingleQuotes(comment)) + : "", System.lineSeparator(), extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), printIndent) .orElse(""), diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index fc16f7ae057..f97a625a476 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -295,31 +295,39 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { * * @param catalogName the given catalog name under which to create the given catalog * @param catalogDescriptor catalog descriptor for creating catalog + * @param ignoreIfExists if false exception will be thrown if a catalog exists. * @throws CatalogException If the catalog already exists in the catalog store or initialized * catalogs, or if an error occurs while creating the catalog or storing the {@link * CatalogDescriptor} */ - public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) + public void createCatalog( + String catalogName, CatalogDescriptor catalogDescriptor, boolean ignoreIfExists) throws CatalogException { checkArgument( !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); - if (catalogStoreHolder.catalogStore().contains(catalogName)) { - throw new CatalogException( - format("Catalog %s already exists in catalog store.", catalogName)); - } - if (catalogs.containsKey(catalogName)) { - throw new CatalogException( - format("Catalog %s already exists in initialized catalogs.", catalogName)); - } + boolean catalogExistsInStore = catalogStoreHolder.catalogStore().contains(catalogName); + boolean catalogExistsInMemory = catalogs.containsKey(catalogName); - Catalog catalog = initCatalog(catalogName, catalogDescriptor); - catalog.open(); - catalogs.put(catalogName, catalog); + if (catalogExistsInStore || catalogExistsInMemory) { + if (!ignoreIfExists) { + throw new CatalogException(format("Catalog %s already exists.", catalogName)); + } + } else { + // Store the catalog in the catalog store + catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor); + + // Initialize and store the catalog in memory + Catalog catalog = initCatalog(catalogName, catalogDescriptor); + catalog.open(); + catalogs.put(catalogName, catalog); + } + } - catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor); + public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) { + createCatalog(catalogName, catalogDescriptor, false); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java index d3910a673cc..08a97d10bec 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java @@ -84,8 +84,8 @@ public class DescribeCatalogOperation implements Operation, ExecutableOperation "type", properties.getOrDefault( CommonCatalogOptions.CATALOG_TYPE.key(), "")), - // TODO: Show the catalog comment until FLINK-34918 is resolved - Arrays.asList("comment", ""))); + Arrays.asList( + "comment", catalogDescriptor.getComment().orElse(null)))); if (isExtended) { properties.entrySet().stream() .filter( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java index 087386c7521..cdacafadefc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java @@ -28,6 +28,8 @@ import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.OperationUtils; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -39,10 +41,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class CreateCatalogOperation implements CreateOperation { private final String catalogName; private final Map<String, String> properties; + @Nullable private final String comment; + private final boolean ignoreIfExists; - public CreateCatalogOperation(String catalogName, Map<String, String> properties) { + public CreateCatalogOperation( + String catalogName, + Map<String, String> properties, + @Nullable String comment, + boolean ignoreIfExists) { this.catalogName = checkNotNull(catalogName); this.properties = Collections.unmodifiableMap(checkNotNull(properties)); + this.comment = comment; + this.ignoreIfExists = ignoreIfExists; } public String getCatalogName() { @@ -53,11 +63,19 @@ public class CreateCatalogOperation implements CreateOperation { return properties; } + public boolean isIgnoreIfExists() { + return ignoreIfExists; + } + @Override public String asSummaryString() { Map<String, Object> params = new LinkedHashMap<>(); params.put("catalogName", catalogName); params.put("properties", properties); + if (comment != null) { + params.put("comment", comment); + } + params.put("ignoreIfExists", ignoreIfExists); return OperationUtils.formatWithChildren( "CREATE CATALOG", params, Collections.emptyList(), Operation::asSummaryString); @@ -69,7 +87,9 @@ public class CreateCatalogOperation implements CreateOperation { ctx.getCatalogManager() .createCatalog( catalogName, - CatalogDescriptor.of(catalogName, Configuration.fromMap(properties))); + CatalogDescriptor.of( + catalogName, Configuration.fromMap(properties), comment), + ignoreIfExists); return TableResultImpl.TABLE_RESULT_OK; } catch (CatalogException e) { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java index b522ac1265b..632387a0ebc 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -367,14 +368,39 @@ class CatalogManagerTest { catalogManager.createCatalog("cat1", CatalogDescriptor.of("cat1", configuration)); catalogManager.createCatalog("cat2", CatalogDescriptor.of("cat2", configuration)); catalogManager.createCatalog("cat3", CatalogDescriptor.of("cat3", configuration)); + catalogManager.createCatalog( + "cat_comment", + CatalogDescriptor.of("cat_comment", configuration.clone(), "comment for catalog")); + catalogManager.createCatalog( + "cat_comment", + CatalogDescriptor.of( + "cat_comment", configuration.clone(), "second comment for catalog"), + true); + assertThatThrownBy( + () -> + catalogManager.createCatalog( + "cat_comment", + CatalogDescriptor.of( + "cat_comment", + configuration.clone(), + "third comment for catalog"), + false)) + .isInstanceOf(CatalogException.class) + .hasMessage("Catalog cat_comment already exists."); assertTrue(catalogManager.getCatalog("cat1").isPresent()); assertTrue(catalogManager.getCatalog("cat2").isPresent()); assertTrue(catalogManager.getCatalog("cat3").isPresent()); + assertTrue(catalogManager.getCatalog("cat_comment").isPresent()); + assertTrue(catalogManager.getCatalogDescriptor("cat_comment").isPresent()); + assertEquals( + "comment for catalog", + catalogManager.getCatalogDescriptor("cat_comment").get().getComment().get()); assertTrue(catalogManager.listCatalogs().contains("cat1")); assertTrue(catalogManager.listCatalogs().contains("cat2")); assertTrue(catalogManager.listCatalogs().contains("cat3")); + assertTrue(catalogManager.listCatalogs().contains("cat_comment")); catalogManager.registerCatalog("cat4", new GenericInMemoryCatalog("cat4")); @@ -383,14 +409,14 @@ class CatalogManagerTest { catalogManager.createCatalog( "cat1", CatalogDescriptor.of("cat1", configuration))) .isInstanceOf(CatalogException.class) - .hasMessageContaining("Catalog cat1 already exists in catalog store."); + .hasMessageContaining("Catalog cat1 already exists."); assertThatThrownBy( () -> catalogManager.createCatalog( "cat4", CatalogDescriptor.of("cat4", configuration))) .isInstanceOf(CatalogException.class) - .hasMessageContaining("Catalog cat4 already exists in initialized catalogs."); + .hasMessageContaining("Catalog cat4 already exists."); catalogManager.createDatabase( "exist_cat", @@ -419,7 +445,8 @@ class CatalogManagerTest { "cat3", "cat4", "default_catalog", - "exist_cat"))); + "exist_cat", + "cat_comment"))); catalogManager.setCurrentDatabase("cat_db"); assertThat(catalogManager.listTables()).isEqualTo(Collections.singleton("test_table")); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java index 4daf5d1410e..f984203672d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java @@ -21,6 +21,10 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; +import javax.annotation.Nullable; + +import java.util.Optional; + /** * Describes a {@link Catalog} with the catalog name and configuration. * @@ -40,6 +44,9 @@ public class CatalogDescriptor { /* The configuration used to discover and construct the catalog. */ private final Configuration configuration; + /* Catalog comment. */ + @Nullable private final String comment; + public String getCatalogName() { return catalogName; } @@ -48,9 +55,15 @@ public class CatalogDescriptor { return configuration; } - private CatalogDescriptor(String catalogName, Configuration configuration) { + public Optional<String> getComment() { + return Optional.ofNullable(comment); + } + + private CatalogDescriptor( + String catalogName, Configuration configuration, @Nullable String comment) { this.catalogName = catalogName; this.configuration = configuration; + this.comment = comment; } /** @@ -58,8 +71,14 @@ public class CatalogDescriptor { * * @param catalogName the name of the catalog * @param configuration the configuration of the catalog + * @param comment the comment of the catalog */ + public static CatalogDescriptor of( + String catalogName, Configuration configuration, String comment) { + return new CatalogDescriptor(catalogName, configuration, comment); + } + public static CatalogDescriptor of(String catalogName, Configuration configuration) { - return new CatalogDescriptor(catalogName, configuration); + return new CatalogDescriptor(catalogName, configuration, null); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java index 8d2c4c4731e..ea65f8f4ed8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java @@ -23,6 +23,9 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.util.NlsString; + import java.util.HashMap; import java.util.Map; @@ -41,6 +44,13 @@ public class SqlCreateCatalogConverter implements SqlNodeConverter<SqlCreateCata ((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString())); - return new CreateCatalogOperation(node.catalogName(), properties); + return new CreateCatalogOperation( + node.catalogName(), + properties, + node.getComment() + .map(SqlCharStringLiteral.class::cast) + .map(c -> c.getValueAs(NlsString.class).getValue()) + .orElse(null), + node.isIfNotExists()); } }