This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4d39738587e [FLINK-34993][flink-sql-parser] Support MODEL syntax in parser 4d39738587e is described below commit 4d39738587e0f07a4776eebb83e2bcaf28eef604 Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Fri May 10 01:04:58 2024 -0700 [FLINK-34993][flink-sql-parser] Support MODEL syntax in parser --- .../src/main/codegen/data/Parser.tdd | 12 + .../src/main/codegen/includes/parserImpls.ftl | 248 +++++++++++++++++++-- .../apache/flink/sql/parser/ddl/SqlAlterModel.java | 146 ++++++++++++ .../flink/sql/parser/ddl/SqlCreateModel.java | 195 ++++++++++++++++ .../flink/sql/parser/ddl/SqlCreateModelAs.java | 122 ++++++++++ .../apache/flink/sql/parser/ddl/SqlDropModel.java | 91 ++++++++ .../flink/sql/parser/dql/SqlRichDescribeModel.java | 79 +++++++ .../flink/sql/parser/dql/SqlShowCreateModel.java | 65 ++++++ .../apache/flink/sql/parser/dql/SqlShowModels.java | 126 +++++++++++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 205 +++++++++++++++++ .../table/planner/calcite/FlinkPlannerImpl.scala | 2 + tools/maven/suppressions.xml | 1 + 12 files changed, 1275 insertions(+), 17 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 883b6aec1b2..b9215460555 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -45,6 +45,7 @@ "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend" + "org.apache.flink.sql.parser.ddl.SqlAlterModel" "org.apache.flink.sql.parser.ddl.SqlAlterTable" "org.apache.flink.sql.parser.ddl.SqlAlterTable.AlterTableContext" "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd" @@ -67,6 +68,8 @@ "org.apache.flink.sql.parser.ddl.SqlCreateDatabase" "org.apache.flink.sql.parser.ddl.SqlCreateFunction" "org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable" + "org.apache.flink.sql.parser.ddl.SqlCreateModel" + "org.apache.flink.sql.parser.ddl.SqlCreateModelAs" "org.apache.flink.sql.parser.ddl.SqlCreateTable" "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext" "org.apache.flink.sql.parser.ddl.SqlCreateTableAs" @@ -77,6 +80,7 @@ "org.apache.flink.sql.parser.ddl.SqlDropDatabase" "org.apache.flink.sql.parser.ddl.SqlDropFunction" "org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable" + "org.apache.flink.sql.parser.ddl.SqlDropModel" "org.apache.flink.sql.parser.ddl.SqlDropPartitions" "org.apache.flink.sql.parser.ddl.SqlDropPartitions.AlterTableDropPartitionsContext" "org.apache.flink.sql.parser.ddl.SqlDropTable" @@ -116,16 +120,19 @@ "org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase" "org.apache.flink.sql.parser.dql.SqlShowFunctions" "org.apache.flink.sql.parser.dql.SqlShowJars" + "org.apache.flink.sql.parser.dql.SqlShowModels" "org.apache.flink.sql.parser.dql.SqlShowModules" "org.apache.flink.sql.parser.dql.SqlShowPartitions" "org.apache.flink.sql.parser.dql.SqlShowProcedures" "org.apache.flink.sql.parser.dql.SqlShowTables" "org.apache.flink.sql.parser.dql.SqlShowColumns" "org.apache.flink.sql.parser.dql.SqlShowCreate" + "org.apache.flink.sql.parser.dql.SqlShowCreateModel" "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowCreateView" "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog" 
"org.apache.flink.sql.parser.dql.SqlShowViews" + "org.apache.flink.sql.parser.dql.SqlRichDescribeModel" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" "org.apache.flink.sql.parser.dql.SqlUnloadModule" "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction" @@ -185,6 +192,8 @@ "JOBS" "LOAD" "METADATA" + "MODEL" + "MODELS" "MATERIALIZED" "MODIFY" "MODULES" @@ -593,11 +602,13 @@ "SqlDescribeDatabase()" "SqlAlterFunction()" "SqlShowFunctions()" + "SqlShowModels()" "SqlShowTables()" "SqlShowColumns()" "SqlShowCreate()" "SqlReplaceTable()" "SqlAlterMaterializedTable()" + "SqlAlterModel()" "SqlAlterTable()" "SqlAlterView()" "SqlShowModules()" @@ -621,6 +632,7 @@ "SqlShowJobs()" "SqlTruncateTable()" "SqlDescribeJob()" + "SqlRichDescribeModel()" "SqlRichDescribeTable()" ] diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index b2a5ea02d0f..6d524422625 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -121,7 +121,7 @@ SqlCreate SqlCreateCatalog(Span s, boolean replace) : catalogName = SimpleIdentifier() [ <WITH> - propertyList = TableProperties() + propertyList = Properties() ] { return new SqlCreateCatalog(startPos.plus(getPos()), @@ -161,7 +161,7 @@ SqlAlterCatalog SqlAlterCatalog() : <ALTER> <CATALOG> { startPos = getPos(); } catalogName = SimpleIdentifier() <SET> - propertyList = TableProperties() + propertyList = Properties() { return new SqlAlterCatalogOptions(startPos.plus(getPos()), catalogName, @@ -262,7 +262,7 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) : ] [ <WITH> - propertyList = TableProperties() + propertyList = Properties() ] { return new SqlCreateDatabase(startPos.plus(getPos()), @@ -283,7 +283,7 @@ SqlAlterDatabase SqlAlterDatabase() : <ALTER> <DATABASE> { startPos = getPos(); } databaseName = CompoundIdentifier() <SET> - propertyList = TableProperties() + propertyList = Properties() { return new SqlAlterDatabase(startPos.plus(getPos()), databaseName, @@ -712,9 +712,35 @@ SqlShowCreate SqlShowCreate() : { return new SqlShowCreateCatalog(pos, sqlIdentifier); } + | + <MODEL> + { pos = getPos(); } + sqlIdentifier = CompoundIdentifier() + { + return new SqlShowCreateModel(pos, sqlIdentifier); + } ) } +/** + * DESCRIBE | DESC MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName sql call. + * Here we add Rich in className to match the naming of SqlRichDescribeTable. + */ +SqlRichDescribeModel SqlRichDescribeModel() : +{ + SqlIdentifier modelName; + SqlParserPos pos; + boolean isExtended = false; +} +{ + ( <DESCRIBE> | <DESC> ) <MODEL> { pos = getPos();} + [ <EXTENDED> { isExtended = true;} ] + modelName = CompoundIdentifier() + { + return new SqlRichDescribeModel(pos, modelName, isExtended); + } +} + /** * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql call. * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. @@ -782,7 +808,7 @@ SqlAlterTable SqlAlterTable() : } | <RESET> - propertyKeyList = TablePropertyKeys() + propertyKeyList = PropertyKeys() { return new SqlAlterTableReset( startPos.plus(getPos()), @@ -792,7 +818,7 @@ SqlAlterTable SqlAlterTable() : } | <SET> - propertyList = TableProperties() + propertyList = Properties() { return new SqlAlterTableOptions( startPos.plus(getPos()), @@ -926,7 +952,7 @@ SqlAlterTable SqlAlterTable() : } /** Parse a table option key list. 
*/ -SqlNodeList TablePropertyKeys(): +SqlNodeList PropertyKeys(): { SqlNode key; final List<SqlNode> proKeyList = new ArrayList<SqlNode>(); @@ -1115,7 +1141,7 @@ void AlterTableAddPartition(AlterTableAddPartitionContext context) : partProp = null; PartitionSpecCommaList(partSpec); } - [ <WITH> { partProp = TableProperties(); } ] + [ <WITH> { partProp = Properties(); } ] { partSpecs.add(partSpec); partProps.add(partProp); @@ -1360,8 +1386,8 @@ SqlNode TableOption() : } } -/** Parse a table properties. */ -SqlNodeList TableProperties(): +/** Parse properties such as ('k' = 'v'). */ +SqlNodeList Properties(): { SqlNode property; final List<SqlNode> proList = new ArrayList<SqlNode>(); @@ -1482,7 +1508,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : ] [ <WITH> - propertyList = TableProperties() + propertyList = Properties() ] [ <LIKE> @@ -1689,7 +1715,7 @@ SqlNode SqlReplaceTable() : ] [ <WITH> - propertyList = TableProperties() + propertyList = Properties() ] <AS> asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) @@ -1759,7 +1785,7 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar ] [ <WITH> - propertyList = TableProperties() + propertyList = Properties() ] <FRESHNESS> <EQ> freshness = Expression(ExprContext.ACCEPT_NON_QUERY) @@ -1851,7 +1877,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() : } | <RESUME> - [ <WITH> propertyList = TableProperties() ] + [ <WITH> propertyList = Properties() ] { return new SqlAlterMaterializedTableResume( startPos, @@ -1903,7 +1929,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() : } ) | - propertyList = TableProperties() + propertyList = Properties() { return new SqlAlterMaterializedTableOptions( startPos.plus(getPos()), @@ -1913,7 +1939,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() : ) | <RESET> - propertyKeyList = TablePropertyKeys() + propertyKeyList = PropertyKeys() { return new SqlAlterMaterializedTableReset( startPos.plus(getPos()), @@ -2437,6 +2463,8 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) : create = SqlCreateDatabase(s, replace) | create = SqlCreateFunction(s, replace, isTemporary) + | + create = SqlCreateModel(s, isTemporary) ) { return create; @@ -2464,6 +2492,8 @@ SqlDrop SqlDropExtended(Span s, boolean replace) : drop = SqlDropDatabase(s, replace) | drop = SqlDropFunction(s, replace, isTemporary) + | + drop = SqlDropModel(s, isTemporary) ) { return drop; @@ -2504,7 +2534,7 @@ SqlLoadModule SqlLoadModule() : moduleName = SimpleIdentifier() [ <WITH> - propertyList = TableProperties() + propertyList = Properties() ] { return new SqlLoadModule(startPos.plus(getPos()), @@ -3089,3 +3119,187 @@ SqlTruncateTable SqlTruncateTable() : return new SqlTruncateTable(getPos(), sqlIdentifier); } } + +/** +* SHOW MODELS [FROM [catalog.] database] sql call. 
+*/ +SqlShowModels SqlShowModels() : +{ + SqlIdentifier databaseName = null; + SqlCharStringLiteral likeLiteral = null; + String prep = null; + boolean notLike = false; + SqlParserPos pos; +} +{ + <SHOW> <MODELS> + { pos = getPos(); } + [ + ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } ) + { pos = getPos(); } + databaseName = CompoundIdentifier() + ] + [ + [ + <NOT> + { + notLike = true; + } + ] + <LIKE> <QUOTED_STRING> + { + String likeCondition = SqlParserUtil.parseString(token.image); + likeLiteral = SqlLiteral.createCharString(likeCondition, getPos()); + } + ] + { + return new SqlShowModels(pos, prep, databaseName, notLike, likeLiteral); + } +} + +/** +* ALTER MODEL [IF EXISTS] modelName SET (property_key = property_val, ...) +* ALTER MODEL [IF EXISTS] modelName RENAME TO newModelName +*/ +SqlAlterModel SqlAlterModel() : +{ + SqlParserPos startPos; + boolean ifExists = false; + SqlIdentifier modelIdentifier; + SqlIdentifier newModelIdentifier = null; + SqlNodeList propertyList = SqlNodeList.EMPTY; +} +{ + <ALTER> <MODEL> { startPos = getPos(); } + ifExists = IfExistsOpt() + modelIdentifier = CompoundIdentifier() + ( + LOOKAHEAD(2) + <RENAME> <TO> + newModelIdentifier = CompoundIdentifier() + { + return new SqlAlterModel( + startPos.plus(getPos()), + modelIdentifier, + newModelIdentifier, + ifExists); + } + | + <SET> + propertyList = Properties() + { + return new SqlAlterModel( + startPos.plus(getPos()), + modelIdentifier, + propertyList, + ifExists); + } + ) +} + +/** +* DROP MODEL [IF EXIST] modelName +*/ +SqlDrop SqlDropModel(Span s, boolean isTemporary) : +{ + SqlIdentifier modelIdentifier = null; + boolean ifExists = false; +} +{ + <MODEL> + + ifExists = IfExistsOpt() + + modelIdentifier = CompoundIdentifier() + + { + return new SqlDropModel(s.pos(), modelIdentifier, ifExists, isTemporary); + } +} + +/** +* CREATE MODEL [IF NOT EXIST] modelName +* [INPUT(col1 type1, col2 type2, ...)] +* [OUTPUT(col3 type1, col4 type4, ...)] +* [COMMENT model_comment] +* WITH (option_key = option_val, ...) +* [AS SELECT ...] 
+*/ +SqlCreate SqlCreateModel(Span s, boolean isTemporary) : +{ + final SqlParserPos startPos = s.pos(); + boolean ifNotExists = false; + SqlIdentifier modelIdentifier; + SqlNodeList inputColumnList = SqlNodeList.EMPTY; + SqlNodeList outputColumnList = SqlNodeList.EMPTY; + SqlCharStringLiteral comment = null; + SqlNodeList propertyList = SqlNodeList.EMPTY; + SqlNode asQuery = null; + SqlParserPos pos = startPos; +} +{ + <MODEL> + + ifNotExists = IfNotExistsOpt() + + modelIdentifier = CompoundIdentifier() + [ + <INPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + TableColumn(ctx) + ( + <COMMA> TableColumn(ctx) + )* + { + pos = pos.plus(getPos()); + inputColumnList = new SqlNodeList(ctx.columnList, pos); + } + <RPAREN> + ] + [ + <OUTPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + TableColumn(ctx) + ( + <COMMA> TableColumn(ctx) + )* + { + pos = pos.plus(getPos()); + outputColumnList = new SqlNodeList(ctx.columnList, pos); + } + <RPAREN> + ] + [ <COMMENT> <QUOTED_STRING> + { + String p = SqlParserUtil.parseString(token.image); + comment = SqlLiteral.createCharString(p, getPos()); + } + ] + [ + <WITH> + propertyList = Properties() + ] + [ + <AS> + asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) + { + return new SqlCreateModelAs(startPos.plus(getPos()), + modelIdentifier, + comment, + inputColumnList, + outputColumnList, + propertyList, + asQuery, + isTemporary, + ifNotExists); + } + ] + { + return new SqlCreateModel(startPos.plus(getPos()), + modelIdentifier, + comment, + inputColumnList, + outputColumnList, + propertyList, + isTemporary, + ifNotExists); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModel.java new file mode 100644 index 00000000000..f09e9e04a8e --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModel.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName].modelName SET ( name=value [, + * name=value]*). + */ +public class SqlAlterModel extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("ALTER MODEL", SqlKind.OTHER_DDL); + + protected final SqlIdentifier modelName; + protected final SqlIdentifier newModelName; + protected final boolean ifModelExists; + private final SqlNodeList propertyList; + + public SqlAlterModel( + SqlParserPos pos, + SqlIdentifier modelName, + SqlNodeList propertyList, + boolean ifModelExists) { + super(pos); + this.modelName = requireNonNull(modelName, "modelName should not be null"); + this.newModelName = null; + this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); + this.ifModelExists = ifModelExists; + } + + public SqlAlterModel( + SqlParserPos pos, + SqlIdentifier modelName, + SqlIdentifier newModelName, + boolean ifModelExists) { + super(pos); + this.modelName = requireNonNull(modelName, "modelName should not be null"); + this.newModelName = requireNonNull(newModelName, "newModelName should not be null"); + this.propertyList = null; + this.ifModelExists = ifModelExists; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + public SqlIdentifier getModelName() { + return modelName; + } + + public String[] fullModelName() { + return modelName.names.toArray(new String[0]); + } + + /** + * Whether to ignore the error if the model doesn't exist. + * + * @return true when IF EXISTS is specified. + */ + public boolean ifModelExists() { + return ifModelExists; + } + + public SqlIdentifier getNewModelName() { + return newModelName; + } + + public String[] fullNewModelName() { + if (newModelName != null) { + return newModelName.names.toArray(new String[0]); + } + return new String[0]; + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + @Override + public List<SqlNode> getOperandList() { + // Rename Model. + if (newModelName != null) { + return ImmutableNullableList.of(modelName, newModelName); + } + return ImmutableNullableList.of(modelName, propertyList); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("ALTER MODEL"); + if (ifModelExists) { + writer.keyword("IF EXISTS"); + } + modelName.unparse(writer, leftPrec, rightPrec); + if (newModelName != null) { + // Rename Model. 
+ writer.keyword("RENAME TO"); + newModelName.unparse(writer, leftPrec, rightPrec); + } else { + writer.keyword("SET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + if (propertyList != null) { + for (SqlNode modelOption : propertyList) { + SqlUnparseUtils.printIndent(writer); + modelOption.unparse(writer, leftPrec, rightPrec); + } + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java new file mode 100644 index 00000000000..1e2ea31fcc6 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.error.SqlValidateException; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlCreate; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * {@link SqlNode} to describe the CREATE MODEL syntax. CREATE MODEL [IF NOT EXISTS] [[catalogName.] + * dataBasesName].modelName WITH (name=value, [name=value]*). 
+ */ +public class SqlCreateModel extends SqlCreate implements ExtendedSqlNode { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("CREATE MODEL", SqlKind.OTHER_DDL); + + private final SqlIdentifier modelName; + + @Nullable private final SqlCharStringLiteral comment; + + private final SqlNodeList inputColumnList; + + private final SqlNodeList outputColumnList; + + private final SqlNodeList propertyList; + + private final boolean isTemporary; + + private final boolean ifNotExists; + + public SqlCreateModel( + SqlParserPos pos, + SqlIdentifier modelName, + SqlCharStringLiteral comment, + SqlNodeList inputColumnList, + SqlNodeList outputColumnList, + SqlNodeList propertyList, + boolean isTemporary, + boolean ifNotExists) { + super(OPERATOR, pos, false, ifNotExists); + this.modelName = requireNonNull(modelName, "modelName should not be null"); + this.comment = comment; + this.inputColumnList = inputColumnList; + this.outputColumnList = outputColumnList; + this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); + this.isTemporary = isTemporary; + this.ifNotExists = ifNotExists; + } + + @Override + public @Nonnull SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public @Nonnull List<SqlNode> getOperandList() { + return ImmutableNullableList.of( + modelName, comment, inputColumnList, outputColumnList, propertyList); + } + + public SqlIdentifier getModelName() { + return modelName; + } + + public Optional<SqlCharStringLiteral> getComment() { + return Optional.ofNullable(comment); + } + + public SqlNodeList getInputColumnList() { + return inputColumnList; + } + + public SqlNodeList getOutputColumnList() { + return outputColumnList; + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + public boolean isTemporary() { + return isTemporary; + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + @Override + public void validate() throws SqlValidateException { + if (!inputColumnList.isEmpty() && outputColumnList.isEmpty()) { + throw new SqlValidateException( + inputColumnList.get(0).getParserPosition(), + "Output column list can not be empty with non-empty input column list."); + } + if (inputColumnList.isEmpty() && !outputColumnList.isEmpty()) { + throw new SqlValidateException( + outputColumnList.get(0).getParserPosition(), + "Input column list can not be empty with non-empty output column list."); + } + if (propertyList.isEmpty()) { + throw new SqlValidateException( + getParserPosition(), "Model property list can not be empty."); + } + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + writer.keyword("MODEL"); + if (isIfNotExists()) { + writer.keyword("IF NOT EXISTS"); + } + modelName.unparse(writer, leftPrec, rightPrec); + if (!inputColumnList.isEmpty()) { + writer.keyword("INPUT"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode column : inputColumnList) { + SqlUnparseUtils.printIndent(writer); + column.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + + if (!outputColumnList.isEmpty()) { + writer.keyword("OUTPUT"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode column : outputColumnList) { + SqlUnparseUtils.printIndent(writer); + column.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + + if (comment != null) { + writer.newlineAndIndent(); + 
writer.keyword("COMMENT"); + comment.unparse(writer, leftPrec, rightPrec); + } + + if (!this.propertyList.isEmpty()) { + writer.keyword("WITH"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode modelProperty : propertyList) { + SqlUnparseUtils.printIndent(writer); + modelProperty.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + } + + public String[] fullModelName() { + return modelName.names.toArray(new String[0]); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModelAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModelAs.java new file mode 100644 index 00000000000..c1ec69efe20 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModelAs.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.error.SqlValidateException; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * {@link SqlNode} to describe the CREATE MODEL AS syntax. The CTAS would create a pipeline to + * compute the result of the given query and use the data to train the model. 
+ * + * <p>Example: + * + * <pre>{@code + * CREATE MODEL my_model WITH (name=value, [name=value]*) + * ) AS SELECT col1, col2, label FROM base_table; + * }</pre> + */ +public class SqlCreateModelAs extends SqlCreateModel { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("CREATE MODEL AS", SqlKind.OTHER_DDL); + + private final SqlNode asQuery; + + public SqlCreateModelAs( + SqlParserPos pos, + SqlIdentifier modelName, + SqlCharStringLiteral comment, + SqlNodeList inputColumnList, + SqlNodeList outputColumnList, + SqlNodeList propertyList, + SqlNode asQuery, + boolean isTemporary, + boolean ifNotExists) { + super( + pos, + modelName, + comment, + inputColumnList, + outputColumnList, + propertyList, + isTemporary, + ifNotExists); + this.asQuery = + requireNonNull(asQuery, "As clause is required for CREATE MODEL AS SELECT DDL"); + } + + @Override + public @Nonnull SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public @Nonnull List<SqlNode> getOperandList() { + return ImmutableNullableList.<SqlNode>builder() + .addAll(super.getOperandList()) + .add(asQuery) + .build(); + } + + @Override + public void validate() throws SqlValidateException { + if (!getInputColumnList().isEmpty()) { + throw new SqlValidateException( + getParserPosition(), + "CREATE MODEL AS SELECT syntax does not support to specify explicit input columns."); + } + if (!getOutputColumnList().isEmpty()) { + throw new SqlValidateException( + getParserPosition(), + "CREATE MODEL AS SELECT syntax does not support to specify explicit output columns."); + } + super.validate(); + } + + public SqlNode getAsQuery() { + return asQuery; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + + writer.newlineAndIndent(); + writer.keyword("AS"); + writer.newlineAndIndent(); + this.asQuery.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java new file mode 100644 index 00000000000..ac2a9acb78c --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlDrop; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** + * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.] dataBasesName].modelName + * syntax. + */ +public class SqlDropModel extends SqlDrop { + private static final SqlOperator OPERATOR = + new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL); + + private SqlIdentifier modelName; + private boolean ifExists; + private boolean isTemporary; + + public SqlDropModel( + SqlParserPos pos, SqlIdentifier modelName, boolean ifExists, boolean isTemporary) { + super(OPERATOR, pos, ifExists); + this.modelName = modelName; + this.ifExists = ifExists; + this.isTemporary = isTemporary; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(modelName); + } + + public SqlIdentifier getModelName() { + return modelName; + } + + public void setModelName(SqlIdentifier modelName) { + this.modelName = modelName; + } + + public boolean getIfExists() { + return this.ifExists; + } + + public boolean getIsTemporary() { + return this.isTemporary; + } + + public void setIfExists(boolean ifExists) { + this.ifExists = ifExists; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DROP"); + writer.keyword("MODEL"); + if (ifExists) { + writer.keyword("IF EXISTS"); + } + modelName.unparse(writer, leftPrec, rightPrec); + } + + public String[] fullModelName() { + return modelName.names.toArray(new String[0]); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeModel.java new file mode 100644 index 00000000000..b756122cd1b --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeModel.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** + * DESCRIBE MODEL [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier sql call. Here we add Rich + * in className to follow the convention of {@link org.apache.calcite.sql.SqlDescribeTable}, which + * only had it to distinguish from calcite's original SqlDescribeTable, even though calcite does not + * have SqlDescribeModel. + */ +public class SqlRichDescribeModel extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("DESCRIBE MODEL", SqlKind.OTHER); + protected final SqlIdentifier modelNameIdentifier; + private boolean isExtended; + + public SqlRichDescribeModel( + SqlParserPos pos, SqlIdentifier modelNameIdentifier, boolean isExtended) { + super(pos); + this.modelNameIdentifier = modelNameIdentifier; + this.isExtended = isExtended; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Collections.singletonList(modelNameIdentifier); + } + + public boolean isExtended() { + return isExtended; + } + + public String[] fullModelName() { + return modelNameIdentifier.names.toArray(new String[0]); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DESCRIBE MODEL"); + if (isExtended) { + writer.keyword("EXTENDED"); + } + modelNameIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateModel.java new file mode 100644 index 00000000000..54b58b651e6 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateModel.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** SHOW CREATE MODEL sql call. 
*/ +public class SqlShowCreateModel extends SqlShowCreate { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CREATE MODEL", SqlKind.OTHER_DDL); + + public SqlShowCreateModel(SqlParserPos pos, SqlIdentifier modelName) { + super(pos, modelName); + } + + public SqlIdentifier getModelName() { + return sqlIdentifier; + } + + public String[] getFullModelName() { + return sqlIdentifier.names.toArray(new String[0]); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Collections.singletonList(sqlIdentifier); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("SHOW CREATE MODEL"); + sqlIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowModels.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowModels.java new file mode 100644 index 00000000000..af11de261bf --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowModels.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** {@link SqlNode} to describe the SHOW MODELS syntax. */ +public class SqlShowModels extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW MODELS", SqlKind.OTHER); + + protected final SqlIdentifier databaseName; + protected final String preposition; + protected final boolean notLike; + protected final SqlCharStringLiteral likeLiteral; + + public SqlShowModels(SqlParserPos pos) { + super(pos); + this.preposition = null; + this.databaseName = null; + this.notLike = false; + this.likeLiteral = null; + } + + public SqlShowModels( + SqlParserPos pos, + String preposition, + SqlIdentifier databaseName, + boolean notLike, + SqlCharStringLiteral likeLiteral) { + super(pos); + this.preposition = preposition; + this.databaseName = + preposition != null + ? 
requireNonNull(databaseName, "Database name must not be null.") + : null; + this.notLike = notLike; + this.likeLiteral = likeLiteral; + } + + public String getLikeSqlPattern() { + return Objects.isNull(this.likeLiteral) ? null : likeLiteral.getValueAs(String.class); + } + + public boolean isNotLike() { + return notLike; + } + + public SqlCharStringLiteral getLikeLiteral() { + return likeLiteral; + } + + public boolean isWithLike() { + return Objects.nonNull(likeLiteral); + } + + public String getPreposition() { + return preposition; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Objects.isNull(this.databaseName) + ? Collections.emptyList() + : Collections.singletonList(databaseName); + } + + public String[] fullDatabaseName() { + return Objects.isNull(this.databaseName) + ? new String[] {} + : databaseName.names.toArray(new String[0]); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + if (this.preposition == null) { + writer.keyword("SHOW MODELS"); + } else if (databaseName != null) { + writer.keyword("SHOW MODELS " + this.preposition); + databaseName.unparse(writer, leftPrec, rightPrec); + } + if (isWithLike()) { + if (isNotLike()) { + writer.keyword(String.format("NOT LIKE '%s'", getLikeSqlPattern())); + } else { + writer.keyword(String.format("LIKE '%s'", getLikeSqlPattern())); + } + } + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 3397485635a..a6c2bab9b80 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -394,6 +394,12 @@ class FlinkSqlParserImplTest extends SqlParserTest { .fails("(?s).*Encountered \"likes\" at line 1, column 22.\n.*"); } + @Test + void testShowCreateModel() { + sql("show create model m1").ok("SHOW CREATE MODEL `M1`"); + sql("show create model catalog1.db1.m1").ok("SHOW CREATE MODEL `CATALOG1`.`DB1`.`M1`"); + } + @Test void testShowCreateTable() { sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`"); @@ -418,6 +424,15 @@ class FlinkSqlParserImplTest extends SqlParserTest { sql("desc extended db1").ok("DESCRIBE EXTENDED `DB1`"); } + @Test + void testDescribeModel() { + sql("describe model mdl").ok("DESCRIBE MODEL `MDL`"); + sql("describe model catalog1.db1.mdl").ok("DESCRIBE MODEL `CATALOG1`.`DB1`.`MDL`"); + + sql("desc model mdl").ok("DESCRIBE MODEL `MDL`"); + sql("desc model catalog1.db1.mdl").ok("DESCRIBE MODEL `CATALOG1`.`DB1`.`MDL`"); + } + @Test void testShowColumns() { sql("show columns from tbl").ok("SHOW COLUMNS FROM `TBL`"); @@ -3002,6 +3017,196 @@ class FlinkSqlParserImplTest extends SqlParserTest { }; } + @Test + void testShowModels() { + sql("show models").ok("SHOW MODELS"); + sql("show models from db1").ok("SHOW MODELS FROM `DB1`"); + sql("show models from catalog1.db1").ok("SHOW MODELS FROM `CATALOG1`.`DB1`"); + sql("show models in db1").ok("SHOW MODELS IN `DB1`"); + sql("show models in catalog1.db1").ok("SHOW MODELS IN `CATALOG1`.`DB1`"); + } + + @Test + void testDropModel() { + sql("drop model m1").ok("DROP MODEL `M1`"); + sql("drop model db1.m1").ok("DROP MODEL `DB1`.`M1`"); + sql("drop model catalog1.db1.m1").ok("DROP MODEL `CATALOG1`.`DB1`.`M1`"); + } + + @Test + 
void testDropModelIfExists() { + sql("drop model if exists catalog1.db1.m1") + .ok("DROP MODEL IF EXISTS `CATALOG1`.`DB1`.`M1`"); + } + + @Test + void testAlterModel() { + final String sql = "alter model m1 set ('key1' = 'value1','key2' = 'value2')"; + final String expected = + "ALTER MODEL `M1` SET (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterModelIfExists() { + final String sql = "alter model if exists m1 set ('key1' = 'value1','key2' = 'value2')"; + final String expected = + "ALTER MODEL IF EXISTS `M1` SET (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterModelRename() { + final String sql = "alter model m1 rename to m2"; + final String expected = "ALTER MODEL `M1` RENAME TO `M2`"; + sql(sql).ok(expected); + } + + @Test + void testAlterModelRenameIfExists() { + final String sql = "alter model if exists m1 rename to m2"; + final String expected = "ALTER MODEL IF EXISTS `M1` RENAME TO `M2`"; + sql(sql).ok(expected); + } + + @Test + void testCreateModel() { + sql("create model m1\n" + + " INPUT(col1 INT, col2 STRING)\n" + + " OUTPUT(label DOUBLE)\n" + + " COMMENT 'model_comment'\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " )\n") + .ok( + "CREATE MODEL `M1` INPUT (\n" + + " `COL1` INTEGER,\n" + + " `COL2` STRING\n" + + ") OUTPUT (\n" + + " `LABEL` DOUBLE\n" + + ")\n" + + "COMMENT 'model_comment' WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")"); + } + + @Test + void testCreateModelIfNotExists() { + sql("create model if not exists m1\n" + + " INPUT(col1 INT, col2 STRING)\n" + + " OUTPUT(label DOUBLE)\n" + + " COMMENT 'model_comment'\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " )\n") + .ok( + "CREATE MODEL IF NOT EXISTS `M1` INPUT (\n" + + " `COL1` INTEGER,\n" + + " `COL2` STRING\n" + + ") OUTPUT (\n" + + " `LABEL` DOUBLE\n" + + ")\n" + + "COMMENT 'model_comment' WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")"); + } + + @Test + void testCreateModelAs() { + sql("create model m1\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " ) as select f1, f2 from t1\n") + .ok( + "CREATE MODEL `M1` WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")\n" + + "AS\n" + + "SELECT `F1`, `F2`\n" + + "FROM `T1`"); + } + + @Test + void testCreateModelAsIfNotExists() { + sql("create model if not exists m1\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " ) as select f1, f2 from t1\n") + .ok( + "CREATE MODEL IF NOT EXISTS `M1` WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")\n" + + "AS\n" + + "SELECT `F1`, `F2`\n" + + "FROM `T1`"); + } + + @Test + void testCreateModelAsWithInput() { + sql("create model if not exists m1\n" + + " INPUT(col1 INT, col2 STRING)\n" + + " OUTPUT(label DOUBLE)\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " ) as select f1, f2 from t1\n") + .ok( + "CREATE MODEL IF NOT EXISTS `M1` INPUT (\n" + + " `COL1` INTEGER,\n" + + " `COL2` STRING\n" + + ") OUTPUT (\n" + + " `LABEL` DOUBLE\n" + + ") WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")\n" + + "AS\n" + + "SELECT `F1`, `F2`\n" + + "FROM `T1`") + .node( + new ValidationMatcher() + .fails( + "CREATE MODEL AS SELECT syntax does not support to specify explicit input columns.")); + } + + @Test + void testCreateModelAsWithOutput() { + sql("create 
model if not exists m1\n" + + " OUTPUT(label DOUBLE)\n" + + " WITH (\n" + + " 'key1'='value1',\n" + + " 'key2'='value2'\n" + + " ) as select f1, f2 from t1\n") + .ok( + "CREATE MODEL IF NOT EXISTS `M1` OUTPUT (\n" + + " `LABEL` DOUBLE\n" + + ") WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2' = 'value2'\n" + + ")\n" + + "AS\n" + + "SELECT `F1`, `F2`\n" + + "FROM `T1`") + .node( + new ValidationMatcher() + .fails( + "CREATE MODEL AS SELECT syntax does not support to specify explicit output columns.")); + } + /** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} instance. * */ private static class ValidationMatcher extends BaseMatcher<SqlNode> { private String expectedColumnSql; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index 7ad24920c3e..4420ec96372 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -138,6 +138,7 @@ class FlinkPlannerImpl( || sqlNode.isInstanceOf[SqlShowDatabases] || sqlNode.isInstanceOf[SqlShowCurrentDatabase] || sqlNode.isInstanceOf[SqlShowTables] + || sqlNode.isInstanceOf[SqlShowModels] || sqlNode.isInstanceOf[SqlShowFunctions] || sqlNode.isInstanceOf[SqlShowJars] || sqlNode.isInstanceOf[SqlShowModules] @@ -147,6 +148,7 @@ class FlinkPlannerImpl( || sqlNode.isInstanceOf[SqlShowProcedures] || sqlNode.isInstanceOf[SqlShowJobs] || sqlNode.isInstanceOf[SqlDescribeJob] + || sqlNode.isInstanceOf[SqlRichDescribeModel] || sqlNode.isInstanceOf[SqlRichDescribeTable] || sqlNode.isInstanceOf[SqlUnloadModule] || sqlNode.isInstanceOf[SqlUseModules] diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index e4203d8361a..a894698d492 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -34,6 +34,7 @@ under the License. <suppress files="NoticeFileCheckerTest.java" checks="IllegalImport"/> <suppress files="DependencyTree.java" checks="IllegalImport"/> + <suppress files="FlinkSqlParserImplTest.java" checks="FileLength"/> <suppress files="JoinOperator.java" checks="FileLength"/> <suppress files="WindowOperatorTest.java" checks="FileLength"/> <suppress files="WindowOperatorContractTest.java" checks="FileLength"/>
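
For quick reference, the statements below sketch the syntax forms the new MODEL grammar rules accept. They mirror the cases exercised in FlinkSqlParserImplTest in this patch; the model, column, and option names are illustrative placeholders, not part of the commit.

    -- Define a model with explicit input/output schema and options.
    CREATE MODEL IF NOT EXISTS my_model
      INPUT (col1 INT, col2 STRING)
      OUTPUT (label DOUBLE)
      COMMENT 'model_comment'
      WITH ('key1' = 'value1', 'key2' = 'value2');

    -- CREATE MODEL AS SELECT: schema is derived from the query, so INPUT/OUTPUT are omitted.
    CREATE MODEL my_model
      WITH ('key1' = 'value1', 'key2' = 'value2')
      AS SELECT f1, f2, label FROM base_table;

    -- Change options or rename an existing model.
    ALTER MODEL IF EXISTS my_model SET ('key1' = 'value1');
    ALTER MODEL my_model RENAME TO my_model2;

    -- Inspect models in a catalog/database.
    SHOW MODELS FROM catalog1.db1 LIKE '%model%';
    SHOW CREATE MODEL catalog1.db1.my_model;
    DESCRIBE MODEL EXTENDED catalog1.db1.my_model;

    -- Remove a model.
    DROP MODEL IF EXISTS catalog1.db1.my_model;

Note that, per SqlCreateModel#validate() in this change, the WITH property list must be non-empty, and INPUT and OUTPUT must either both be given or both be omitted; SqlCreateModelAs additionally rejects explicit INPUT/OUTPUT columns.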