LadyForest commented on code in PR #24934: URL: https://github.com/apache/flink/pull/24934#discussion_r1639160703
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java: ########## @@ -39,10 +41,18 @@ public class CreateCatalogOperation implements CreateOperation { private final String catalogName; private final Map<String, String> properties; + private final @Nullable String comment; Review Comment: please keep a consistent annotation style ```suggestion @Nullable private final String comment; ``` ########## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java: ########## @@ -45,11 +48,18 @@ public class SqlCreateCatalog extends SqlCreate { private final SqlNodeList propertyList; + private final @Nullable SqlNode comment; Review Comment: ```suggestion @Nullable private final SqlCharStringLiteral comment; ``` ########## flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java: ########## @@ -428,7 +428,7 @@ void testConcurrentSplitAssignmentForMultipleHosts() throws InterruptedException assertThat(ia.getNextInputSplit("testhost", 0)).isNull(); // at least one fraction of hosts needs be local, no matter how bad the thread races - assertThat(ia.getNumberOfRemoteAssignments()) + assertThat(ia.getNumberOfLocalAssignments()) Review Comment: Do not include commits that don't belong to this PR. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java: ########## @@ -85,6 +87,13 @@ public static List<TableChange> buildModifyColumnChange( } } + public static @Nullable String getCatalogComment(Optional<SqlNode> catalogComment) { + return catalogComment + .map(SqlCharStringLiteral.class::cast) + .map(c -> c.getValueAs(NlsString.class).getValue()) + .orElse(null); + } + Review Comment: it can be inlined, so remove this ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java: ########## @@ -53,11 +63,23 @@ public Map<String, String> getProperties() { return properties; } + public @Nullable String getComment() { + return comment; + } Review Comment: Remove this getter if there is no use. ########## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java: ########## @@ -70,11 +80,28 @@ public SqlNodeList getPropertyList() { return propertyList; } + public Optional<SqlNode> getComment() { Review Comment: ```suggestion public Optional<SqlCharStringLiteral> getComment() { ``` ########## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java: ########## @@ -45,11 +48,18 @@ public class SqlCreateCatalog extends SqlCreate { private final SqlNodeList propertyList; + private final @Nullable SqlNode comment; + public SqlCreateCatalog( - SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyList) { - super(OPERATOR, position, false, false); + SqlParserPos position, + SqlIdentifier catalogName, + SqlNodeList propertyList, + @Nullable SqlNode comment, Review Comment: ```suggestion @Nullable SqlCharStringLiteral comment, ``` ########## flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java: ########## @@ -134,6 +134,28 @@ void testCreateCatalog() { + " 'key1' = 'value1',\n" + " 'key2' = 'value2'\n" + ")"); + sql("create catalog c1 comment 'hello'\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " )\n") + .ok( + "CREATE CATALOG `C1`\n" + + "COMMENT 'hello' WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")"); + sql("create catalog if not exists c1 comment 'hello'\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " )\n") + .ok( + "CREATE CATALOG IF NOT EXISTS `C1`\n" + + "COMMENT 'hello' WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")"); Review Comment: Add one more test for `CREATE CATALOG IF NOT EXISTS` without comment ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java: ########## @@ -48,18 +55,34 @@ public Configuration getConfiguration() { return configuration; } - private CatalogDescriptor(String catalogName, Configuration configuration) { + public Optional<String> getComment() { + return Optional.ofNullable(comment); + } + + public CatalogDescriptor setComment(String comment) { + return new CatalogDescriptor(catalogName, configuration, comment); + } Review Comment: remove this method and add it back for the next pr ########## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java: ########## @@ -367,14 +368,28 @@ void testCatalogStore() throws Exception { catalogManager.createCatalog("cat1", CatalogDescriptor.of("cat1", configuration)); catalogManager.createCatalog("cat2", CatalogDescriptor.of("cat2", configuration)); catalogManager.createCatalog("cat3", CatalogDescriptor.of("cat3", configuration)); + catalogManager.createCatalog( Review Comment: Please add the exception test as well. ```java assertThatThrownBy( () -> catalogManager.createCatalog( "cat_comment", CatalogDescriptor.of( "cat_comment", configuration.clone(), "second comment for catalog"), false)) .isInstanceOf(CatalogException.class) .hasMessage("Catalog cat_comment already exists in catalog store."); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java: ########## @@ -41,6 +43,10 @@ public Operation convertSqlNode(SqlCreateCatalog node, ConvertContext context) { ((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString())); - return new CreateCatalogOperation(node.catalogName(), properties); + return new CreateCatalogOperation( + node.catalogName(), + properties, + getCatalogComment(node.getComment()), Review Comment: ```suggestion node.getComment().map(c -> c.getValueAs(String.class)).orElse(null), ``` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -295,31 +295,42 @@ public DataTypeFactory getDataTypeFactory() { * * @param catalogName the given catalog name under which to create the given catalog * @param catalogDescriptor catalog descriptor for creating catalog + * @param ignoreIfExists if false exception will be thrown if a catalog exists. * @throws CatalogException If the catalog already exists in the catalog store or initialized * catalogs, or if an error occurs while creating the catalog or storing the {@link * CatalogDescriptor} */ - public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) + public void createCatalog( + String catalogName, CatalogDescriptor catalogDescriptor, boolean ignoreIfExists) throws CatalogException { checkArgument( !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); if (catalogStoreHolder.catalogStore().contains(catalogName)) { - throw new CatalogException( - format("Catalog %s already exists in catalog store.", catalogName)); + if (!ignoreIfExists) { Review Comment: The logic is a little bit cumbersome, what about ```java public void createCatalog( String catalogName, CatalogDescriptor catalogDescriptor, boolean ignoreIfExists) throws CatalogException { checkArgument( !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); boolean catalogExistsInStore = catalogStoreHolder.catalogStore().contains(catalogName); boolean catalogExistsInMemory = catalogs.containsKey(catalogName); if (catalogExistsInStore || catalogExistsInMemory) { if (!ignoreIfExists) { throw new CatalogException( format("Catalog %s already exists.", catalogName)); } } else { // Store the catalog in the catalog store catalogStoreHolder.catalogStore().storeCatalog(catalogName, catalogDescriptor); // Initialize and store the catalog in memory Catalog catalog = initCatalog(catalogName, catalogDescriptor); catalog.open(); catalogs.put(catalogName, catalog); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org