This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 80189e470f4 [FLINK-39799][table] Preserve user-typed query text for 
materialized table and view definitions
80189e470f4 is described below

commit 80189e470f462ec02ee6aeafca31b8ac15efd439
Author: Ramin Gharib <[email protected]>
AuthorDate: Fri Jun 5 17:33:34 2026 +0200

    [FLINK-39799][table] Preserve user-typed query text for materialized table 
and view definitions
---
 .../src/main/codegen/includes/parserImpls.ftl      | 74 ++++++++++++++--------
 .../SqlAlterMaterializedTableAsQuery.java          | 13 +++-
 .../SqlCreateMaterializedTable.java                | 11 +++-
 .../SqlCreateOrAlterMaterializedTable.java         |  6 +-
 .../flink/sql/parser/ddl/view/SqlAlterViewAs.java  | 14 +++-
 .../flink/sql/parser/ddl/view/SqlCreateView.java   | 10 ++-
 .../flink/table/planner/delegation/ParserImpl.java |  3 +-
 .../planner/operations/SqlNodeConvertContext.java  | 13 +++-
 .../operations/SqlNodeToOperationConversion.java   | 58 +++++++++++------
 .../converters/SqlAlterViewAsConverter.java        |  1 +
 .../converters/SqlCreateViewConverter.java         |  7 +-
 .../operations/converters/SqlNodeConvertUtils.java | 60 +++++++++++++++---
 .../operations/converters/SqlNodeConverter.java    |  7 ++
 .../AbstractCreateMaterializedTableConverter.java  | 12 +++-
 .../SqlAlterMaterializedTableAsQueryConverter.java | 16 +++--
 .../operations/SqlCTASNodeToOperationTest.java     | 10 +++
 .../operations/SqlDdlToOperationConverterTest.java | 41 +++++++++++-
 ...erializedTableNodeToOperationConverterTest.java | 69 ++++++++++++++++++--
 .../SqlNodeToOperationConversionTestBase.java      |  4 +-
 ...reateOrAlterMaterializedTableConverterTest.java |  6 +-
 .../SqlRTASNodeToOperationConverterTest.java       | 10 +++
 .../table/planner/catalog/CatalogTableITCase.scala |  2 +-
 22 files changed, 365 insertions(+), 82 deletions(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index b3021950f12..aef621ddc7d 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1923,6 +1923,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, 
boolean replace, boolean isT
     SqlNode freshness = null;
     SqlRefreshMode refreshMode = null;
     SqlNode asQuery = null;
+    SqlParserPos asQueryKeywordPos = null;
     SqlParserPos pos = startPos;
     boolean isColumnsIdentifiersOnly = false;
     boolean isOrAlter = false;
@@ -2062,24 +2063,27 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, 
boolean replace, boolean isT
             }
         )
     ]
-    <AS>
+    <AS> { asQueryKeywordPos = getPos(); }
     asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
     {
-        return new SqlCreateOrAlterMaterializedTable(
-            startPos.plus(getPos()),
-            tableName,
-            columnList,
-            constraints,
-            watermark,
-            comment,
-            distribution,
-            partitionColumns,
-            propertyList,
-            (SqlIntervalLiteral) freshness,
-            refreshMode,
-            startMode,
-            asQuery,
-            isOrAlter);
+        final SqlCreateOrAlterMaterializedTable createMaterializedTable =
+            new SqlCreateOrAlterMaterializedTable(
+                startPos.plus(getPos()),
+                tableName,
+                columnList,
+                constraints,
+                watermark,
+                comment,
+                distribution,
+                partitionColumns,
+                propertyList,
+                (SqlIntervalLiteral) freshness,
+                refreshMode,
+                startMode,
+                asQuery,
+                isOrAlter,
+                asQueryKeywordPos);
+        return createMaterializedTable;
     }
 }
 
@@ -2123,6 +2127,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
     SqlNodeList partSpec = SqlNodeList.EMPTY;
     SqlNode freshness = null;
     SqlNode asQuery = null;
+    SqlParserPos asQueryKeywordPos = null;
     SqlIdentifier constraintName;
     AlterTableSchemaContext ctx = new AlterTableSchemaContext();
 }
@@ -2203,13 +2208,16 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
                     propertyKeyList);
             }
         |
-        <AS>
+        <AS> { asQueryKeywordPos = getPos(); }
             asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
             {
-                return new SqlAlterMaterializedTableAsQuery(
-                    startPos.plus(getPos()),
-                    tableIdentifier,
-                    asQuery);
+                final SqlAlterMaterializedTableAsQuery 
alterMaterializedTableAsQuery =
+                    new SqlAlterMaterializedTableAsQuery(
+                        startPos.plus(getPos()),
+                        tableIdentifier,
+                        asQuery,
+                        asQueryKeywordPos);
+                return alterMaterializedTableAsQuery;
             }
         |
         <ADD>
@@ -2427,6 +2435,7 @@ SqlCreate SqlCreateView(Span s, boolean replace, boolean 
isTemporary) : {
     SqlIdentifier viewName;
     SqlCharStringLiteral comment = null;
     SqlNode query;
+    SqlParserPos asQueryKeywordPos = null;
     SqlNodeList fieldList = SqlNodeList.EMPTY;
     boolean ifNotExists = false;
 }
