This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 321b17f4d15 [FLINK-38260][table] Add parser changes for connection sql
321b17f4d15 is described below

commit 321b17f4d15be28c8d6295c383fba9c63bc1cdb8
Author: Hao Li <[email protected]>
AuthorDate: Mon Dec 1 15:35:37 2025 -0800

    [FLINK-38260][table] Add parser changes for connection sql
---
 .../src/main/codegen/data/Parser.tdd               |  13 ++
 .../src/main/codegen/includes/parserImpls.ftl      | 216 ++++++++++++++++-
 .../org/apache/flink/sql/parser/SqlParseUtils.java |  18 +-
 .../{SqlDropModel.java => SqlAlterConnection.java} |  38 +--
 ...delReset.java => SqlAlterConnectionRename.java} |  42 ++--
 ...odelReset.java => SqlAlterConnectionReset.java} |  18 +-
 ...rModelReset.java => SqlAlterConnectionSet.java} |  38 +--
 .../flink/sql/parser/ddl/SqlAlterModelReset.java   |   5 +-
 .../flink/sql/parser/ddl/SqlAlterTableReset.java   |   5 +-
 .../flink/sql/parser/ddl/SqlAnalyzeTable.java      |   2 +-
 .../flink/sql/parser/ddl/SqlCreateConnection.java  | 110 +++++++++
 .../sql/parser/ddl/SqlCreateMaterializedTable.java |   2 +-
 .../flink/sql/parser/ddl/SqlCreateTable.java       |   2 +-
 .../{SqlDropModel.java => SqlDropConnection.java}  |  37 ++-
 .../apache/flink/sql/parser/ddl/SqlDropModel.java  |   7 +-
 .../sql/parser/dql/SqlRichDescribeConnection.java  |  79 +++++++
 .../SqlShowConnections.java}                       |  48 ++--
 .../SqlShowCreateConnection.java}                  |  45 ++--
 .../flink/sql/parser/utils/ParserResource.java     |  10 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 256 ++++++++++++++++++++-
 .../converters/SqlDropModelConverter.java          |   2 +-
 .../planner/utils/OperationConverterUtils.java     |   4 +-
 22 files changed, 844 insertions(+), 153 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 49ffde6d757..1585a8b1e04 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -39,6 +39,10 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions"
     "org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset"
     "org.apache.flink.sql.parser.ddl.SqlAlterCatalogComment"
+    "org.apache.flink.sql.parser.ddl.SqlAlterConnection"
+    "org.apache.flink.sql.parser.ddl.SqlAlterConnectionRename"
+    "org.apache.flink.sql.parser.ddl.SqlAlterConnectionReset"
+    "org.apache.flink.sql.parser.ddl.SqlAlterConnectionSet"
     "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
     "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable"
@@ -77,6 +81,7 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterViewRename"
     "org.apache.flink.sql.parser.ddl.SqlCompilePlan"
     "org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
+    "org.apache.flink.sql.parser.ddl.SqlCreateConnection"
     "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
     "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
     "org.apache.flink.sql.parser.ddl.SqlCreateModel"
@@ -88,6 +93,7 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateView"
     "org.apache.flink.sql.parser.ddl.SqlDistribution"
     "org.apache.flink.sql.parser.ddl.SqlDropCatalog"
+    "org.apache.flink.sql.parser.ddl.SqlDropConnection"
     "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
     "org.apache.flink.sql.parser.ddl.SqlDropFunction"
     "org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable"
@@ -138,12 +144,15 @@
     "org.apache.flink.sql.parser.dql.SqlShowTables"
     "org.apache.flink.sql.parser.dql.SqlShowTables.SqlTableKind"
     "org.apache.flink.sql.parser.dql.SqlShowColumns"
+    "org.apache.flink.sql.parser.dql.SqlShowConnections"
     "org.apache.flink.sql.parser.dql.SqlShowCreate"
+    "org.apache.flink.sql.parser.dql.SqlShowCreateConnection"
     "org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable"
     "org.apache.flink.sql.parser.dql.SqlShowCreateModel"
     "org.apache.flink.sql.parser.dql.SqlShowCreateTable"
     "org.apache.flink.sql.parser.dql.SqlShowCreateView"
     "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog"
+    "org.apache.flink.sql.parser.dql.SqlRichDescribeConnection"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeFunction"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeModel"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
@@ -186,6 +195,7 @@
     "COMMENT"
     "COMPILE"
     "COMPUTE"
+    "CONNECTIONS",
     "CONTINUOUS"
     "DATABASES"
     "DISTRIBUTED"
@@ -619,12 +629,14 @@
     "SqlAlterFunction()"
     "SqlShowFunctions()"
     "SqlShowModels()"
+    "SqlShowConnections()"
     "SqlShowTables()"
     "SqlShowColumns()"
     "SqlShowCreate()"
     "SqlReplaceTable()"
     "SqlAlterMaterializedTable()"
     "SqlAlterModel()"
+    "SqlAlterConnection()"
     "SqlAlterTable()"
     "SqlAlterView()"
     "SqlShowModules()"
@@ -649,6 +661,7 @@
     "SqlDescribeJob()"
     "SqlRichDescribeFunction()"
     "SqlRichDescribeModel()"
+    "SqlRichDescribeConnection()"
     "SqlRichDescribeTable()"
   ]
 
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 14bcdfd6f19..d4e0d198d91 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -776,6 +776,13 @@ SqlShowCreate SqlShowCreate() :
         {
             return new SqlShowCreateModel(pos, sqlIdentifier);
         }
+    |
+        <CONNECTION>
+        { pos = getPos(); }
+        sqlIdentifier = CompoundIdentifier()
+        {
+            return new SqlShowCreateConnection(pos, sqlIdentifier);
+        }
     |
         <MATERIALIZED> <TABLE>
         { pos = getPos(); }
