This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d39738587e [FLINK-34993][flink-sql-parser] Support MODEL syntax in 
parser
4d39738587e is described below

commit 4d39738587e0f07a4776eebb83e2bcaf28eef604
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Fri May 10 01:04:58 2024 -0700

    [FLINK-34993][flink-sql-parser] Support MODEL syntax in parser
---
 .../src/main/codegen/data/Parser.tdd               |  12 +
 .../src/main/codegen/includes/parserImpls.ftl      | 248 +++++++++++++++++++--
 .../apache/flink/sql/parser/ddl/SqlAlterModel.java | 146 ++++++++++++
 .../flink/sql/parser/ddl/SqlCreateModel.java       | 195 ++++++++++++++++
 .../flink/sql/parser/ddl/SqlCreateModelAs.java     | 122 ++++++++++
 .../apache/flink/sql/parser/ddl/SqlDropModel.java  |  91 ++++++++
 .../flink/sql/parser/dql/SqlRichDescribeModel.java |  79 +++++++
 .../flink/sql/parser/dql/SqlShowCreateModel.java   |  65 ++++++
 .../apache/flink/sql/parser/dql/SqlShowModels.java | 126 +++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 205 +++++++++++++++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |   2 +
 tools/maven/suppressions.xml                       |   1 +
 12 files changed, 1275 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 883b6aec1b2..b9215460555 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -45,6 +45,7 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh"
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume"
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend"
+    "org.apache.flink.sql.parser.ddl.SqlAlterModel"
     "org.apache.flink.sql.parser.ddl.SqlAlterTable"
     "org.apache.flink.sql.parser.ddl.SqlAlterTable.AlterTableContext"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd"
@@ -67,6 +68,8 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
     "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
     "org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable"
+    "org.apache.flink.sql.parser.ddl.SqlCreateModel"
+    "org.apache.flink.sql.parser.ddl.SqlCreateModelAs"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
     "org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
@@ -77,6 +80,7 @@
     "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
     "org.apache.flink.sql.parser.ddl.SqlDropFunction"
     "org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable"
+    "org.apache.flink.sql.parser.ddl.SqlDropModel"
     "org.apache.flink.sql.parser.ddl.SqlDropPartitions"
     
"org.apache.flink.sql.parser.ddl.SqlDropPartitions.AlterTableDropPartitionsContext"
     "org.apache.flink.sql.parser.ddl.SqlDropTable"
@@ -116,16 +120,19 @@
     "org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase"
     "org.apache.flink.sql.parser.dql.SqlShowFunctions"
     "org.apache.flink.sql.parser.dql.SqlShowJars"
+    "org.apache.flink.sql.parser.dql.SqlShowModels"
     "org.apache.flink.sql.parser.dql.SqlShowModules"
     "org.apache.flink.sql.parser.dql.SqlShowPartitions"
     "org.apache.flink.sql.parser.dql.SqlShowProcedures"
     "org.apache.flink.sql.parser.dql.SqlShowTables"
     "org.apache.flink.sql.parser.dql.SqlShowColumns"
     "org.apache.flink.sql.parser.dql.SqlShowCreate"
+    "org.apache.flink.sql.parser.dql.SqlShowCreateModel"
     "org.apache.flink.sql.parser.dql.SqlShowCreateTable"
     "org.apache.flink.sql.parser.dql.SqlShowCreateView"
     "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog"
     "org.apache.flink.sql.parser.dql.SqlShowViews"
+    "org.apache.flink.sql.parser.dql.SqlRichDescribeModel"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
     "org.apache.flink.sql.parser.dql.SqlUnloadModule"
     "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
@@ -185,6 +192,8 @@
     "JOBS"
     "LOAD"
     "METADATA"
+    "MODEL"
+    "MODELS"
     "MATERIALIZED"
     "MODIFY"
     "MODULES"
@@ -593,11 +602,13 @@
     "SqlDescribeDatabase()"
     "SqlAlterFunction()"
     "SqlShowFunctions()"
+    "SqlShowModels()"
     "SqlShowTables()"
     "SqlShowColumns()"
     "SqlShowCreate()"
     "SqlReplaceTable()"
     "SqlAlterMaterializedTable()"
+    "SqlAlterModel()"
     "SqlAlterTable()"
     "SqlAlterView()"
     "SqlShowModules()"
@@ -621,6 +632,7 @@
     "SqlShowJobs()"
     "SqlTruncateTable()"
     "SqlDescribeJob()"