@@ -2443,10 +2452,22 @@ SqlCreate SqlCreateView(Span s, boolean replace, 
boolean isTemporary) : {
             comment = Comment();
         }
     ]
-    <AS>
+    <AS> { asQueryKeywordPos = getPos(); }
     query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
     {
-        return new SqlCreateView(s.pos(), viewName, fieldList, query, replace, 
isTemporary, ifNotExists, comment, null);
+      final SqlCreateView createView =
+              new SqlCreateView(
+                      s.pos(),
+                      viewName,
+                      fieldList,
+                      query,
+                      replace,
+                      isTemporary,
+                      ifNotExists,
+                      comment,
+                      null,
+                      asQueryKeywordPos);
+        return createView;
     }
 }
 
@@ -2472,6 +2493,7 @@ SqlAlterView SqlAlterView() :
   SqlIdentifier viewName;
   SqlIdentifier newViewName;
   SqlNode newQuery;
+  SqlParserPos asQueryKeywordPos = null;
 }
 {
   <ALTER> <VIEW> { startPos = getPos(); }
@@ -2483,10 +2505,12 @@ SqlAlterView SqlAlterView() :
         return new SqlAlterViewRename(startPos.plus(getPos()), viewName, 
newViewName);
       }
   |
-      <AS>
+      <AS> { asQueryKeywordPos = getPos(); }
       newQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
       {
-        return new SqlAlterViewAs(startPos.plus(getPos()), viewName, newQuery);
+        final SqlAlterViewAs alterViewAs =
+                new SqlAlterViewAs(startPos.plus(getPos()), viewName, 
newQuery, asQueryKeywordPos);
+        return alterViewAs;
       }
   )
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
index 53fe5b23c9c..cc57ec30796 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
@@ -36,16 +36,27 @@ public class SqlAlterMaterializedTableAsQuery extends 
SqlAlterMaterializedTable
 
     private final SqlNode asQuery;
 
+    private final SqlParserPos asQueryKeywordPos;
+
     public SqlAlterMaterializedTableAsQuery(
-            SqlParserPos pos, SqlIdentifier tableName, SqlNode asQuery) {
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNode asQuery,
+            SqlParserPos asQueryKeywordPos) {
         super(pos, tableName);
         this.asQuery = asQuery;
+        this.asQueryKeywordPos = asQueryKeywordPos;
     }
 
     public SqlNode getAsQuery() {
         return asQuery;
     }
 
+    /** Returns the parser position of the {@code AS} keyword. */
+    public SqlParserPos getAsQueryKeywordPos() {
+        return asQueryKeywordPos;
+    }
+
     @Override
     public List<SqlNode> getOperandList() {
         return ImmutableNullableList.of(name, asQuery);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
index a1d51cf6a93..f1ee603f60e 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
@@ -71,6 +71,8 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
 
     private final SqlNode asQuery;
 
+    private final SqlParserPos asQueryKeywordPos;
+
     public SqlCreateMaterializedTable(
             SqlSpecialOperator operator,
             SqlParserPos pos,
@@ -85,7 +87,8 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
             @Nullable SqlIntervalLiteral freshness,
             @Nullable SqlRefreshMode refreshMode,
             @Nullable SqlStartMode startMode,
-            SqlNode asQuery) {
+            SqlNode asQuery,
+            SqlParserPos asQueryKeywordPos) {
         super(operator, pos, tableName, false, false, false, propertyList, 
comment);
         this.columnList = columnList;
         this.tableConstraints = tableConstraints;
@@ -98,6 +101,7 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
         this.refreshMode = refreshMode;
         this.startMode = startMode;
         this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
+        this.asQueryKeywordPos = asQueryKeywordPos;
     }
 
     @Override
@@ -153,6 +157,11 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
         return asQuery;
     }
 