@@ -787,7 +794,7 @@ SqlShowCreate SqlShowCreate() :
 }
 
 /**
- * DESCRIBE | DESC FUNCTION [ EXTENDED] [[catalogName.] 
dataBasesName].functionName sql call.
+ * (DESCRIBE | DESC) FUNCTION [ EXTENDED] [[catalogName.] 
dataBasesName].functionName sql call.
  * Here we add Rich in className to match the naming of SqlRichDescribeTable.
  */
 SqlRichDescribeFunction SqlRichDescribeFunction() :
@@ -806,7 +813,7 @@ SqlRichDescribeFunction SqlRichDescribeFunction() :
 }
 
 /**
- * DESCRIBE | DESC MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName 
sql call.
+ * (DESCRIBE | DESC) MODEL [ EXTENDED] [[catalogName.] 
dataBasesName].modelName sql call.
  * Here we add Rich in className to match the naming of SqlRichDescribeTable.
  */
 SqlRichDescribeModel SqlRichDescribeModel() :
@@ -825,7 +832,26 @@ SqlRichDescribeModel SqlRichDescribeModel() :
 }
 
 /**
- * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql 
call.
+ * (DESCRIBE | DESC) CONNECTION [ EXTENDED] [[catalogName.] 
dataBasesName].connectionName sql call.
+ * Here we add Rich in className to match the naming of SqlRichDescribeTable.
+ */
+SqlRichDescribeConnection SqlRichDescribeConnection() :
+{
+    SqlIdentifier connectionName;
+    SqlParserPos pos;
+    boolean isExtended = false;
+}
+{
+    ( <DESCRIBE> | <DESC> ) <CONNECTION> { pos = getPos();}
+    [ <EXTENDED> { isExtended = true;} ]
+    connectionName = CompoundIdentifier()
+    {
+        return new SqlRichDescribeConnection(pos, connectionName, isExtended);
+    }
+}
+
+/**
+ * (DESCRIBE | DESC) [ EXTENDED] [[catalogName.] dataBasesName].tableName sql 
call.
  * Here we add Rich in className to distinguish from calcite's original 
SqlDescribeTable.
  */
 SqlRichDescribeTable SqlRichDescribeTable() :
@@ -2683,9 +2709,13 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) :
         |
         create = SqlCreateDatabase(s, replace)
         |
+        create = SqlCreateModel(s, isTemporary)
+        |
+        // Lookahead to distinguish <SYSTEM> FUNCTION and <SYSTEM> <CONNECTION>
+        LOOKAHEAD(2)
         create = SqlCreateFunction(s, replace, isTemporary)
         |
-        create = SqlCreateModel(s, isTemporary)
+        create = SqlCreateConnection(s, isTemporary)
     )
     {
         return create;
@@ -2712,9 +2742,13 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
         |
         drop = SqlDropDatabase(s, replace)
         |
+        drop = SqlDropModel(s, isTemporary)
+        |
+        // Lookahead to distinguish <SYSTEM> FUNCTION and <SYSTEM> <CONNECTION>
+        LOOKAHEAD(2)
         drop = SqlDropFunction(s, replace, isTemporary)
         |
-        drop = SqlDropModel(s, isTemporary)
+        drop = SqlDropConnection(s, isTemporary)
     )
     {
         return drop;
@@ -3345,7 +3379,7 @@ SqlTruncateTable SqlTruncateTable() :
 }
 
 /**
-* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern]; sql call.
+* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern];
 */
 SqlShowModels SqlShowModels() :
 {
@@ -3381,6 +3415,43 @@ SqlShowModels SqlShowModels() :
     }
 }
 
+/**
+* SHOW CONNECTIONS [LIKE 'pattern'] [FROM catalog_name.db_name];
+*/
+SqlShowConnections SqlShowConnections() :
+{
+    SqlIdentifier databaseName = null;
+    SqlCharStringLiteral likeLiteral = null;
+    String prep = null;
+    boolean notLike = false;
+    SqlParserPos pos;
+}
+{
+    <SHOW> <CONNECTIONS>
+    { pos = getPos(); }
+    [
+        ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } )
+        { pos = getPos(); }
+        databaseName = CompoundIdentifier()
+    ]
+    [
+        [
+            <NOT>
+            {
+                notLike = true;
+            }
+        ]
+        <LIKE>  <QUOTED_STRING>
+        {
+            String likeCondition = SqlParserUtil.parseString(token.image);
+            likeLiteral = SqlLiteral.createCharString(likeCondition, getPos());
+        }
+    ]
+    {
+        return new SqlShowConnections(pos, prep, databaseName, notLike, 
likeLiteral);
+    }
+}
+
 /**
 * ALTER MODEL [IF EXISTS] modelName SET (property_key = property_val, ...)
 * ALTER MODEL [IF EXISTS] modelName RENAME TO newModelName
@@ -3433,6 +3504,59 @@ SqlAlterModel SqlAlterModel() :
     )
 }
 
+/**
+* ALTER CONNECTION [IF EXISTS] connectionName SET (property_key = 
property_val, ...)
+* ALTER CONNECTION [IF EXISTS] connectionName RENAME TO newConnectionName
+* ALTER CONNECTION [IF EXISTS] connectionName RESET (property_key, ...)
+* Alter temporary or system connection is not supported.
+*/
+SqlAlterConnection SqlAlterConnection() :
+{
+    SqlParserPos startPos;
+    boolean ifExists = false;
+    SqlIdentifier connectionIdentifier;
+    SqlIdentifier newConnectionIdentifier = null;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+    SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
+}
+{
+    <ALTER> <CONNECTION> { startPos = getPos(); }
+    ifExists = IfExistsOpt()
+    connectionIdentifier = CompoundIdentifier()
+    (
+        LOOKAHEAD(2)
+        <RENAME> <TO>
+        newConnectionIdentifier = CompoundIdentifier()
+        {
+            return new SqlAlterConnectionRename(
+                        startPos.plus(getPos()),
+                        connectionIdentifier,
+                        newConnectionIdentifier,
+                        ifExists);
+        }
+    |
+        <SET>
+        propertyList = Properties()
+        {
+            return new SqlAlterConnectionSet(
+                        startPos.plus(getPos()),
+                        connectionIdentifier,
+                        ifExists,
+                        propertyList);
+        }
+    |
+        <RESET>
+        propertyKeyList = PropertyKeys()
+        {
+            return new SqlAlterConnectionReset(
+                        startPos.plus(getPos()),
+                        connectionIdentifier,
+                        ifExists,
+                        propertyKeyList);
+        }
+    )
+}
+
 /**
 * DROP MODEL [IF EXIST] modelName
 */