+    "SqlRichDescribeModel()"
     "SqlRichDescribeTable()"
   ]
 
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index b2a5ea02d0f..6d524422625 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -121,7 +121,7 @@ SqlCreate SqlCreateCatalog(Span s, boolean replace) :
     catalogName = SimpleIdentifier()
     [
         <WITH>
-        propertyList = TableProperties()
+        propertyList = Properties()
     ]
     {
         return new SqlCreateCatalog(startPos.plus(getPos()),
@@ -161,7 +161,7 @@ SqlAlterCatalog SqlAlterCatalog() :
     <ALTER> <CATALOG> { startPos = getPos(); }
     catalogName = SimpleIdentifier()
     <SET>
-    propertyList = TableProperties()
+    propertyList = Properties()
     {
         return new SqlAlterCatalogOptions(startPos.plus(getPos()),
                     catalogName,
@@ -262,7 +262,7 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) :
     ]
     [
         <WITH>
-        propertyList = TableProperties()
+        propertyList = Properties()
     ]
 
     { return new SqlCreateDatabase(startPos.plus(getPos()),
@@ -283,7 +283,7 @@ SqlAlterDatabase SqlAlterDatabase() :
     <ALTER> <DATABASE> { startPos = getPos(); }
     databaseName = CompoundIdentifier()
     <SET>
-    propertyList = TableProperties()
+    propertyList = Properties()
     {
         return new SqlAlterDatabase(startPos.plus(getPos()),
                     databaseName,
@@ -712,9 +712,35 @@ SqlShowCreate SqlShowCreate() :
         {
             return new SqlShowCreateCatalog(pos, sqlIdentifier);
         }
+    |
+        <MODEL>
+        { pos = getPos(); }
+        sqlIdentifier = CompoundIdentifier()
+        {
+            return new SqlShowCreateModel(pos, sqlIdentifier);
+        }
     )
 }
 
+/**
+ * DESCRIBE | DESC MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName 
sql call.
+ * Here we add Rich in className to match the naming of SqlRichDescribeTable.
+ */
+SqlRichDescribeModel SqlRichDescribeModel() :
+{
+    SqlIdentifier modelName;
+    SqlParserPos pos;
+    boolean isExtended = false;
+}
+{
+    ( <DESCRIBE> | <DESC> ) <MODEL> { pos = getPos();}
+    [ <EXTENDED> { isExtended = true;} ]
+    modelName = CompoundIdentifier()
+    {
+        return new SqlRichDescribeModel(pos, modelName, isExtended);
+    }
+}
+
 /**
  * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql 
call.
  * Here we add Rich in className to distinguish from calcite's original 
SqlDescribeTable.
@@ -782,7 +808,7 @@ SqlAlterTable SqlAlterTable() :
         }
     |
         <RESET>
-        propertyKeyList = TablePropertyKeys()
+        propertyKeyList = PropertyKeys()
         {
             return new SqlAlterTableReset(
                         startPos.plus(getPos()),
@@ -792,7 +818,7 @@ SqlAlterTable SqlAlterTable() :
         }
     |
         <SET>
-        propertyList = TableProperties()
+        propertyList = Properties()
         {
             return new SqlAlterTableOptions(
                         startPos.plus(getPos()),
@@ -926,7 +952,7 @@ SqlAlterTable SqlAlterTable() :
 }
 
 /** Parse a table option key list. */
-SqlNodeList TablePropertyKeys():
+SqlNodeList PropertyKeys():
 {
     SqlNode key;
     final List<SqlNode> proKeyList = new ArrayList<SqlNode>();
@@ -1115,7 +1141,7 @@ void  
AlterTableAddPartition(AlterTableAddPartitionContext context) :
             partProp = null;
             PartitionSpecCommaList(partSpec);
         }
-        [ <WITH> { partProp = TableProperties(); } ]
+        [ <WITH> { partProp = Properties(); } ]
         {
             partSpecs.add(partSpec);
             partProps.add(partProp);
@@ -1360,8 +1386,8 @@ SqlNode TableOption() :
     }
 }
 
-/** Parse a table properties. */
-SqlNodeList TableProperties():
+/** Parse properties such as ('k' = 'v'). */
+SqlNodeList Properties():
 {
     SqlNode property;
     final List<SqlNode> proList = new ArrayList<SqlNode>();
@@ -1482,7 +1508,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean 
isTemporary) :
     ]
     [
         <WITH>
-        propertyList = TableProperties()
+        propertyList = Properties()
     ]
     [
         <LIKE>
@@ -1689,7 +1715,7 @@ SqlNode SqlReplaceTable() :
     ]
     [
         <WITH>
-        propertyList = TableProperties()
+        propertyList = Properties()
     ]
     <AS>
     asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
@@ -1759,7 +1785,7 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
     ]
     [
         <WITH>
-        propertyList = TableProperties()
+        propertyList = Properties()
     ]
     <FRESHNESS> <EQ>
     freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
@@ -1851,7 +1877,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
             }
         |
         <RESUME>
-        [ <WITH> propertyList = TableProperties() ]
+        [ <WITH> propertyList = Properties() ]
             {
                 return new SqlAlterMaterializedTableResume(
                     startPos,
@@ -1903,7 +1929,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
                 }
             )
             |