+    /** Returns the parser position of the {@code AS} keyword. */
+    public SqlParserPos getAsQueryKeywordPos() {
+        return asQueryKeywordPos;
+    }
+
     /** Returns the column constraints plus the table constraints. */
     public List<SqlTableConstraint> getFullConstraints() {
         return SqlConstraintValidator.getFullConstraints(tableConstraints, 
columnList);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
index b1859ba6114..6d13aa108ea 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
@@ -57,7 +57,8 @@ public class SqlCreateOrAlterMaterializedTable extends 
SqlCreateMaterializedTabl
             @Nullable SqlRefreshMode refreshMode,
             @Nullable SqlStartMode startMode,
             SqlNode asQuery,
-            boolean isOrAlter) {
+            boolean isOrAlter,
+            SqlParserPos asQueryKeywordPos) {
         super(
                 isOrAlter ? CREATE_OR_ALTER_OPERATOR : CREATE_OPERATOR,
                 pos,
@@ -72,7 +73,8 @@ public class SqlCreateOrAlterMaterializedTable extends 
SqlCreateMaterializedTabl
                 freshness,
                 refreshMode,
                 startMode,
-                asQuery);
+                asQuery,
+                asQueryKeywordPos);
     }
 
     @Override
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
index 1a5a4bd724b..ba74abead09 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
@@ -35,9 +35,16 @@ public class SqlAlterViewAs extends SqlAlterView {
 
     private final SqlNode newQuery;
 
-    public SqlAlterViewAs(SqlParserPos pos, SqlIdentifier viewIdentifier, 
SqlNode newQuery) {
+    private final SqlParserPos asQueryKeywordPos;
+
+    public SqlAlterViewAs(
+            SqlParserPos pos,
+            SqlIdentifier viewIdentifier,
+            SqlNode newQuery,
+            SqlParserPos asQueryKeywordPos) {
         super(pos, viewIdentifier);
         this.newQuery = newQuery;
+        this.asQueryKeywordPos = asQueryKeywordPos;
     }
 
     @Nonnull
@@ -55,4 +62,9 @@ public class SqlAlterViewAs extends SqlAlterView {
     public SqlNode getNewQuery() {
         return newQuery;
     }
+
+    /** Returns the parser position of the {@code AS} keyword. */
+    public SqlParserPos getAsQueryKeywordPos() {
+        return asQueryKeywordPos;
+    }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
index cd0baa8f92c..fc4c754fb59 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
@@ -43,6 +43,7 @@ public class SqlCreateView extends SqlCreateObject {
 
     private final SqlNodeList fieldList;
     private final SqlNode query;
+    private final SqlParserPos asQueryKeywordPos;
 
     public SqlCreateView(
             SqlParserPos pos,
@@ -53,10 +54,12 @@ public class SqlCreateView extends SqlCreateObject {
             boolean isTemporary,
             boolean ifNotExists,
             SqlCharStringLiteral comment,
-            SqlNodeList properties) {
+            SqlNodeList properties,
+            SqlParserPos asQueryKeywordPos) {
         super(OPERATOR, pos, viewName, isTemporary, replace, ifNotExists, 
properties, comment);
         this.fieldList = requireNonNull(fieldList, "fieldList should not be 
null");
         this.query = requireNonNull(query, "query should not be null");
+        this.asQueryKeywordPos = asQueryKeywordPos;
     }
 
     @Override
@@ -77,6 +80,11 @@ public class SqlCreateView extends SqlCreateObject {
         return query;
     }
 
+    /** Returns the parser position of the {@code AS} keyword. */
+    public SqlParserPos getAsQueryKeywordPos() {
+        return asQueryKeywordPos;
+    }
+
     @Override
     protected String getScope() {
         return "VIEW";
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
index 6371f7a7778..b45c199fe64 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
@@ -103,7 +103,8 @@ public class ParserImpl implements Parser {
         List<SqlNode> parsed = sqlNodeList.getList();
         Preconditions.checkArgument(parsed.size() == 1, "only single statement 
supported");
         return Collections.singletonList(
-                SqlNodeToOperationConversion.convert(planner, catalogManager, 
parsed.get(0))
+                SqlNodeToOperationConversion.convert(
+                                planner, catalogManager, parsed.get(0), 
statement)
                         .orElseThrow(() -> new TableException("Unsupported 
query: " + statement)));
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 05364388fba..fa06ce6703e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -51,10 +51,15 @@ public class SqlNodeConvertContext implements 
SqlNodeConverter.ConvertContext {
 
     private final FlinkPlannerImpl flinkPlanner;
     private final CatalogManager catalogManager;
+    private final @Nullable String originalSql;
 
-    public SqlNodeConvertContext(FlinkPlannerImpl flinkPlanner, CatalogManager 
catalogManager) {
+    public SqlNodeConvertContext(
+            FlinkPlannerImpl flinkPlanner,
+            CatalogManager catalogManager,
+            @Nullable String originalSql) {
         this.flinkPlanner = flinkPlanner;
         this.catalogManager = catalogManager;
+        this.originalSql = originalSql;
     }
 
     @Override
@@ -110,6 +115,12 @@ public class SqlNodeConvertContext implements 
SqlNodeConverter.ConvertContext {
         return sqlNode.toSqlString(getSqlDialect()).getSql();
     }
 
+    @Override
+    @Nullable
+    public String getStatementText() {
+        return this.originalSql;
+    }
+
     private SqlDialect getSqlDialect() {
         SqlParser.Config parserConfig = 
flinkPlanner.config().getParserConfig();
         return
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 3609545be94..eb29049dfb2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -151,13 +151,17 @@ import java.util.stream.Collectors;
 public class SqlNodeToOperationConversion {
     private final FlinkPlannerImpl flinkPlanner;
     private final CatalogManager catalogManager;
+    @Nullable private final String originalSql;
 
     // ~ Constructors 
-----------------------------------------------------------
 
     private SqlNodeToOperationConversion(
-            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager) {
+            FlinkPlannerImpl flinkPlanner,
+            CatalogManager catalogManager,
+            @Nullable String originalSql) {
         this.flinkPlanner = flinkPlanner;
         this.catalogManager = catalogManager;
+        this.originalSql = originalSql;
     }
 
     /**
@@ -168,21 +172,39 @@ public class SqlNodeToOperationConversion {
      * @param flinkPlanner FlinkPlannerImpl to convertCreateTable sql node to 
rel node
      * @param catalogManager CatalogManager to resolve full path for operations
      * @param sqlNode SqlNode to execute on
+     * @param originalSql original SQL statement text, or {@code null} when 
the node has no source
+     *     text (e.g. a synthesized node)
      */
     public static Optional<Operation> convert(
-            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, 
SqlNode sqlNode) {
-        // validate the query
+            FlinkPlannerImpl flinkPlanner,
+            CatalogManager catalogManager,
+            SqlNode sqlNode,
+            @Nullable String originalSql) {
         final SqlNode validated = flinkPlanner.validate(sqlNode);
-        return convertValidatedSqlNode(flinkPlanner, catalogManager, 
validated);
+        return convertValidatedSqlNode(flinkPlanner, catalogManager, 
validated, originalSql);
+    }
+
+    /**
+     * Converts the given {@link SqlNode} without an original statement text. 
Used for nested
+     * conversions where the node was reached recursively (e.g. the {@code AS} 
query of {@code
+     * CREATE TABLE AS}) and the verbatim source text is neither available nor 
needed.
+     */
+    public static Optional<Operation> convert(
+            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, 
SqlNode sqlNode) {
+        return convert(flinkPlanner, catalogManager, sqlNode, null);
     }
 
     /** Convert a validated sql node to Operation. */
     private static Optional<Operation> convertValidatedSqlNode(
-            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, 
SqlNode validated) {
+            FlinkPlannerImpl flinkPlanner,
+            CatalogManager catalogManager,
+            SqlNode validated,
+            @Nullable String statement) {
         beforeConversion();
 
         // delegate conversion to the registered converters first
-        SqlNodeConvertContext context = new 
SqlNodeConvertContext(flinkPlanner, catalogManager);
+        SqlNodeConvertContext context =
+                new SqlNodeConvertContext(flinkPlanner, catalogManager, 
statement);
         Optional<Operation> operation = 
SqlNodeConverters.convertSqlNode(validated, context);
         if (operation.isPresent()) {
             return operation;
@@ -190,7 +212,7 @@ public class SqlNodeToOperationConversion {
 
         // TODO: all the below conversion logic should be migrated to 
SqlNodeConverters
         SqlNodeToOperationConversion converter =
-                new SqlNodeToOperationConversion(flinkPlanner, catalogManager);
+                new SqlNodeToOperationConversion(flinkPlanner, catalogManager, 
statement);
         if (validated instanceof SqlDropCatalog) {
             return Optional.of(converter.convertDropCatalog((SqlDropCatalog) 
validated));
         } else if (validated instanceof SqlLoadModule) {
@@ -258,7 +280,10 @@ public class SqlNodeToOperationConversion {
             return 
Optional.of(converter.convertSqlStatementSet((SqlStatementSet) validated));
         } else if (validated instanceof SqlExecute) {
             return convertValidatedSqlNode(
-                    flinkPlanner, catalogManager, ((SqlExecute) 
validated).getStatement());
+                    flinkPlanner,
+                    catalogManager,
+                    ((SqlExecute) validated).getStatement(),
+                    statement);
         } else if (validated instanceof SqlExecutePlan) {
             return Optional.of(converter.convertExecutePlan((SqlExecutePlan) 
validated));
         } else if (validated instanceof SqlCompilePlan) {
@@ -277,9 +302,8 @@ public class SqlNodeToOperationConversion {
         }
     }
 
-    private static Operation convertValidatedSqlNodeOrFail(
-            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, 
SqlNode validated) {
-        return convertValidatedSqlNode(flinkPlanner, catalogManager, validated)
+    private Operation convertValidatedSqlNodeOrFail(SqlNode validated) {
+        return convertValidatedSqlNode(flinkPlanner, catalogManager, 
validated, originalSql)
                 .orElseThrow(
                         () ->
                                 new TableException(
@@ -330,9 +354,7 @@ public class SqlNodeToOperationConversion {
                 catalogManager.getTableOrError(identifier).toCatalogTable();
 
         PlannerQueryOperation query =
-                (PlannerQueryOperation)
-                        convertValidatedSqlNodeOrFail(
-                                flinkPlanner, catalogManager, 
insert.getSource());
+                (PlannerQueryOperation) 
convertValidatedSqlNodeOrFail(insert.getSource());
         // TODO calc target column list to index array, currently only simple 
SqlIdentifiers are
         // available, this should be updated after FLINK-31301 fixed
         int[][] columnIndices =
@@ -650,17 +672,13 @@ public class SqlNodeToOperationConversion {
         return new CompilePlanOperation(
                 compilePlan.getPlanFile(),
                 compilePlan.isIfNotExists(),
-                convertValidatedSqlNodeOrFail(
-                        flinkPlanner, catalogManager, 
compilePlan.getOperandList().get(0)));
+                
convertValidatedSqlNodeOrFail(compilePlan.getOperandList().get(0)));
     }
 
     private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan 
compileAndExecutePlan) {
         return new CompileAndExecutePlanOperation(
                 compileAndExecutePlan.getPlanFile(),
-                convertValidatedSqlNodeOrFail(
-                        flinkPlanner,
-                        catalogManager,
-                        compileAndExecutePlan.getOperandList().get(0)));
+                
convertValidatedSqlNodeOrFail(compileAndExecutePlan.getOperandList().get(0)));
     }
 
     private Operation convertShowJobs(SqlShowJobs sqlStopJob) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
index f4702f05086..8a3924077ee 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
@@ -46,6 +46,7 @@ public class SqlAlterViewAsConverter implements 
SqlNodeConverter<SqlAlterViewAs>
         CatalogView newView =
                 toCatalogView(
                         newQuery,
+                        alterView.getAsQueryKeywordPos(),
                         Collections.emptyList(),
                         oldView.getOptions(),
                         oldView.getComment(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
index 5a58b075a66..35ec8dcbb9d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
@@ -47,7 +47,12 @@ public class SqlCreateViewConverter implements 
SqlNodeConverter<SqlCreateView> {
         Map<String, String> viewOptions = sqlCreateView.getProperties();
         CatalogView catalogView =
                 SqlNodeConvertUtils.toCatalogView(
-                        query, viewFields, viewOptions, viewComment, context);
+                        query,
+                        sqlCreateView.getAsQueryKeywordPos(),
+                        viewFields,
+                        viewOptions,
+                        viewComment,
+                        context);
         return new CreateViewOperation(
                 identifier,
                 catalogView,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
index 5300144d80a..f2a6e2595a1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
@@ -34,6 +34,8 @@ import 
org.apache.flink.table.operations.utils.ValidationUtils;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import 
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
 
+import org.apache.flink.shaded.guava33.com.google.common.base.Splitter;
+
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlNode;
@@ -46,14 +48,58 @@ import 
org.apache.calcite.sql.validate.SqlValidatorNamespace;
 import javax.annotation.Nullable;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.stream.Collectors;
 
 /** Utilities for SqlNode conversions. */
-class SqlNodeConvertUtils {
+public class SqlNodeConvertUtils {
+
+    /**
+     * Returns the {@code AS}-query of a VIEW or MATERIALIZED TABLE DDL in 
verbatim shape: the
+     * statement text from just after {@code AS} to its end, trimmed. Slicing 
to the statement end
+     * (the {@code AS}-query is the last clause) keeps a comment after {@code 
AS} and queries whose
+     * node position is narrower than their text, e.g. {@code WITH ... SELECT}.
+     *
+     * @param asQueryKeywordPos parser position of the {@code AS} keyword
+     * @return the verbatim AS-query, or empty when no statement text is 
available
+     */
+    public static Optional<String> extractOriginalAsQueryText(
+            ConvertContext context, SqlParserPos asQueryKeywordPos) {
+        final String statementText = context.getStatementText();
+        if (statementText == null) {
+            return Optional.empty();
+        }
+        return offsetAfter(statementText, asQueryKeywordPos).stream()
+                .mapToObj(start -> statementText.substring(start).strip())
+                .findFirst();
+    }
+
+    /**
+     * Returns the offset of the first character after the given parser 
position, or empty if the
+     * position's end line falls outside {@code statementText}.
+     */
+    private static OptionalInt offsetAfter(String statementText, SqlParserPos 
pos) {
+        final int endLine = pos.getEndLineNum();
+        final int endCol = pos.getEndColumnNum();
+
+        final Iterator<String> iterator = 
Splitter.on('\n').split(statementText).iterator();
+        int currentLine = 0;
+        int lineStartOffset = 0;
+        while (iterator.hasNext()) {
+            final String lineText = iterator.next();
+            currentLine++;
+            if (currentLine == endLine) {
+                return OptionalInt.of(lineStartOffset + endCol);
+            }
+            lineStartOffset += lineText.length() + 1;
+        }
+        return OptionalInt.empty();
+    }
 
     static PlannerQueryOperation toQueryOperation(SqlNode validated, 
ConvertContext context) {
         // transform to a relational tree
@@ -65,17 +111,11 @@ class SqlNodeConvertUtils {
     /** convert the query part of a VIEW statement into a {@link CatalogView}. 
*/
     static CatalogView toCatalogView(
             SqlNode query,
+            SqlParserPos asQueryKeywordPos,
             List<SqlNode> viewFields,
             Map<String, String> viewOptions,
             String viewComment,
             ConvertContext context) {
-        // Put the sql string unparse (getQuotedSqlString()) in front of
-        // the node conversion (toQueryOperation()),
-        // because before Calcite 1.22.0, during sql-to-rel conversion, the 
SqlWindow
-        // bounds state would be mutated as default when they are null (not 
specified).
-
-        // This bug is fixed in CALCITE-3877 of Calcite 1.23.0.
-        String originalQuery = context.toQuotedSqlString(query);
         SqlNode validateQuery = context.getSqlValidator().validate(query);
         // FLINK-38950: SqlValidator.validate() mutates its input parameter. 
Always use the
         // returned validateQuery instead of the mutated query for all 
subsequent operations.
@@ -110,6 +150,10 @@ class SqlNodeConvertUtils {
             schema = ResolvedSchema.physical(aliasFieldNames, 
schema.getColumnDataTypes());
         }
 
+        final String originalQuery =
+                extractOriginalAsQueryText(context, asQueryKeywordPos)
+                        .orElse(context.toQuotedSqlString(query));
+
         return new ResolvedCatalogView(
                 CatalogView.of(
                         Schema.newBuilder().fromResolvedSchema(schema).build(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index 23f8f1716b2..360d6c38e01 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -100,6 +100,13 @@ public interface SqlNodeConverter<S extends SqlNode> {
         /** Convert the given {@param sqlNode} into a quoted SQL string. */
         String toQuotedSqlString(SqlNode sqlNode);
 
+        /**
+         * Returns the original SQL statement text being converted, or {@code 
null} when the node
+         * was synthesized (e.g. produced by an internal rewrite) and has no 
source text.
+         */
+        @Nullable
+        String getStatementText();
+
         /**
          * Expands identifiers in a given SQL string.
          *
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index b788f0560e3..3ed73870835 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.StartMode;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import 
org.apache.flink.table.planner.operations.converters.SqlNodeConvertUtils;
 import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
 import org.apache.flink.table.planner.utils.MaterializedTableUtils;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
@@ -141,10 +142,17 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
         return 
MaterializedTableUtils.fromLogicalRefreshModeToRefreshMode(logicalRefreshMode);
     }
 
+    /**
+     * Returns the user's original {@code AS} query text, sliced verbatim from 
the statement so
+     * formatting and identifier casing are preserved (e.g. {@code int} is not 
normalized to {@code
+     * INTEGER}). A comment placed between {@code AS} and the query is kept; 
the {@code AS} keyword
+     * itself is excluded.
+     */
     protected final String getDerivedOriginalQuery(
             T sqlCreateMaterializedTable, ConvertContext context) {
-        SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
-        return context.toQuotedSqlString(selectQuery);
+        return SqlNodeConvertUtils.extractOriginalAsQueryText(
+                        context, 
sqlCreateMaterializedTable.getAsQueryKeywordPos())
+                
.orElse(context.toQuotedSqlString(sqlCreateMaterializedTable.getAsQuery()));
     }
 
     protected final String getDerivedExpandedQuery(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
index b48d472593a..3860a3f93c9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import 
org.apache.flink.table.planner.operations.converters.SqlNodeConvertUtils;
 import org.apache.flink.table.planner.utils.MaterializedTableUtils;
 
 import org.apache.calcite.sql.SqlNode;
@@ -54,13 +55,12 @@ public class SqlAlterMaterializedTableAsQueryConverter
             SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, 
ConvertContext context) {
         return oldTable -> {
             // Validate and extract schema from query
-            String originalQuery = 
context.toQuotedSqlString(sqlAlterTableAsQuery.getAsQuery());
-            SqlNode validatedQuery =
-                    
context.getSqlValidator().validate(sqlAlterTableAsQuery.getAsQuery());
-            String definitionQuery = context.toQuotedSqlString(validatedQuery);
+            SqlNode asQuery = sqlAlterTableAsQuery.getAsQuery();
+            SqlNode validatedQuery = 
context.getSqlValidator().validate(asQuery);
+            String expandedQuery = context.toQuotedSqlString(validatedQuery);
             PlannerQueryOperation queryOperation =
                     new PlannerQueryOperation(
-                            context.toRelRoot(validatedQuery).project(), () -> 
definitionQuery);
+                            context.toRelRoot(validatedQuery).project(), () -> 
expandedQuery);
 
             ResolvedSchema oldSchema = oldTable.getResolvedSchema();
             ResolvedSchema newSchema = queryOperation.getResolvedSchema();
@@ -78,7 +78,11 @@ public class SqlAlterMaterializedTableAsQueryConverter
                                     + "consider using CREATE OR ALTER 
MATERIALIZED TABLE instead");
                 }
             }
-            tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery, 
definitionQuery));
+            String originalQuery =
+                    SqlNodeConvertUtils.extractOriginalAsQueryText(
+                                    context, 
sqlAlterTableAsQuery.getAsQueryKeywordPos())
+                            .orElse(context.toQuotedSqlString(asQuery));
+            tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery, 
expandedQuery));
             return tableChanges;
         };
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
index d610569bf95..9c666397c2d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.operations.CreateTableASOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.parse.CalciteParser;
@@ -422,6 +423,15 @@ class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTestBase {
                                                         .build()))));
     }
 
+    @Test
+    void testExplainCreateTableAs() {
+        final Operation operation = parse("EXPLAIN CREATE TABLE myTable123 AS 
SELECT 123");
+
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+        assertThat(((ExplainOperation) operation).getChild())
+                .isInstanceOf(CreateTableASOperation.class);
+    }
+
     private Operation parseAndConvert(String sql) {
         final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 3344616c798..2c5dd5d0d71 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1459,7 +1459,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 + "  `user_id` INT NOT NULL,\n"
                                 + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
                                 + "), comment='null', distribution=null, 
partitionKeys=[], "
-                                + "options={format=debezium-json}, 
snapshot=null, originalQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', 
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+                                + "options={format=debezium-json}, 
snapshot=null, originalQuery='SELECT 1 as shop_id, 2 as user_id', 
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
                                 + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
                                 + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
                                 + "  `shop_id` INT,\n"
@@ -1491,7 +1491,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 + "  `user_id` INT NOT NULL,\n"
                                 + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
                                 + "), comment='null', distribution=DISTRIBUTED 
BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
-                                + "options={format=debezium-json}, 
snapshot=null, originalQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', 
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+                                + "options={format=debezium-json}, 
snapshot=null, originalQuery='SELECT 1 as shop_id, 2 as user_id', 
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
                                 + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=AUTOMATIC, refreshMode=null, "
                                 + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
                                 + "  `shop_id` INT NOT NULL,\n"
@@ -2591,6 +2591,43 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
         assertThat(operation).isInstanceOf(CreateViewOperation.class);
     }
 
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("viewOriginalQueryCases")
+    void testCreateViewOriginalQuery(String name, String sql, String 
expectedOriginalQuery) {
+        final Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateViewOperation.class);
+        assertThat(((CreateViewOperation) 
operation).getCatalogView().getOriginalQuery())
+                .isEqualTo(expectedOriginalQuery);
+    }
+
+    private static Stream<Arguments> viewOriginalQueryCases() {
+        return Stream.of(
+                // The AS query is sliced verbatim, preserving the user's 
wording.
+                viewAsQueryCase(
+                        "AS query with a leading comment is kept verbatim",
+                        "/* keep me */\nselect 1"),
+                viewAsQueryCase(
+                        "trailing comment after the AS query is kept",
+                        "select 1 -- trailing comment"),
+                // Everything before AS must be excluded and must not shift 
the slice.
+                Arguments.of(
+                        "comment before AS is dropped",
+                        "CREATE VIEW v1\n-- note before AS\nAS SELECT 1",
+                        "SELECT 1"),
+                Arguments.of(
+                        "As-query is stripped and comments are kept",
+                        "CREATE VIEW v1 AS \n\n\n\n--keep\n--me\nSELECT * FROM 
t1\n\n",
+                        "--keep\n--me\nSELECT * FROM t1"),
+                Arguments.of(
+                        "column list and COMMENT before AS are dropped",
+                        "CREATE VIEW v1 (w, x, y, z)\nCOMMENT 'a view'\nAS 
SELECT a, b, c, d FROM t1",
+                        "SELECT a, b, c, d FROM t1"));
+    }
+
+    private static Arguments viewAsQueryCase(String name, String query) {
+        return Arguments.of(name, "CREATE VIEW v1 AS " + query, query);
+    }
+
     @Test
     void testAlterTableAddPartitions() throws Exception {
         prepareTable("tb1", true, true, 0);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 6bb5fc5503d..6e99aa29ed3 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -66,6 +66,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -320,6 +321,66 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 + "LATERAL 
TABLE(`builtin`.`default`.`myFunc`(`b`)) AS `T` (`f1`, `f2`)");
     }
 
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("originalQueryCases")
+    void testOriginalQuery(String name, String sql, String 
expectedOriginalQuery) {
+        final String originalQuery =
+                createMaterializedTableOperation(sql)
+                        .getCatalogMaterializedTable()
+                        .getOriginalQuery();
+        assertThat(originalQuery).isEqualTo(expectedOriginalQuery);
+    }
+
+    private static Stream<Arguments> originalQueryCases() {
+        final String query = "SELECT * FROM t1";
+        return Stream.of(
+                // The AS query is sliced verbatim from just after AS to the 
end of the statement,
+                // preserving the user's wording (case, formatting, comments).
+                asQueryCase(
+                        "AS query with a leading comment is kept verbatim",
+                        "/* keep me */\nselect a, b from t1"),
+                asQueryCase(
+                        "trailing comment after the AS query is kept",
+                        "select a, b from t1 -- trailing comment"),
+                asQueryCase(
+                        "WITH query is kept whole (its node position is 
narrower than its text)",
+                        "WITH q AS (SELECT a FROM t1) SELECT a FROM q"),
+                Arguments.of(
+                        "As-query is stripped and comments are kept",
+                        "CREATE MATERIALIZED TABLE mtbl1 AS 
\n\n\n\n--keep\n--me\nSELECT * FROM t1\n\n",
+                        "--keep\n--me\nSELECT * FROM t1"),
+                // Everything before AS must be excluded and must not shift 
the slice, regardless of
+                // comments, identifier quoting, or a multi-line clause prefix.
+                Arguments.of(
+                        "comment before AS is dropped",
+                        "CREATE MATERIALIZED TABLE mtbl1\n-- note before 
AS\nAS " + query,
+                        query),
+                Arguments.of(
+                        "escaped-backtick identifier and COMMENT before AS are 
dropped",
+                        "CREATE MATERIALIZED TABLE `m``tbl` COMMENT 'a''b' AS 
" + query,
+                        query),
+                Arguments.of(
+                        "multi-line DDL prefix before AS is dropped",
+                        "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                                + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT 
ENFORCED\n"
+                                + ")\n"
+                                + "COMMENT 'materialized table comment'\n"
+                                + "PARTITIONED BY (a)\n"
+                                + "WITH (\n"
+                                + "  'connector' = 'filesystem',\n"
+                                + "  'format' = 'json'\n"
+                                + ")\n"
+                                + "FRESHNESS = INTERVAL '30' SECOND\n"
+                                + "REFRESH_MODE = FULL\n"
+                                + "AS "
+                                + query,
+                        query));
+    }
+
+    private static Arguments asQueryCase(String name, String query) {
+        return Arguments.of(name, "CREATE MATERIALIZED TABLE mtbl1 AS " + 
query, query);
+    }
+
     @Test
     void testCreateMaterializedTableWithUDTFQueryWithoutAlias() {
         functionCatalog.registerCatalogFunction(
@@ -572,7 +633,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
     @Test
     void testAlterMaterializedTableAsQuery() throws TableNotExistException {
         String sql =
-                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as 
e, cast('123' as string) as f FROM t3";
+                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as 
e, cast('123' as string) as f\n FROM\n t3";
         Operation operation = parse(sql);
 
         
assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
@@ -584,7 +645,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         TableChange.add(Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
                         TableChange.add(Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
                         TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, 
CAST('123' AS STRING) AS `f`\nFROM `t3`",
+                                "SELECT a, b, c, d, d as e, cast('123' as 
string) as f\n FROM\n t3",
                                 "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
                                         + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
         assertThat(operation.asSummaryString())
@@ -638,7 +699,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 .containsExactly(
                         TableChange.add(Column.physical("a0", 
DataTypes.INT())),
                         TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `c` AS `a`\nFROM 
`t3`",
+                                "SELECT a, b, c, d, c as a FROM t3",
                                 "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`c` AS `a`\n"
                                         + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
     }
@@ -1373,7 +1434,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 .comment("materialized table comment")
                 .options(Map.of("connector", "filesystem", "format", "json"))
                 .partitionKeys(List.of("a", "d"))
-                .originalQuery("SELECT *\nFROM `t1`")
+                .originalQuery("SELECT * FROM t1")
                 .expandedQuery(
                         "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
                                 + "FROM `builtin`.`default`.`t1` AS `t1`");
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index 9ed19411a7a..0814e1556a5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -131,14 +131,14 @@ class SqlNodeToOperationConversionTestBase {
 
     protected Operation parse(String sql, FlinkPlannerImpl planner, 
CalciteParser parser) {
         SqlNode node = parser.parse(sql);
-        return SqlNodeToOperationConversion.convert(planner, catalogManager, 
node).get();
+        return SqlNodeToOperationConversion.convert(planner, catalogManager, 
node, sql).get();
     }
 
     protected Operation parse(String sql) {
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         SqlNode node = parser.parse(sql);
-        return SqlNodeToOperationConversion.convert(planner, catalogManager, 
node).get();
+        return SqlNodeToOperationConversion.convert(planner, catalogManager, 
node, sql).get();
     }
 
     protected FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index 4082919ef02..1eeeeafc627 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -92,7 +92,7 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                         TableChange.add(Column.physical("f", DataTypes.INT())),
                         TableChange.dropConstraint("ct1"),
                         TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS 
`f`\nFROM `t1`",
+                                "SELECT a, b, c, d, a as `a1`, 3 as f FROM t1",
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`a` AS `a1`, 3 AS `f`\n"
                                         + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
                         TableChange.reset("connector"),
@@ -112,7 +112,7 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                         // No explicit schema, so nullable will be used
                         TableChange.add(Column.physical("a1", 
DataTypes.BIGINT())),
                         TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM 
`t1`",
+                                "SELECT a, b, c, d, a as `a1` FROM t1",
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`a` AS `a1`\n"
                                         + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
                         TableChange.reset("connector"),
@@ -201,7 +201,7 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                         TableChange.add(Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
                         TableChange.add(Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
                         TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, 
CAST('123' AS STRING) AS `f`\nFROM `t1`",
+                                "SELECT a, b, c, d, d as e, cast('123' as 
string) as f FROM t1",
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
                                         + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
                         TableChange.set("format", "json2"),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 524bdcce224..8ed698f99d8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ReplaceTableAsOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -280,6 +281,15 @@ class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConversionTe
         testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, 
Collections.emptyList());
     }
 
+    @Test
+    void testExplainReplaceTableAs() {
+        final Operation operation = parse("EXPLAIN REPLACE TABLE myTable123 AS 
SELECT 123");
+
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+        assertThat(((ExplainOperation) operation).getChild())
+                .isInstanceOf(ReplaceTableAsOperation.class);
+    }
+
     private void testCommonReplaceTableAs(
             String sql, String tableName, @Nullable String tableComment) {
         testCommonReplaceTableAs(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index f8546d4a678..b0c0dd4385e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -1234,7 +1234,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) 
extends TableITCaseBase {
       .get()
       .getTable(objectPath)
       .asInstanceOf[CatalogView]
-    assertThat(view.getOriginalQuery).isEqualTo("SELECT `b`\nFROM `T`")
+    assertThat(view.getOriginalQuery).isEqualTo("SELECT b FROM T")
   }
 
   @TestTemplate

Reply via email to