@@ -3453,6 +3577,38 @@ SqlDrop SqlDropModel(Span s, boolean isTemporary) :
     }
 }
 
+/**
+* DROP [TEMPORARY] [SYSTEM] CONNECTION [IF EXIST] connectionName
+*/
+SqlDrop SqlDropConnection(Span s, boolean isTemporary) :
+{
+    SqlIdentifier connectionIdentifier = null;
+    boolean ifExists = false;
+    boolean isSystemConnection = false;
+}
+{
+    [
+        <SYSTEM>
+        {
+            if (!isTemporary){
+                throw SqlUtil.newContextException(getPos(),
+                    
ParserResource.RESOURCE.dropSystemConnectionOnlySupportTemporary());
+            }
+            isSystemConnection = true;
+        }
+    ]
+
+    <CONNECTION>
+
+    ifExists = IfExistsOpt()
+
+    connectionIdentifier = CompoundIdentifier()
+
+    {
+         return new SqlDropConnection(s.pos(), connectionIdentifier, ifExists, 
isTemporary, isSystemConnection);
+    }
+}
+
 /**
 * CREATE MODEL [IF NOT EXIST] modelName
 * [INPUT(col1 type1, col2 type2, ...)]
@@ -3539,6 +3695,54 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) :
     }
 }
 
+/**
+* CREATE [TEMPORARY] [SYSTEM] CONNECTION [IF NOT EXISTS] 
[catalog_name.][db_name.]connection_name
+* [COMMENT connection_comment]
+* WITH (property_key = property_val, ...)
+*/
+SqlCreate SqlCreateConnection(Span s, boolean isTemporary) :
+{
+    final SqlParserPos startPos = s.pos();
+    boolean ifNotExists = false;
+    boolean isSystem = false;
+    SqlIdentifier connectionIdentifier;
+    SqlCharStringLiteral comment = null;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+    [
+        <SYSTEM>
+        {
+            if (!isTemporary){
+                throw SqlUtil.newContextException(getPos(),
+                    
ParserResource.RESOURCE.createSystemConnectionOnlySupportTemporary());
+            }
+            isSystem = true;
+        }
+    ]
+    <CONNECTION>
+
+    ifNotExists = IfNotExistsOpt()
+
+    connectionIdentifier = CompoundIdentifier()
+    [ <COMMENT> <QUOTED_STRING>
+        {
+            comment = Comment();
+        }
+    ]
+    <WITH>
+    propertyList = Properties()
+    {
+        return new SqlCreateConnection(startPos.plus(getPos()),
+            connectionIdentifier,
+            comment,
+            propertyList,
+            isTemporary,
+            isSystem,
+            ifNotExists);
+    }
+}
+
 SqlCharStringLiteral Comment() :
 {
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
index 8f3a3831aec..2a289d09bd8 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.sql.parser;
 
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
@@ -31,6 +30,8 @@ import javax.annotation.Nullable;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /** Utils methods for parsing DDLs. */
@@ -89,12 +90,19 @@ public class SqlParseUtils {
                 .collect(Collectors.toMap(k -> k.getKeyString(), 
SqlTableOption::getValueString));
     }
 
-    public static List<String> extractList(@Nullable SqlNodeList sqlNodeList) {
+    public static List<String> extractList(
+            @Nullable SqlNodeList sqlNodeList, Function<SqlNode, String> 
mapper) {
         if (sqlNodeList == null) {
             return List.of();
         }
-        return sqlNodeList.getList().stream()
-                .map(p -> ((SqlIdentifier) p).getSimple())
-                .collect(Collectors.toList());
+        return 
sqlNodeList.getList().stream().map(mapper).collect(Collectors.toList());
+    }
+
+    public static Set<String> extractSet(
+            @Nullable SqlNodeList sqlNodeList, Function<SqlNode, String> 
mapper) {
+        if (sqlNodeList == null) {
+            return Set.of();
+        }
+        return 
sqlNodeList.getList().stream().map(mapper).collect(Collectors.toSet());
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java
similarity index 56%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java
index 53b6417f7f5..d8c490c547a 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java
@@ -20,37 +20,39 @@ package org.apache.flink.sql.parser.ddl;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 /**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.] 
dataBasesName].modelName
- * syntax.
+ * Abstract class to describe statements like ALTER CONNECTION [IF EXISTS] 
[[catalogName.]
+ * dataBasesName.]connectionName ...
  */
-public class SqlDropModel extends SqlDropObject {
-    private static final SqlOperator OPERATOR =
-            new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+public abstract class SqlAlterConnection extends SqlAlterObject {
 
-    private final boolean isTemporary;
+    private static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("ALTER CONNECTION", SqlKind.OTHER_DDL);
 
-    public SqlDropModel(
-            SqlParserPos pos, SqlIdentifier modelName, boolean ifExists, 
boolean isTemporary) {
-        super(OPERATOR, pos, modelName, ifExists);
-        this.isTemporary = isTemporary;
+    protected final boolean ifConnectionExists;
+
+    public SqlAlterConnection(
+            SqlParserPos pos, SqlIdentifier connectionName, boolean 
ifConnectionExists) {
+        super(OPERATOR, pos, "CONNECTION", connectionName);
+        this.ifConnectionExists = ifConnectionExists;
     }
 
-    public boolean getIsTemporary() {
-        return this.isTemporary;
+    /**
+     * Whether to ignore the error if the connection doesn't exist.
+     *
+     * @return true when IF EXISTS is specified.
+     */
+    public boolean ifConnectionExists() {
+        return ifConnectionExists;
     }
 
     @Override
-    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.keyword("DROP");
-        writer.keyword("MODEL");
-        if (ifExists) {
+    public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
+        if (ifConnectionExists) {
             writer.keyword("IF EXISTS");
         }
         name.unparse(writer, leftPrec, rightPrec);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java
similarity index 58%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java
index e9392daaa1a..ce50651c4c6 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java
@@ -18,50 +18,46 @@
 
 package org.apache.flink.sql.parser.ddl;
 
-import org.apache.flink.sql.parser.SqlParseUtils;
-import org.apache.flink.sql.parser.SqlUnparseUtils;
-
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
 /**
- * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET ( 
'key1' [, 'key2']...).
+ * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName 
RENAME TO
+ * newConnectionName.
  */
-public class SqlAlterModelReset extends SqlAlterModel {
-    private final SqlNodeList optionKeyList;
+public class SqlAlterConnectionRename extends SqlAlterConnection {
+
+    private final SqlIdentifier newConnectionName;
 
-    public SqlAlterModelReset(
+    public SqlAlterConnectionRename(
             SqlParserPos pos,
-            SqlIdentifier modelName,
-            boolean ifModelExists,
-            SqlNodeList optionKeyList) {
-        super(pos, modelName, ifModelExists);
-        this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList 
should not be null");
+            SqlIdentifier connectionName,
+            SqlIdentifier newConnectionName,
+            boolean ifConnectionExists) {
+        super(pos, connectionName, ifConnectionExists);
+        this.newConnectionName =
+                requireNonNull(newConnectionName, "newConnectionName should 
not be null");
     }
 
-    @Override
-    public List<SqlNode> getOperandList() {
-        return List.of(name, optionKeyList);
+    public SqlIdentifier getNewConnectionName() {
+        return newConnectionName;
     }
 
-    public Set<String> getResetKeys() {
-        return optionKeyList.getList().stream()
-                .map(SqlParseUtils::extractString)
-                .collect(Collectors.toSet());
+    @Override
+    public List<SqlNode> getOperandList() {
+        return List.of(name, newConnectionName);
     }
 
     @Override
     public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
         super.unparseAlterOperation(writer, leftPrec, rightPrec);
-        SqlUnparseUtils.unparseResetOptions(optionKeyList, writer, leftPrec, 
rightPrec);
+        writer.keyword("RENAME TO");
+        newConnectionName.unparse(writer, leftPrec, rightPrec);
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java
similarity index 79%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java
index e9392daaa1a..23501f75529 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java
@@ -29,22 +29,22 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
 /**
- * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET ( 
'key1' [, 'key2']...).
+ * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName 
RESET ( 'key1' [,
+ * 'key2']...).
  */
-public class SqlAlterModelReset extends SqlAlterModel {
+public class SqlAlterConnectionReset extends SqlAlterConnection {
     private final SqlNodeList optionKeyList;
 
-    public SqlAlterModelReset(
+    public SqlAlterConnectionReset(
             SqlParserPos pos,
-            SqlIdentifier modelName,
-            boolean ifModelExists,
+            SqlIdentifier connectionName,
+            boolean ifConnectionExists,
             SqlNodeList optionKeyList) {
-        super(pos, modelName, ifModelExists);
+        super(pos, connectionName, ifConnectionExists);
         this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList 
should not be null");
     }
 
@@ -54,9 +54,7 @@ public class SqlAlterModelReset extends SqlAlterModel {
     }
 
     public Set<String> getResetKeys() {
-        return optionKeyList.getList().stream()
-                .map(SqlParseUtils::extractString)
-                .collect(Collectors.toSet());
+        return SqlParseUtils.extractSet(optionKeyList, 
SqlParseUtils::extractString);
     }
 
     @Override
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java
similarity index 63%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java
index e9392daaa1a..c738d9a7530 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java
@@ -28,40 +28,40 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
 /**
- * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET ( 
'key1' [, 'key2']...).
+ * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName 
SET ( name=value [,
+ * name=value]*).
  */
-public class SqlAlterModelReset extends SqlAlterModel {
-    private final SqlNodeList optionKeyList;
+public class SqlAlterConnectionSet extends SqlAlterConnection {
 
-    public SqlAlterModelReset(
+    private final SqlNodeList connectionOptionList;
+
+    public SqlAlterConnectionSet(
             SqlParserPos pos,
-            SqlIdentifier modelName,
-            boolean ifModelExists,
-            SqlNodeList optionKeyList) {
-        super(pos, modelName, ifModelExists);
-        this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList 
should not be null");
+            SqlIdentifier connectionName,
+            boolean ifConnectionExists,
+            SqlNodeList connectionOptionList) {
+        super(pos, connectionName, ifConnectionExists);
+        this.connectionOptionList =
+                requireNonNull(connectionOptionList, "connectionOptionList 
should not be null");
     }
 
-    @Override
-    public List<SqlNode> getOperandList() {
-        return List.of(name, optionKeyList);
+    public Map<String, String> getProperties() {
+        return SqlParseUtils.extractMap(connectionOptionList);
     }
 
-    public Set<String> getResetKeys() {
-        return optionKeyList.getList().stream()
-                .map(SqlParseUtils::extractString)
-                .collect(Collectors.toSet());
+    @Override
+    public List<SqlNode> getOperandList() {
+        return List.of(name, connectionOptionList);
     }
 
     @Override
     public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
         super.unparseAlterOperation(writer, leftPrec, rightPrec);
-        SqlUnparseUtils.unparseResetOptions(optionKeyList, writer, leftPrec, 
rightPrec);
+        SqlUnparseUtils.unparseSetOptions(connectionOptionList, writer, 
leftPrec, rightPrec);
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
index e9392daaa1a..00bfbc1079e 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
@@ -29,7 +29,6 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
@@ -54,9 +53,7 @@ public class SqlAlterModelReset extends SqlAlterModel {
     }
 
     public Set<String> getResetKeys() {
-        return optionKeyList.getList().stream()
-                .map(SqlParseUtils::extractString)
-                .collect(Collectors.toSet());
+        return SqlParseUtils.extractSet(optionKeyList, 
SqlParseUtils::extractString);
     }
 
     @Override
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
index a76163b999e..b6581a5f419 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
@@ -30,7 +30,6 @@ import org.apache.calcite.util.ImmutableNullableList;
 
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
@@ -58,9 +57,7 @@ public class SqlAlterTableReset extends SqlAlterTable {
     }
 
     public Set<String> getResetKeys() {
-        return propertyKeyList.getList().stream()
-                .map(SqlParseUtils::extractString)
-                .collect(Collectors.toSet());
+        return SqlParseUtils.extractSet(propertyKeyList, 
SqlParseUtils::extractString);
     }
 
     @Override
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
index 4ab91890c19..c90c56a7b8c 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
@@ -81,7 +81,7 @@ public class SqlAnalyzeTable extends SqlCall {
     }
 
     public List<String> getColumnNames() {
-        return SqlParseUtils.extractList(columns);
+        return SqlParseUtils.extractList(columns, p -> ((SqlIdentifier) 
p).getSimple());
     }
 
     public boolean isAllColumns() {
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java
new file mode 100644
index 00000000000..c9936893a14
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * {@link SqlNode} to describe the CREATE CONNECTION syntax. CREATE 
[TEMPORARY] [SYSTEM] CONNECTION
+ * [IF NOT EXISTS] [[catalogName.] dataBasesName].connectionName [COMMENT 
connection_comment] WITH
+ * (name=value, [name=value]*).
+ */
+public class SqlCreateConnection extends SqlCreateObject implements 
ExtendedSqlNode {
+
+    private static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("CREATE CONNECTION", SqlKind.OTHER_DDL);
+
+    private final boolean isSystem;
+
+    public SqlCreateConnection(
+            SqlParserPos pos,
+            SqlIdentifier connectionName,
+            SqlCharStringLiteral comment,
+            SqlNodeList propertyList,
+            boolean isTemporary,
+            boolean isSystem,
+            boolean ifNotExists) {
+        super(
+                OPERATOR,
+                pos,
+                connectionName,
+                isTemporary,
+                false,
+                ifNotExists,
+                propertyList,
+                comment);
+        this.isSystem = isSystem;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(name, comment, properties);
+    }
+
+    public boolean isSystem() {
+        return isSystem;
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        if (properties == null || properties.isEmpty()) {
+            throw new SqlValidateException(
+                    getParserPosition(), "Connection property list can not be 
empty.");
+        }
+    }
+
+    @Override
+    protected String getScope() {
+        return "CONNECTION";
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("CREATE");
+        if (isTemporary()) {
+            writer.keyword("TEMPORARY");
+        }
+        if (isSystem) {
+            writer.keyword("SYSTEM");
+        }
+        writer.keyword("CONNECTION");
+        if (isIfNotExists()) {
+            writer.keyword("IF NOT EXISTS");
+        }
+        name.unparse(writer, leftPrec, rightPrec);
+        SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec, 
rightPrec);
+        SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, 
rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 628571fdcb4..8039e282968 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -119,7 +119,7 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
     }
 
     public List<String> getPartitionKeyList() {
-        return SqlParseUtils.extractList(partitionKeyList);
+        return SqlParseUtils.extractList(partitionKeyList, p -> 
((SqlIdentifier) p).getSimple());
     }
 
     @Nullable
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 2d6769b2252..8fa0a4c7ca5 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -136,7 +136,7 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
     }
 
     public List<String> getPartitionKeyList() {
-        return SqlParseUtils.extractList(partitionKeyList);
+        return SqlParseUtils.extractList(partitionKeyList, p -> 
((SqlIdentifier) p).getSimple());
     }
 
     public List<SqlTableConstraint> getTableConstraints() {
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java
similarity index 60%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java
index 53b6417f7f5..3af65a4ba11 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java
@@ -20,36 +20,51 @@ package org.apache.flink.sql.parser.ddl;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 /**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.] 
dataBasesName].modelName
- * syntax.
+ * {@link org.apache.calcite.sql.SqlNode} to describe the DROP CONNECTION [IF 
EXISTS]
+ * [[catalogName.] dataBasesName].connectionName syntax.
  */
-public class SqlDropModel extends SqlDropObject {
+public class SqlDropConnection extends SqlDropObject {
     private static final SqlOperator OPERATOR =
-            new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+            new SqlSpecialOperator("DROP CONNECTION", SqlKind.OTHER_DDL);
 
     private final boolean isTemporary;
+    private final boolean isSystemConnection;
 
-    public SqlDropModel(
-            SqlParserPos pos, SqlIdentifier modelName, boolean ifExists, 
boolean isTemporary) {
-        super(OPERATOR, pos, modelName, ifExists);
+    public SqlDropConnection(
+            SqlParserPos pos,
+            SqlIdentifier connectionName,
+            boolean ifExists,
+            boolean isTemporary,
+            boolean isSystemConnection) {
+        super(OPERATOR, pos, connectionName, ifExists);
         this.isTemporary = isTemporary;
+        this.isSystemConnection = isSystemConnection;
     }
 
-    public boolean getIsTemporary() {
-        return this.isTemporary;
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    public boolean isSystemConnection() {
+        return isSystemConnection;
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.keyword("DROP");
-        writer.keyword("MODEL");
+        if (isTemporary) {
+            writer.keyword("TEMPORARY");
+        }
+        if (isSystemConnection) {
+            writer.keyword("SYSTEM");
+        }
+        writer.keyword("CONNECTION");
         if (ifExists) {
             writer.keyword("IF EXISTS");
         }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
index 53b6417f7f5..8aeb082a54f 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
@@ -42,13 +42,16 @@ public class SqlDropModel extends SqlDropObject {
         this.isTemporary = isTemporary;
     }
 
-    public boolean getIsTemporary() {
-        return this.isTemporary;
+    public boolean isTemporary() {
+        return isTemporary;
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.keyword("DROP");
+        if (isTemporary) {
+            writer.keyword("TEMPORARY");
+        }
         writer.keyword("MODEL");
         if (ifExists) {
             writer.keyword("IF EXISTS");
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java
new file mode 100644
index 00000000000..9fb9bb9b79c
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * DESCRIBE CONNECTION [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier 
sql call. Here we add
+ * Rich in className to follow the convention of {@link 
org.apache.calcite.sql.SqlDescribeTable},
+ * which only had it to distinguish from calcite's original SqlDescribeTable, 
even though calcite
+ * does not have SqlDescribeConnection.
+ */
+public class SqlRichDescribeConnection extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("DESCRIBE CONNECTION", SqlKind.OTHER);
+    protected final SqlIdentifier connectionNameIdentifier;
+    private final boolean isExtended;
+
+    public SqlRichDescribeConnection(
+            SqlParserPos pos, SqlIdentifier connectionNameIdentifier, boolean 
isExtended) {
+        super(pos);
+        this.connectionNameIdentifier = connectionNameIdentifier;
+        this.isExtended = isExtended;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(connectionNameIdentifier);
+    }
+
+    public boolean isExtended() {
+        return isExtended;
+    }
+
+    public String[] fullConnectionName() {
+        return connectionNameIdentifier.names.toArray(new String[0]);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("DESCRIBE CONNECTION");
+        if (isExtended) {
+            writer.keyword("EXTENDED");
+        }
+        connectionNameIdentifier.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java
similarity index 53%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java
index 53b6417f7f5..a0dd89f99e6 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java
@@ -16,43 +16,45 @@
  * limitations under the License.
  */
 
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.dql;
 
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
-/**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.] 
dataBasesName].modelName
- * syntax.
- */
-public class SqlDropModel extends SqlDropObject {
-    private static final SqlOperator OPERATOR =
-            new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+/** {@link SqlNode} to describe the SHOW CONNECTIONS syntax. */
+public class SqlShowConnections extends SqlShowCall {
 
-    private final boolean isTemporary;
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("SHOW CONNECTIONS", SqlKind.OTHER);
 
-    public SqlDropModel(
-            SqlParserPos pos, SqlIdentifier modelName, boolean ifExists, 
boolean isTemporary) {
-        super(OPERATOR, pos, modelName, ifExists);
-        this.isTemporary = isTemporary;
+    public SqlShowConnections(
+            SqlParserPos pos,
+            String preposition,
+            SqlIdentifier databaseName,
+            boolean notLike,
+            SqlCharStringLiteral likeLiteral) {
+        // only LIKE currently supported for SHOW CONNECTIONS
+        super(
+                pos,
+                preposition,
+                databaseName,
+                likeLiteral == null ? null : "LIKE",
+                likeLiteral,
+                notLike);
     }
 
-    public boolean getIsTemporary() {
-        return this.isTemporary;
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
     }
 
     @Override
-    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.keyword("DROP");
-        writer.keyword("MODEL");
-        if (ifExists) {
-            writer.keyword("IF EXISTS");
-        }
-        name.unparse(writer, leftPrec, rightPrec);
+    String getOperationName() {
+        return "SHOW CONNECTIONS";
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java
similarity index 59%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java
index 53b6417f7f5..0434bbb6dc1 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.dql;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
@@ -26,33 +26,36 @@ import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
-/**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.] 
dataBasesName].modelName
- * syntax.
- */
-public class SqlDropModel extends SqlDropObject {
-    private static final SqlOperator OPERATOR =
-            new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+import java.util.Collections;
+import java.util.List;
+
+/** SHOW CREATE CONNECTION sql call. */
+public class SqlShowCreateConnection extends SqlShowCreate {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("SHOW CREATE CONNECTION", 
SqlKind.OTHER_DDL);
+
+    public SqlShowCreateConnection(SqlParserPos pos, SqlIdentifier 
connectionName) {
+        super(pos, connectionName);
+    }
 
-    private final boolean isTemporary;
+    public SqlIdentifier getConnectionName() {
+        return sqlIdentifier;
+    }
 
-    public SqlDropModel(
-            SqlParserPos pos, SqlIdentifier modelName, boolean ifExists, 
boolean isTemporary) {
-        super(OPERATOR, pos, modelName, ifExists);
-        this.isTemporary = isTemporary;
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
     }
 
-    public boolean getIsTemporary() {
-        return this.isTemporary;
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(sqlIdentifier);
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.keyword("DROP");
-        writer.keyword("MODEL");
-        if (ifExists) {
-            writer.keyword("IF EXISTS");
-        }
-        name.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("SHOW CREATE CONNECTION");
+        sqlIdentifier.unparse(writer, leftPrec, rightPrec);
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 9c29119ba9e..57e2e55c17d 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -35,7 +35,15 @@ public interface ParserResource {
     Resources.ExInst<ParseException> overwriteIsOnlyUsedWithInsert();
 
     @Resources.BaseMessage(
-            "CREATE SYSTEM FUNCTION is not supported, system functions can 
only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM 
FUNCTION instead.")
+            "CREATE SYSTEM CONNECTION is not supported, system connections can 
only be registered as temporary connections, you can use CREATE TEMPORARY 
SYSTEM CONNECTION instead.")
+    Resources.ExInst<ParseException> 
createSystemConnectionOnlySupportTemporary();
+
+    @Resources.BaseMessage(
+            "DROP SYSTEM CONNECTION is not supported, system connections can 
only be dropped as temporary connections, you can use DROP TEMPORARY SYSTEM 
CONNECTION instead.")
+    Resources.ExInst<ParseException> 
dropSystemConnectionOnlySupportTemporary();
+
+    @Resources.BaseMessage(
+            "CREATE SYSTEM FUNCTION is not supported, system functions can 
only be registered as temporary functions, you can use CREATE TEMPORARY SYSTEM 
FUNCTION instead.")
     Resources.ExInst<ParseException> 
createSystemFunctionOnlySupportTemporary();
 
     @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 8a6b3abe8a6..64ea1fa26e7 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2479,7 +2479,7 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                 .fails(
                         "CREATE SYSTEM FUNCTION is not supported, "
                                 + "system functions can only be registered as 
temporary "
-                                + "function, you can use CREATE TEMPORARY 
SYSTEM FUNCTION instead.");
+                                + "functions, you can use CREATE TEMPORARY 
SYSTEM FUNCTION instead.");
 
         // test create function using jar
         sql("create temporary function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar'")
@@ -3262,6 +3262,12 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         sql("drop model catalog1.db1.m1").ok("DROP MODEL 
`CATALOG1`.`DB1`.`M1`");
     }
 
+    @Test
+    void testDropTemporaryModel() {
+        sql("drop temporary model m1").ok("DROP TEMPORARY MODEL `M1`");
+        sql("drop temporary model if exists m1").ok("DROP TEMPORARY MODEL IF 
EXISTS `M1`");
+    }
+
     @Test
     void testDropModelIfExists() {
         sql("drop model if exists catalog1.db1.m1")
@@ -3480,6 +3486,254 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                 + "FROM TABLE(`ML_PREDICT`(`INPUT` => (TABLE 
`MY_TABLE`), `MODEL` => (MODEL `MY_MODEL`)))");
     }
 
+    // 
=====================================================================================
+    // Connection DDL/DQL Tests
+    // 
=====================================================================================
+
+    @Test
+    void testCreateConnection() {
+        sql("create connection conn1\n"
+                        + " COMMENT 'connection_comment'\n"
+                        + " WITH (\n"
+                        + "  'type'='basic',\n"
+                        + "  'url'='http://example.com',\n"
+                        + "  'username'='user1',\n"
+                        + "  'password'='pass1'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE CONNECTION `CONN1`\n"
+                                + "COMMENT 'connection_comment'\n"
+                                + "WITH (\n"
+                                + "  'type' = 'basic',\n"
+                                + "  'url' = 'http://example.com',\n"
+                                + "  'username' = 'user1',\n"
+                                + "  'password' = 'pass1'\n"
+                                + ")");
+    }
+
+    @Test
+    void testCreateConnectionIfNotExists() {
+        sql("create connection if not exists conn1\n"
+                        + " WITH (\n"
+                        + "  'type'='bearer',\n"
+                        + "  'token'='my_token'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE CONNECTION IF NOT EXISTS `CONN1`\n"
+                                + "WITH (\n"
+                                + "  'type' = 'bearer',\n"
+                                + "  'token' = 'my_token'\n"
+                                + ")");
+    }
+
+    @Test
+    void testCreateTemporaryConnection() {
+        sql("create temporary connection conn1\n"
+                        + " WITH (\n"
+                        + "  'type'='oauth',\n"
+                        + "  'client_id'='client1'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE TEMPORARY CONNECTION `CONN1`\n"
+                                + "WITH (\n"
+                                + "  'type' = 'oauth',\n"
+                                + "  'client_id' = 'client1'\n"
+                                + ")");
+    }
+
+    @Test
+    void testCreateSystemConnection() {
+        sql("create ^system^ connection conn1\n"
+                        + " WITH (\n"
+                        + "  'type'='basic',\n"
+                        + "  'url'='http://example.com'\n"
+                        + " )\n")
+                .fails(
+                        "(?s)CREATE SYSTEM CONNECTION is not supported, "
+                                + "system connections can only be registered 
as temporary "
+                                + "connections, you can use CREATE TEMPORARY 
SYSTEM CONNECTION "
+                                + "instead\\..*");
+    }
+
+    @Test
+    void testCreateTemporarySystemConnection() {
+        sql("create temporary system connection conn1\n"
+                        + " WITH (\n"
+                        + "  'type'='custom_type',\n"
+                        + "  'api_key'='key123'\n"
+                        + " )\n")
+                .ok(
+                        "CREATE TEMPORARY SYSTEM CONNECTION `CONN1`\n"
+                                + "WITH (\n"
+                                + "  'type' = 'custom_type',\n"
+                                + "  'api_key' = 'key123'\n"
+                                + ")");
+    }
+
+    @Test
+    void testCreateConnectionWithQualifiedName() {
+        sql("create connection catalog1.db1.conn1\n"
+                        + " WITH ('type'='basic', 
'url'='http://example.com')\n")
+                .ok(
+                        "CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`\n"
+                                + "WITH (\n"
+                                + "  'type' = 'basic',\n"
+                                + "  'url' = 'http://example.com'\n"
+                                + ")");
+    }
+
+    @Test
+    void testDropConnection() {
+        sql("drop connection conn1").ok("DROP CONNECTION `CONN1`");
+        sql("drop connection db1.conn1").ok("DROP CONNECTION `DB1`.`CONN1`");
+        sql("drop connection catalog1.db1.conn1").ok("DROP CONNECTION 
`CATALOG1`.`DB1`.`CONN1`");
+    }
+
+    @Test
+    void testDropConnectionIfExists() {
+        sql("drop connection if exists catalog1.db1.conn1")
+                .ok("DROP CONNECTION IF EXISTS `CATALOG1`.`DB1`.`CONN1`");
+    }
+
+    @Test
+    void testDropTemporaryConnection() {
+        sql("drop temporary connection conn1").ok("DROP TEMPORARY CONNECTION 
`CONN1`");
+        sql("drop temporary connection if exists conn1")
+                .ok("DROP TEMPORARY CONNECTION IF EXISTS `CONN1`");
+    }
+
+    @Test
+    void testDropTemporarySystemConnection() {
+        sql("drop temporary system connection conn1")
+                .ok("DROP TEMPORARY SYSTEM CONNECTION `CONN1`");
+        sql("drop temporary system connection if exists conn1")
+                .ok("DROP TEMPORARY SYSTEM CONNECTION IF EXISTS `CONN1`");
+    }
+
+    @Test
+    void testDropSystemConnection() {
+        sql("drop ^system^ connection conn1")
+                .fails(
+                        "(?s)DROP SYSTEM CONNECTION is not supported, "
+                                + "system connections can only be dropped as 
temporary "
+                                + "connections, you can use DROP TEMPORARY 
SYSTEM CONNECTION "
+                                + "instead\\..*");
+    }
+
+    @Test
+    void testAlterConnectionSet() {
+        final String sql =
+                "alter connection conn1 set ('password' = 'new_password','url' 
= 'http://new.com')";
+        final String expected =
+                "ALTER CONNECTION `CONN1` SET (\n"
+                        + "  'password' = 'new_password',\n"
+                        + "  'url' = 'http://new.com'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionSetWithQualifiedName() {
+        final String sql = "alter connection catalog1.db1.conn1 set ('token' = 
'new_token')";
+        final String expected =
+                "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` SET (\n"
+                        + "  'token' = 'new_token'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionRename() {
+        final String sql = "alter connection conn1 rename to conn2";
+        final String expected = "ALTER CONNECTION `CONN1` RENAME TO `CONN2`";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionRenameWithQualifiedName() {
+        final String sql = "alter connection catalog1.db1.conn1 rename to 
conn2";
+        final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` 
RENAME TO `CONN2`";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionReset() {
+        final String sql = "alter connection conn1 reset ('password', 'url')";
+        final String expected = "ALTER CONNECTION `CONN1` RESET (\n  
'password',\n  'url'\n)";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionResetWithQualifiedName() {
+        final String sql = "alter connection catalog1.db1.conn1 reset 
('token')";
+        final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` 
RESET (\n  'token'\n)";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionIfExists() {
+        final String sql =
+                "alter connection if exists conn1 set ('password' = 
'new_password','url' = 'http://new.com')";
+        final String expected =
+                "ALTER CONNECTION IF EXISTS `CONN1` SET (\n"
+                        + "  'password' = 'new_password',\n"
+                        + "  'url' = 'http://new.com'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionRenameIfExists() {
+        final String sql = "alter connection if exists conn1 rename to conn2";
+        final String expected = "ALTER CONNECTION IF EXISTS `CONN1` RENAME TO 
`CONN2`";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testAlterConnectionResetIfExists() {
+        final String sql = "alter connection if exists conn1 reset 
('password', 'url')";
+        final String expected =
+                "ALTER CONNECTION IF EXISTS `CONN1` RESET (\n  'password',\n  
'url'\n)";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testShowConnections() {
+        sql("show connections").ok("SHOW CONNECTIONS");
+        sql("show connections from db1").ok("SHOW CONNECTIONS FROM `DB1`");
+        sql("show connections from catalog1.db1").ok("SHOW CONNECTIONS FROM 
`CATALOG1`.`DB1`");
+        sql("show connections in db1").ok("SHOW CONNECTIONS IN `DB1`");
+        sql("show connections in catalog1.db1").ok("SHOW CONNECTIONS IN 
`CATALOG1`.`DB1`");
+    }
+
+    @Test
+    void testShowConnectionsLike() {
+        sql("show connections like '%conn%'").ok("SHOW CONNECTIONS LIKE 
'%CONN%'");
+        sql("show connections from db1 like 'my_%'").ok("SHOW CONNECTIONS FROM 
`DB1` LIKE 'MY_%'");
+        sql("show connections not like 'temp_%'").ok("SHOW CONNECTIONS NOT 
LIKE 'TEMP_%'");
+    }
+
+    @Test
+    void testShowCreateConnection() {
+        sql("show create connection conn1").ok("SHOW CREATE CONNECTION 
`CONN1`");
+        sql("show create connection catalog1.db1.conn1")
+                .ok("SHOW CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`");
+    }
+
+    @Test
+    void testDescribeConnection() {
+        sql("describe connection conn1").ok("DESCRIBE CONNECTION `CONN1`");
+        sql("describe connection catalog1.db1.conn1")
+                .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`");
+        sql("describe connection extended conn1").ok("DESCRIBE CONNECTION 
EXTENDED `CONN1`");
+
+        sql("desc connection conn1").ok("DESCRIBE CONNECTION `CONN1`");
+        sql("desc connection catalog1.db1.conn1")
+                .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`");
+        sql("desc connection extended catalog1.db1.conn1")
+                .ok("DESCRIBE CONNECTION EXTENDED `CATALOG1`.`DB1`.`CONN1`");
+    }
+
     /*
      * This test was backported from Calcite 1.38 (CALCITE-6266).
      * Remove it together with upgrade to Calcite 1.38.
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
index 90a45c5a425..4e0d85c6dbc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
@@ -35,6 +35,6 @@ public class SqlDropModelConverter implements 
SqlNodeConverter<SqlDropModel> {
                 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
 
         return new DropModelOperation(
-                identifier, sqlDropModel.getIfExists(), 
sqlDropModel.getIsTemporary());
+                identifier, sqlDropModel.getIfExists(), 
sqlDropModel.isTemporary());
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index b9992b43dd8..ff25ff6b0e0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlNumericLiteral;
@@ -52,7 +53,8 @@ public class OperationConverterUtils {
         }
 
         SqlNodeList columns = distribution.getBucketColumns();
-        List<String> bucketColumns = SqlParseUtils.extractList(columns);
+        List<String> bucketColumns =
+                SqlParseUtils.extractList(columns, p -> ((SqlIdentifier) 
p).getSimple());
         return TableDistribution.of(kind, bucketCount, bucketColumns);
     }
 

Reply via email to