-            propertyList = TableProperties()
+            propertyList = Properties()
             {
                 return new SqlAlterMaterializedTableOptions(
                     startPos.plus(getPos()),
@@ -1913,7 +1939,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
         )
         |
         <RESET>
-            propertyKeyList = TablePropertyKeys()
+            propertyKeyList = PropertyKeys()
             {
                 return new SqlAlterMaterializedTableReset(
                     startPos.plus(getPos()),
@@ -2437,6 +2463,8 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) :
         create = SqlCreateDatabase(s, replace)
         |
         create = SqlCreateFunction(s, replace, isTemporary)
+        |
+        create = SqlCreateModel(s, isTemporary)
     )
     {
         return create;
@@ -2464,6 +2492,8 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
         drop = SqlDropDatabase(s, replace)
         |
         drop = SqlDropFunction(s, replace, isTemporary)
+        |
+        drop = SqlDropModel(s, isTemporary)
     )
     {
         return drop;
@@ -2504,7 +2534,7 @@ SqlLoadModule SqlLoadModule() :
     moduleName = SimpleIdentifier()
     [
         <WITH>
-        propertyList = TableProperties()
+        propertyList = Properties()
     ]
     {
         return new SqlLoadModule(startPos.plus(getPos()),
@@ -3089,3 +3119,187 @@ SqlTruncateTable SqlTruncateTable() :
         return new SqlTruncateTable(getPos(), sqlIdentifier);
     }
 }
+
+/**
+* SHOW MODELS [FROM [catalog.] database] sql call.
+*/
+SqlShowModels SqlShowModels() :
+{
+    SqlIdentifier databaseName = null;
+    SqlCharStringLiteral likeLiteral = null;
+    String prep = null;
+    boolean notLike = false;
+    SqlParserPos pos;
+}
+{
+    <SHOW> <MODELS>
+    { pos = getPos(); }
+    [
+        ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } )
+        { pos = getPos(); }
+        databaseName = CompoundIdentifier()
+    ]
+    [
+        [
+            <NOT>
+            {
+                notLike = true;
+            }
+        ]
+        <LIKE>  <QUOTED_STRING>
+        {
+            String likeCondition = SqlParserUtil.parseString(token.image);
+            likeLiteral = SqlLiteral.createCharString(likeCondition, getPos());
+        }
+    ]
+    {
+        return new SqlShowModels(pos, prep, databaseName, notLike, 
likeLiteral);
+    }
+}
+
+/**
+* ALTER MODEL [IF EXISTS] modelName SET (property_key = property_val, ...)
+* ALTER MODEL [IF EXISTS] modelName RENAME TO newModelName
+*/
+SqlAlterModel SqlAlterModel() :
+{
+    SqlParserPos startPos;
+    boolean ifExists = false;
+    SqlIdentifier modelIdentifier;
+    SqlIdentifier newModelIdentifier = null;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+    <ALTER> <MODEL> { startPos = getPos(); }
+    ifExists = IfExistsOpt()
+    modelIdentifier = CompoundIdentifier()
+    (
+        LOOKAHEAD(2)
+        <RENAME> <TO>
+        newModelIdentifier = CompoundIdentifier()
+        {
+            return new SqlAlterModel(
+                        startPos.plus(getPos()),
+                        modelIdentifier,
+                        newModelIdentifier,
+                        ifExists);
+        }
+    |
+        <SET>
+        propertyList = Properties()
+        {
+            return new SqlAlterModel(
+                        startPos.plus(getPos()),
+                        modelIdentifier,
+                        propertyList,
+                        ifExists);
+        }
+    )
+}
+
+/**
+* DROP MODEL [IF EXIST] modelName
+*/
+SqlDrop SqlDropModel(Span s, boolean isTemporary) :
+{
+    SqlIdentifier modelIdentifier = null;
+    boolean ifExists = false;
+}
+{
+    <MODEL>
+
+    ifExists = IfExistsOpt()
+
+    modelIdentifier = CompoundIdentifier()
+
+    {
+         return new SqlDropModel(s.pos(), modelIdentifier, ifExists, 
isTemporary);
+    }
+}
+
+/**
+* CREATE MODEL [IF NOT EXIST] modelName
+* [INPUT(col1 type1, col2 type2, ...)]
+* [OUTPUT(col3 type1, col4 type4, ...)]
+* [COMMENT model_comment]
+* WITH (option_key = option_val, ...)
+* [AS SELECT ...]
+*/
+SqlCreate SqlCreateModel(Span s, boolean isTemporary) :
+{
+    final SqlParserPos startPos = s.pos();
+    boolean ifNotExists = false;
+    SqlIdentifier modelIdentifier;
+    SqlNodeList inputColumnList = SqlNodeList.EMPTY;
+    SqlNodeList outputColumnList = SqlNodeList.EMPTY;
+    SqlCharStringLiteral comment = null;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+    SqlNode asQuery = null;
+    SqlParserPos pos = startPos;
+}
+{
+    <MODEL>
+
+    ifNotExists = IfNotExistsOpt()
+
+    modelIdentifier = CompoundIdentifier()
+    [
+        <INPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+        TableColumn(ctx)
+        (
+            <COMMA> TableColumn(ctx)
+        )*
+        {
+            pos = pos.plus(getPos());
+            inputColumnList = new SqlNodeList(ctx.columnList, pos);
+        }
+        <RPAREN>
+    ]
+    [
+        <OUTPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+        TableColumn(ctx)
+        (
+            <COMMA> TableColumn(ctx)
+        )*
+        {
+            pos = pos.plus(getPos());
+            outputColumnList = new SqlNodeList(ctx.columnList, pos);
+        }
+        <RPAREN>
+    ]
+    [ <COMMENT> <QUOTED_STRING>
+        {
+            String p = SqlParserUtil.parseString(token.image);
+            comment = SqlLiteral.createCharString(p, getPos());
+        }
+    ]
+    [
+        <WITH>
+        propertyList = Properties()
+    ]
+    [
+        <AS>
+        asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+        {
+            return new SqlCreateModelAs(startPos.plus(getPos()),
+                modelIdentifier,
+                comment,
+                inputColumnList,
+                outputColumnList,
+                propertyList,
+                asQuery,
+                isTemporary,
+                ifNotExists);
+        }
+    ]
+    {
+        return new SqlCreateModel(startPos.plus(getPos()),
+            modelIdentifier,
+            comment,
+            inputColumnList,
+            outputColumnList,
+            propertyList,
+            isTemporary,
+            ifNotExists);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModel.java
new file mode 100644
index 00000000000..f09e9e04a8e
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModel.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName].modelName SET ( 
name=value [,
+ * name=value]*).
+ */
+public class SqlAlterModel extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("ALTER MODEL", SqlKind.OTHER_DDL);
+
+    protected final SqlIdentifier modelName;
+    protected final SqlIdentifier newModelName;
+    protected final boolean ifModelExists;
+    private final SqlNodeList propertyList;
+
+    public SqlAlterModel(
+            SqlParserPos pos,
+            SqlIdentifier modelName,
+            SqlNodeList propertyList,
+            boolean ifModelExists) {
+        super(pos);
+        this.modelName = requireNonNull(modelName, "modelName should not be 
null");
+        this.newModelName = null;
+        this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
+        this.ifModelExists = ifModelExists;
+    }
+
+    public SqlAlterModel(
+            SqlParserPos pos,
+            SqlIdentifier modelName,
+            SqlIdentifier newModelName,
+            boolean ifModelExists) {
+        super(pos);
+        this.modelName = requireNonNull(modelName, "modelName should not be 
null");
+        this.newModelName = requireNonNull(newModelName, "newModelName should 
not be null");
+        this.propertyList = null;
+        this.ifModelExists = ifModelExists;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    public SqlIdentifier getModelName() {
+        return modelName;
+    }
+
+    public String[] fullModelName() {
+        return modelName.names.toArray(new String[0]);
+    }
+
+    /**
+     * Whether to ignore the error if the model doesn't exist.
+     *
+     * @return true when IF EXISTS is specified.
+     */
+    public boolean ifModelExists() {
+        return ifModelExists;
+    }
+
+    public SqlIdentifier getNewModelName() {
+        return newModelName;
+    }
+
+    public String[] fullNewModelName() {
+        if (newModelName != null) {
+            return newModelName.names.toArray(new String[0]);
+        }
+        return new String[0];
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        // Rename Model.
+        if (newModelName != null) {
+            return ImmutableNullableList.of(modelName, newModelName);
+        }
+        return ImmutableNullableList.of(modelName, propertyList);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("ALTER MODEL");
+        if (ifModelExists) {
+            writer.keyword("IF EXISTS");
+        }
+        modelName.unparse(writer, leftPrec, rightPrec);
+        if (newModelName != null) {
+            // Rename Model.
+            writer.keyword("RENAME TO");
+            newModelName.unparse(writer, leftPrec, rightPrec);
+        } else {
+            writer.keyword("SET");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            if (propertyList != null) {
+                for (SqlNode modelOption : propertyList) {
+                    SqlUnparseUtils.printIndent(writer);
+                    modelOption.unparse(writer, leftPrec, rightPrec);
+                }
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java
new file mode 100644
index 00000000000..1e2ea31fcc6
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the CREATE MODEL syntax. CREATE MODEL [IF NOT 
EXISTS] [[catalogName.]
+ * dataBasesName].modelName WITH (name=value, [name=value]*).
+ */
+public class SqlCreateModel extends SqlCreate implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("CREATE MODEL", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier modelName;
+
+    @Nullable private final SqlCharStringLiteral comment;
+
+    private final SqlNodeList inputColumnList;
+
+    private final SqlNodeList outputColumnList;
+
+    private final SqlNodeList propertyList;
+
+    private final boolean isTemporary;
+
+    private final boolean ifNotExists;
+
+    public SqlCreateModel(
+            SqlParserPos pos,
+            SqlIdentifier modelName,
+            SqlCharStringLiteral comment,
+            SqlNodeList inputColumnList,
+            SqlNodeList outputColumnList,
+            SqlNodeList propertyList,
+            boolean isTemporary,
+            boolean ifNotExists) {
+        super(OPERATOR, pos, false, ifNotExists);
+        this.modelName = requireNonNull(modelName, "modelName should not be 
null");
+        this.comment = comment;
+        this.inputColumnList = inputColumnList;
+        this.outputColumnList = outputColumnList;
+        this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
+        this.isTemporary = isTemporary;
+        this.ifNotExists = ifNotExists;
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                modelName, comment, inputColumnList, outputColumnList, 
propertyList);
+    }
+
+    public SqlIdentifier getModelName() {
+        return modelName;
+    }
+
+    public Optional<SqlCharStringLiteral> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    public SqlNodeList getInputColumnList() {
+        return inputColumnList;
+    }
+
+    public SqlNodeList getOutputColumnList() {
+        return outputColumnList;
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    public boolean isIfNotExists() {
+        return ifNotExists;
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        if (!inputColumnList.isEmpty() && outputColumnList.isEmpty()) {
+            throw new SqlValidateException(
+                    inputColumnList.get(0).getParserPosition(),
+                    "Output column list can not be empty with non-empty input 
column list.");
+        }
+        if (inputColumnList.isEmpty() && !outputColumnList.isEmpty()) {
+            throw new SqlValidateException(
+                    outputColumnList.get(0).getParserPosition(),
+                    "Input column list can not be empty with non-empty output 
column list.");
+        }
+        if (propertyList.isEmpty()) {
+            throw new SqlValidateException(
+                    getParserPosition(), "Model property list can not be 
empty.");
+        }
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("CREATE");
+        writer.keyword("MODEL");
+        if (isIfNotExists()) {
+            writer.keyword("IF NOT EXISTS");
+        }
+        modelName.unparse(writer, leftPrec, rightPrec);
+        if (!inputColumnList.isEmpty()) {
+            writer.keyword("INPUT");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode column : inputColumnList) {
+                SqlUnparseUtils.printIndent(writer);
+                column.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+
+        if (!outputColumnList.isEmpty()) {
+            writer.keyword("OUTPUT");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode column : outputColumnList) {
+                SqlUnparseUtils.printIndent(writer);
+                column.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+
+        if (comment != null) {
+            writer.newlineAndIndent();
+            writer.keyword("COMMENT");
+            comment.unparse(writer, leftPrec, rightPrec);
+        }
+
+        if (!this.propertyList.isEmpty()) {
+            writer.keyword("WITH");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode modelProperty : propertyList) {
+                SqlUnparseUtils.printIndent(writer);
+                modelProperty.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+    }
+
+    public String[] fullModelName() {
+        return modelName.names.toArray(new String[0]);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModelAs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModelAs.java
new file mode 100644
index 00000000000..c1ec69efe20
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModelAs.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the CREATE MODEL AS syntax. The CTAS would 
create a pipeline to
+ * compute the result of the given query and use the data to train the model.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * CREATE MODEL my_model WITH (name=value, [name=value]*)
+ * ) AS SELECT col1, col2, label FROM base_table;
+ * }</pre>
+ */
+public class SqlCreateModelAs extends SqlCreateModel {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("CREATE MODEL AS", SqlKind.OTHER_DDL);
+
+    private final SqlNode asQuery;
+
+    public SqlCreateModelAs(
+            SqlParserPos pos,
+            SqlIdentifier modelName,
+            SqlCharStringLiteral comment,
+            SqlNodeList inputColumnList,
+            SqlNodeList outputColumnList,
+            SqlNodeList propertyList,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists) {
+        super(
+                pos,
+                modelName,
+                comment,
+                inputColumnList,
+                outputColumnList,
+                propertyList,
+                isTemporary,
+                ifNotExists);
+        this.asQuery =
+                requireNonNull(asQuery, "As clause is required for CREATE 
MODEL AS SELECT DDL");
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.<SqlNode>builder()
+                .addAll(super.getOperandList())
+                .add(asQuery)
+                .build();
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        if (!getInputColumnList().isEmpty()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    "CREATE MODEL AS SELECT syntax does not support to specify 
explicit input columns.");
+        }
+        if (!getOutputColumnList().isEmpty()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    "CREATE MODEL AS SELECT syntax does not support to specify 
explicit output columns.");
+        }
+        super.validate();
+    }
+
+    public SqlNode getAsQuery() {
+        return asQuery;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+
+        writer.newlineAndIndent();
+        writer.keyword("AS");
+        writer.newlineAndIndent();
+        this.asQuery.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
new file mode 100644
index 00000000000..ac2a9acb78c
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDrop;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.] 
dataBasesName].modelName
+ * syntax.
+ */
+public class SqlDropModel extends SqlDrop {
+    private static final SqlOperator OPERATOR =
+            new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+
+    private SqlIdentifier modelName;
+    private boolean ifExists;
+    private boolean isTemporary;
+
+    public SqlDropModel(
+            SqlParserPos pos, SqlIdentifier modelName, boolean ifExists, 
boolean isTemporary) {
+        super(OPERATOR, pos, ifExists);
+        this.modelName = modelName;
+        this.ifExists = ifExists;
+        this.isTemporary = isTemporary;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(modelName);
+    }
+
+    public SqlIdentifier getModelName() {
+        return modelName;
+    }
+
+    public void setModelName(SqlIdentifier modelName) {
+        this.modelName = modelName;
+    }
+
+    public boolean getIfExists() {
+        return this.ifExists;
+    }
+
+    public boolean getIsTemporary() {
+        return this.isTemporary;
+    }
+
+    public void setIfExists(boolean ifExists) {
+        this.ifExists = ifExists;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("DROP");
+        writer.keyword("MODEL");
+        if (ifExists) {
+            writer.keyword("IF EXISTS");
+        }
+        modelName.unparse(writer, leftPrec, rightPrec);
+    }
+
+    public String[] fullModelName() {
+        return modelName.names.toArray(new String[0]);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeModel.java
new file mode 100644
index 00000000000..b756122cd1b
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeModel.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * DESCRIBE MODEL [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier sql 
call. Here we add Rich
+ * in className to follow the convention of {@link 
org.apache.calcite.sql.SqlDescribeTable}, which
+ * only had it to distinguish from calcite's original SqlDescribeTable, even 
though calcite does not
+ * have SqlDescribeModel.
+ */
+public class SqlRichDescribeModel extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("DESCRIBE MODEL", SqlKind.OTHER);
+    protected final SqlIdentifier modelNameIdentifier;
+    private boolean isExtended;
+
+    public SqlRichDescribeModel(
+            SqlParserPos pos, SqlIdentifier modelNameIdentifier, boolean 
isExtended) {
+        super(pos);
+        this.modelNameIdentifier = modelNameIdentifier;
+        this.isExtended = isExtended;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(modelNameIdentifier);
+    }
+
+    public boolean isExtended() {
+        return isExtended;
+    }
+
+    public String[] fullModelName() {
+        return modelNameIdentifier.names.toArray(new String[0]);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("DESCRIBE MODEL");
+        if (isExtended) {
+            writer.keyword("EXTENDED");
+        }
+        modelNameIdentifier.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateModel.java
new file mode 100644
index 00000000000..54b58b651e6
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateModel.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** SHOW CREATE MODEL sql call. */
+public class SqlShowCreateModel extends SqlShowCreate {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("SHOW CREATE MODEL", SqlKind.OTHER_DDL);
+
+    public SqlShowCreateModel(SqlParserPos pos, SqlIdentifier modelName) {
+        super(pos, modelName);
+    }
+
+    public SqlIdentifier getModelName() {
+        return sqlIdentifier;
+    }
+
+    public String[] getFullModelName() {
+        return sqlIdentifier.names.toArray(new String[0]);
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(sqlIdentifier);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("SHOW CREATE MODEL");
+        sqlIdentifier.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowModels.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowModels.java
new file mode 100644
index 00000000000..af11de261bf
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowModels.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/** {@link SqlNode} to describe the SHOW MODELS syntax. */
+public class SqlShowModels extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("SHOW MODELS", SqlKind.OTHER);
+
+    protected final SqlIdentifier databaseName;
+    protected final String preposition;
+    protected final boolean notLike;
+    protected final SqlCharStringLiteral likeLiteral;
+
+    public SqlShowModels(SqlParserPos pos) {
+        super(pos);
+        this.preposition = null;
+        this.databaseName = null;
+        this.notLike = false;
+        this.likeLiteral = null;
+    }
+
+    public SqlShowModels(
+            SqlParserPos pos,
+            String preposition,
+            SqlIdentifier databaseName,
+            boolean notLike,
+            SqlCharStringLiteral likeLiteral) {
+        super(pos);
+        this.preposition = preposition;
+        this.databaseName =
+                preposition != null
+                        ? requireNonNull(databaseName, "Database name must not 
be null.")
+                        : null;
+        this.notLike = notLike;
+        this.likeLiteral = likeLiteral;
+    }
+
+    public String getLikeSqlPattern() {
+        return Objects.isNull(this.likeLiteral) ? null : 
likeLiteral.getValueAs(String.class);
+    }
+
+    public boolean isNotLike() {
+        return notLike;
+    }
+
+    public SqlCharStringLiteral getLikeLiteral() {
+        return likeLiteral;
+    }
+
+    public boolean isWithLike() {
+        return Objects.nonNull(likeLiteral);
+    }
+
+    public String getPreposition() {
+        return preposition;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Objects.isNull(this.databaseName)
+                ? Collections.emptyList()
+                : Collections.singletonList(databaseName);
+    }
+
+    public String[] fullDatabaseName() {
+        return Objects.isNull(this.databaseName)
+                ? new String[] {}
+                : databaseName.names.toArray(new String[0]);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        if (this.preposition == null) {
+            writer.keyword("SHOW MODELS");
+        } else if (databaseName != null) {
+            writer.keyword("SHOW MODELS " + this.preposition);
+            databaseName.unparse(writer, leftPrec, rightPrec);
+        }
+        if (isWithLike()) {
+            if (isNotLike()) {
+                writer.keyword(String.format("NOT LIKE '%s'", 
getLikeSqlPattern()));
+            } else {
+                writer.keyword(String.format("LIKE '%s'", 
getLikeSqlPattern()));
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 3397485635a..a6c2bab9b80 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -394,6 +394,12 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                 .fails("(?s).*Encountered \"likes\" at line 1, column 
22.\n.*");
     }
 
+    @Test
+    void testShowCreateModel() {
+        sql("show create model m1").ok("SHOW CREATE MODEL `M1`");
+        sql("show create model catalog1.db1.m1").ok("SHOW CREATE MODEL 
`CATALOG1`.`DB1`.`M1`");
+    }
+
     @Test
     void testShowCreateTable() {
         sql("show create table tbl").ok("SHOW CREATE TABLE `TBL`");
@@ -418,6 +424,15 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         sql("desc extended db1").ok("DESCRIBE EXTENDED `DB1`");
     }
 
+    @Test
+    void testDescribeModel() {
+        sql("describe model mdl").ok("DESCRIBE MODEL `MDL`");
+        sql("describe model catalog1.db1.mdl").ok("DESCRIBE MODEL 
`CATALOG1`.`DB1`.`MDL`");
+
+        sql("desc model mdl").ok("DESCRIBE MODEL `MDL`");
+        sql("desc model catalog1.db1.mdl").ok("DESCRIBE MODEL 
`CATALOG1`.`DB1`.`MDL`");
+    }
+
     @Test
     void testShowColumns() {
         sql("show columns from tbl").ok("SHOW COLUMNS FROM `TBL`");
@@ -3002,6 +3017,196 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         };
     }
 
+    @Test
+    void testShowModels() {
+        sql("show models").ok("SHOW MODELS");
+        sql("show models from db1").ok("SHOW MODELS FROM `DB1`");
+        sql("show models from catalog1.db1").ok("SHOW MODELS FROM 
`CATALOG1`.`DB1`");
+        sql("show models in db1").ok("SHOW MODELS IN `DB1`");
+        sql("show models in catalog1.db1").ok("SHOW MODELS IN 
`CATALOG1`.`DB1`");
+    }
+
+    @Test
+    void testDropModel() {
+        sql("drop model m1").ok("DROP MODEL `M1`");
+        sql("drop model db1.m1").ok("DROP MODEL `DB1`.`M1`");
+        sql("drop model catalog1.db1.m1").ok("DROP MODEL 
`CATALOG1`.`DB1`.`M1`");
+    }
+
+    @Test
+    void testDropModelIfExists() {
+        sql("drop model if exists catalog1.db1.m1")
+                .ok("DROP MODEL IF EXISTS `CATALOG1`.`DB1`.`M1`");
+    }
+
+    @Test
+    void testAlterModel() {
+        final String sql = "alter model m1 set ('key1' = 'value1','key2' = 
'value2')";
+        final String expected =
+                "ALTER MODEL `M1` SET (\n"
+                        + "  'key1' = 'value1',\n"
+                        + "  'key2' = 'value2'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterModelIfExists() {
+        final String sql = "alter model if exists m1 set ('key1' = 
'value1','key2' = 'value2')";
+        final String expected =
+                "ALTER MODEL IF EXISTS `M1` SET (\n"
+                        + "  'key1' = 'value1',\n"
+                        + "  'key2' = 'value2'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterModelRename() {
+        final String sql = "alter model m1 rename to m2";
+        final String expected = "ALTER MODEL `M1` RENAME TO `M2`";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterModelRenameIfExists() {
+        final String sql = "alter model if exists m1 rename to m2";
+        final String expected = "ALTER MODEL IF EXISTS `M1` RENAME TO `M2`";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testCreateModel() {
+        sql("create model m1\n"
+                        + " INPUT(col1 INT, col2 STRING)\n"
+                        + " OUTPUT(label DOUBLE)\n"
+                        + " COMMENT 'model_comment'\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE MODEL `M1` INPUT (\n"
+                                + "  `COL1` INTEGER,\n"
+                                + "  `COL2` STRING\n"
+                                + ") OUTPUT (\n"
+                                + "  `LABEL` DOUBLE\n"
+                                + ")\n"
+                                + "COMMENT 'model_comment' WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")");
+    }
+
+    @Test
+    void testCreateModelIfNotExists() {
+        sql("create model if not exists m1\n"
+                        + " INPUT(col1 INT, col2 STRING)\n"
+                        + " OUTPUT(label DOUBLE)\n"
+                        + " COMMENT 'model_comment'\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE MODEL IF NOT EXISTS `M1` INPUT (\n"
+                                + "  `COL1` INTEGER,\n"
+                                + "  `COL2` STRING\n"
+                                + ") OUTPUT (\n"
+                                + "  `LABEL` DOUBLE\n"
+                                + ")\n"
+                                + "COMMENT 'model_comment' WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")");
+    }
+
+    @Test
+    void testCreateModelAs() {
+        sql("create model m1\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " ) as select f1, f2 from t1\n")
+                .ok(
+                        "CREATE MODEL `M1` WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")\n"
+                                + "AS\n"
+                                + "SELECT `F1`, `F2`\n"
+                                + "FROM `T1`");
+    }
+
+    @Test
+    void testCreateModelAsIfNotExists() {
+        sql("create model if not exists m1\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " ) as select f1, f2 from t1\n")
+                .ok(
+                        "CREATE MODEL IF NOT EXISTS `M1` WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")\n"
+                                + "AS\n"
+                                + "SELECT `F1`, `F2`\n"
+                                + "FROM `T1`");
+    }
+
+    @Test
+    void testCreateModelAsWithInput() {
+        sql("create model if not exists m1\n"
+                        + " INPUT(col1 INT, col2 STRING)\n"
+                        + " OUTPUT(label DOUBLE)\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " ) as select f1, f2 from t1\n")
+                .ok(
+                        "CREATE MODEL IF NOT EXISTS `M1` INPUT (\n"
+                                + "  `COL1` INTEGER,\n"
+                                + "  `COL2` STRING\n"
+                                + ") OUTPUT (\n"
+                                + "  `LABEL` DOUBLE\n"
+                                + ") WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")\n"
+                                + "AS\n"
+                                + "SELECT `F1`, `F2`\n"
+                                + "FROM `T1`")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE MODEL AS SELECT syntax does 
not support to specify explicit input columns."));
+    }
+
+    @Test
+    void testCreateModelAsWithOutput() {
+        sql("create model if not exists m1\n"
+                        + " OUTPUT(label DOUBLE)\n"
+                        + " WITH (\n"
+                        + "  'key1'='value1',\n"
+                        + "  'key2'='value2'\n"
+                        + " ) as select f1, f2 from t1\n")
+                .ok(
+                        "CREATE MODEL IF NOT EXISTS `M1` OUTPUT (\n"
+                                + "  `LABEL` DOUBLE\n"
+                                + ") WITH (\n"
+                                + "  'key1' = 'value1',\n"
+                                + "  'key2' = 'value2'\n"
+                                + ")\n"
+                                + "AS\n"
+                                + "SELECT `F1`, `F2`\n"
+                                + "FROM `T1`")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE MODEL AS SELECT syntax does 
not support to specify explicit output columns."));
+    }
+
     /** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} 
instance. * */
     private static class ValidationMatcher extends BaseMatcher<SqlNode> {
         private String expectedColumnSql;
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 7ad24920c3e..4420ec96372 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -138,6 +138,7 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowDatabases]
         || sqlNode.isInstanceOf[SqlShowCurrentDatabase]
         || sqlNode.isInstanceOf[SqlShowTables]
+        || sqlNode.isInstanceOf[SqlShowModels]
         || sqlNode.isInstanceOf[SqlShowFunctions]
         || sqlNode.isInstanceOf[SqlShowJars]
         || sqlNode.isInstanceOf[SqlShowModules]
@@ -147,6 +148,7 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowProcedures]
         || sqlNode.isInstanceOf[SqlShowJobs]
         || sqlNode.isInstanceOf[SqlDescribeJob]
+        || sqlNode.isInstanceOf[SqlRichDescribeModel]
         || sqlNode.isInstanceOf[SqlRichDescribeTable]
         || sqlNode.isInstanceOf[SqlUnloadModule]
         || sqlNode.isInstanceOf[SqlUseModules]
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index e4203d8361a..a894698d492 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -34,6 +34,7 @@ under the License.
                <suppress files="NoticeFileCheckerTest.java" 
checks="IllegalImport"/>
                <suppress files="DependencyTree.java" checks="IllegalImport"/>
 
+               <suppress files="FlinkSqlParserImplTest.java" 
checks="FileLength"/>
                <suppress files="JoinOperator.java" checks="FileLength"/>
                <suppress files="WindowOperatorTest.java" checks="FileLength"/>
                <suppress files="WindowOperatorContractTest.java" 
checks="FileLength"/>

Reply via email to