This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 80189e470f4 [FLINK-39799][table] Preserve user-typed query text for
materialized table and view definitions
80189e470f4 is described below
commit 80189e470f462ec02ee6aeafca31b8ac15efd439
Author: Ramin Gharib <[email protected]>
AuthorDate: Fri Jun 5 17:33:34 2026 +0200
[FLINK-39799][table] Preserve user-typed query text for materialized table
and view definitions
---
.../src/main/codegen/includes/parserImpls.ftl | 74 ++++++++++++++--------
.../SqlAlterMaterializedTableAsQuery.java | 13 +++-
.../SqlCreateMaterializedTable.java | 11 +++-
.../SqlCreateOrAlterMaterializedTable.java | 6 +-
.../flink/sql/parser/ddl/view/SqlAlterViewAs.java | 14 +++-
.../flink/sql/parser/ddl/view/SqlCreateView.java | 10 ++-
.../flink/table/planner/delegation/ParserImpl.java | 3 +-
.../planner/operations/SqlNodeConvertContext.java | 13 +++-
.../operations/SqlNodeToOperationConversion.java | 58 +++++++++++------
.../converters/SqlAlterViewAsConverter.java | 1 +
.../converters/SqlCreateViewConverter.java | 7 +-
.../operations/converters/SqlNodeConvertUtils.java | 60 +++++++++++++++---
.../operations/converters/SqlNodeConverter.java | 7 ++
.../AbstractCreateMaterializedTableConverter.java | 12 +++-
.../SqlAlterMaterializedTableAsQueryConverter.java | 16 +++--
.../operations/SqlCTASNodeToOperationTest.java | 10 +++
.../operations/SqlDdlToOperationConverterTest.java | 41 +++++++++++-
...erializedTableNodeToOperationConverterTest.java | 69 ++++++++++++++++++--
.../SqlNodeToOperationConversionTestBase.java | 4 +-
...reateOrAlterMaterializedTableConverterTest.java | 6 +-
.../SqlRTASNodeToOperationConverterTest.java | 10 +++
.../table/planner/catalog/CatalogTableITCase.scala | 2 +-
22 files changed, 365 insertions(+), 82 deletions(-)
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index b3021950f12..aef621ddc7d 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1923,6 +1923,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s,
boolean replace, boolean isT
SqlNode freshness = null;
SqlRefreshMode refreshMode = null;
SqlNode asQuery = null;
+ SqlParserPos asQueryKeywordPos = null;
SqlParserPos pos = startPos;
boolean isColumnsIdentifiersOnly = false;
boolean isOrAlter = false;
@@ -2062,24 +2063,27 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s,
boolean replace, boolean isT
}
)
]
- <AS>
+ <AS> { asQueryKeywordPos = getPos(); }
asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
- return new SqlCreateOrAlterMaterializedTable(
- startPos.plus(getPos()),
- tableName,
- columnList,
- constraints,
- watermark,
- comment,
- distribution,
- partitionColumns,
- propertyList,
- (SqlIntervalLiteral) freshness,
- refreshMode,
- startMode,
- asQuery,
- isOrAlter);
+ final SqlCreateOrAlterMaterializedTable createMaterializedTable =
+ new SqlCreateOrAlterMaterializedTable(
+ startPos.plus(getPos()),
+ tableName,
+ columnList,
+ constraints,
+ watermark,
+ comment,
+ distribution,
+ partitionColumns,
+ propertyList,
+ (SqlIntervalLiteral) freshness,
+ refreshMode,
+ startMode,
+ asQuery,
+ isOrAlter,
+ asQueryKeywordPos);
+ return createMaterializedTable;
}
}
@@ -2123,6 +2127,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
SqlNodeList partSpec = SqlNodeList.EMPTY;
SqlNode freshness = null;
SqlNode asQuery = null;
+ SqlParserPos asQueryKeywordPos = null;
SqlIdentifier constraintName;
AlterTableSchemaContext ctx = new AlterTableSchemaContext();
}
@@ -2203,13 +2208,16 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
propertyKeyList);
}
|
- <AS>
+ <AS> { asQueryKeywordPos = getPos(); }
asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
- return new SqlAlterMaterializedTableAsQuery(
- startPos.plus(getPos()),
- tableIdentifier,
- asQuery);
+ final SqlAlterMaterializedTableAsQuery
alterMaterializedTableAsQuery =
+ new SqlAlterMaterializedTableAsQuery(
+ startPos.plus(getPos()),
+ tableIdentifier,
+ asQuery,
+ asQueryKeywordPos);
+ return alterMaterializedTableAsQuery;
}
|
<ADD>
@@ -2427,6 +2435,7 @@ SqlCreate SqlCreateView(Span s, boolean replace, boolean
isTemporary) : {
SqlIdentifier viewName;
SqlCharStringLiteral comment = null;
SqlNode query;
+ SqlParserPos asQueryKeywordPos = null;
SqlNodeList fieldList = SqlNodeList.EMPTY;
boolean ifNotExists = false;
}
@@ -2443,10 +2452,22 @@ SqlCreate SqlCreateView(Span s, boolean replace,
boolean isTemporary) : {
comment = Comment();
}
]
- <AS>
+ <AS> { asQueryKeywordPos = getPos(); }
query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
- return new SqlCreateView(s.pos(), viewName, fieldList, query, replace,
isTemporary, ifNotExists, comment, null);
+ final SqlCreateView createView =
+ new SqlCreateView(
+ s.pos(),
+ viewName,
+ fieldList,
+ query,
+ replace,
+ isTemporary,
+ ifNotExists,
+ comment,
+ null,
+ asQueryKeywordPos);
+ return createView;
}
}
@@ -2472,6 +2493,7 @@ SqlAlterView SqlAlterView() :
SqlIdentifier viewName;
SqlIdentifier newViewName;
SqlNode newQuery;
+ SqlParserPos asQueryKeywordPos = null;
}
{
<ALTER> <VIEW> { startPos = getPos(); }
@@ -2483,10 +2505,12 @@ SqlAlterView SqlAlterView() :
return new SqlAlterViewRename(startPos.plus(getPos()), viewName,
newViewName);
}
|
- <AS>
+ <AS> { asQueryKeywordPos = getPos(); }
newQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
- return new SqlAlterViewAs(startPos.plus(getPos()), viewName, newQuery);
+ final SqlAlterViewAs alterViewAs =
+ new SqlAlterViewAs(startPos.plus(getPos()), viewName,
newQuery, asQueryKeywordPos);
+ return alterViewAs;
}
)
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
index 53fe5b23c9c..cc57ec30796 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlAlterMaterializedTableAsQuery.java
@@ -36,16 +36,27 @@ public class SqlAlterMaterializedTableAsQuery extends
SqlAlterMaterializedTable
private final SqlNode asQuery;
+ private final SqlParserPos asQueryKeywordPos;
+
public SqlAlterMaterializedTableAsQuery(
- SqlParserPos pos, SqlIdentifier tableName, SqlNode asQuery) {
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNode asQuery,
+ SqlParserPos asQueryKeywordPos) {
super(pos, tableName);
this.asQuery = asQuery;
+ this.asQueryKeywordPos = asQueryKeywordPos;
}
public SqlNode getAsQuery() {
return asQuery;
}
+ /**
+ * Returns the parser position of the {@code AS} keyword that introduces the new query.
+ *
+ * <p>The value is handed out exactly as it was passed to the constructor, which stores it
+ * without a null check.
+ *
+ * @return the stored position of the {@code AS} keyword
+ */
+ public SqlParserPos getAsQueryKeywordPos() {
+ return asQueryKeywordPos;
+ }
+
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(name, asQuery);
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
index a1d51cf6a93..f1ee603f60e 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
@@ -71,6 +71,8 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
private final SqlNode asQuery;
+ private final SqlParserPos asQueryKeywordPos;
+
public SqlCreateMaterializedTable(
SqlSpecialOperator operator,
SqlParserPos pos,
@@ -85,7 +87,8 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
@Nullable SqlIntervalLiteral freshness,
@Nullable SqlRefreshMode refreshMode,
@Nullable SqlStartMode startMode,
- SqlNode asQuery) {
+ SqlNode asQuery,
+ SqlParserPos asQueryKeywordPos) {
super(operator, pos, tableName, false, false, false, propertyList,
comment);
this.columnList = columnList;
this.tableConstraints = tableConstraints;
@@ -98,6 +101,7 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
this.refreshMode = refreshMode;
this.startMode = startMode;
this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
+ this.asQueryKeywordPos = asQueryKeywordPos;
}
@Override
@@ -153,6 +157,11 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
return asQuery;
}
+ /**
+ * Returns the parser position of the {@code AS} keyword that introduces the defining query.
+ *
+ * <p>The value is handed out exactly as it was passed to the constructor. Unlike the query
+ * node itself, it is stored without a null check.
+ *
+ * @return the stored position of the {@code AS} keyword
+ */
+ public SqlParserPos getAsQueryKeywordPos() {
+ return asQueryKeywordPos;
+ }
+
/** Returns the column constraints plus the table constraints. */
public List<SqlTableConstraint> getFullConstraints() {
return SqlConstraintValidator.getFullConstraints(tableConstraints,
columnList);
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
index b1859ba6114..6d13aa108ea 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
@@ -57,7 +57,8 @@ public class SqlCreateOrAlterMaterializedTable extends
SqlCreateMaterializedTabl
@Nullable SqlRefreshMode refreshMode,
@Nullable SqlStartMode startMode,
SqlNode asQuery,
- boolean isOrAlter) {
+ boolean isOrAlter,
+ SqlParserPos asQueryKeywordPos) {
super(
isOrAlter ? CREATE_OR_ALTER_OPERATOR : CREATE_OPERATOR,
pos,
@@ -72,7 +73,8 @@ public class SqlCreateOrAlterMaterializedTable extends
SqlCreateMaterializedTabl
freshness,
refreshMode,
startMode,
- asQuery);
+ asQuery,
+ asQueryKeywordPos);
}
@Override
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
index 1a5a4bd724b..ba74abead09 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlAlterViewAs.java
@@ -35,9 +35,16 @@ public class SqlAlterViewAs extends SqlAlterView {
private final SqlNode newQuery;
- public SqlAlterViewAs(SqlParserPos pos, SqlIdentifier viewIdentifier,
SqlNode newQuery) {
+ private final SqlParserPos asQueryKeywordPos;
+
+ public SqlAlterViewAs(
+ SqlParserPos pos,
+ SqlIdentifier viewIdentifier,
+ SqlNode newQuery,
+ SqlParserPos asQueryKeywordPos) {
super(pos, viewIdentifier);
this.newQuery = newQuery;
+ this.asQueryKeywordPos = asQueryKeywordPos;
}
@Nonnull
@@ -55,4 +62,9 @@ public class SqlAlterViewAs extends SqlAlterView {
public SqlNode getNewQuery() {
return newQuery;
}
+
+ /**
+ * Returns the parser position of the {@code AS} keyword that introduces the new view query.
+ *
+ * <p>The value is handed out exactly as it was passed to the constructor, which stores it
+ * without a null check.
+ *
+ * @return the stored position of the {@code AS} keyword
+ */
+ public SqlParserPos getAsQueryKeywordPos() {
+ return asQueryKeywordPos;
+ }
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
index cd0baa8f92c..fc4c754fb59 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/view/SqlCreateView.java
@@ -43,6 +43,7 @@ public class SqlCreateView extends SqlCreateObject {
private final SqlNodeList fieldList;
private final SqlNode query;
+ private final SqlParserPos asQueryKeywordPos;
public SqlCreateView(
SqlParserPos pos,
@@ -53,10 +54,12 @@ public class SqlCreateView extends SqlCreateObject {
boolean isTemporary,
boolean ifNotExists,
SqlCharStringLiteral comment,
- SqlNodeList properties) {
+ SqlNodeList properties,
+ SqlParserPos asQueryKeywordPos) {
super(OPERATOR, pos, viewName, isTemporary, replace, ifNotExists,
properties, comment);
this.fieldList = requireNonNull(fieldList, "fieldList should not be
null");
this.query = requireNonNull(query, "query should not be null");
+ this.asQueryKeywordPos = asQueryKeywordPos;
}
@Override
@@ -77,6 +80,11 @@ public class SqlCreateView extends SqlCreateObject {
return query;
}
+ /**
+ * Returns the parser position of the {@code AS} keyword that introduces the view query.
+ *
+ * <p>The value is handed out exactly as it was passed to the constructor. Unlike the field
+ * list and the query node, it is stored without a null check.
+ *
+ * @return the stored position of the {@code AS} keyword
+ */
+ public SqlParserPos getAsQueryKeywordPos() {
+ return asQueryKeywordPos;
+ }
+
@Override
protected String getScope() {
return "VIEW";
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
index 6371f7a7778..b45c199fe64 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
@@ -103,7 +103,8 @@ public class ParserImpl implements Parser {
List<SqlNode> parsed = sqlNodeList.getList();
Preconditions.checkArgument(parsed.size() == 1, "only single statement
supported");
return Collections.singletonList(
- SqlNodeToOperationConversion.convert(planner, catalogManager,
parsed.get(0))
+ SqlNodeToOperationConversion.convert(
+ planner, catalogManager, parsed.get(0),
statement)
.orElseThrow(() -> new TableException("Unsupported
query: " + statement)));
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 05364388fba..fa06ce6703e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -51,10 +51,15 @@ public class SqlNodeConvertContext implements
SqlNodeConverter.ConvertContext {
private final FlinkPlannerImpl flinkPlanner;
private final CatalogManager catalogManager;
+ private final @Nullable String originalSql;
- public SqlNodeConvertContext(FlinkPlannerImpl flinkPlanner, CatalogManager
catalogManager) {
+ public SqlNodeConvertContext(
+ FlinkPlannerImpl flinkPlanner,
+ CatalogManager catalogManager,
+ @Nullable String originalSql) {
this.flinkPlanner = flinkPlanner;
this.catalogManager = catalogManager;
+ this.originalSql = originalSql;
}
@Override
@@ -110,6 +115,12 @@ public class SqlNodeConvertContext implements
SqlNodeConverter.ConvertContext {
return sqlNode.toSqlString(getSqlDialect()).getSql();
}
+ @Override
+ @Nullable
+ public String getStatementText() {
+ return this.originalSql;
+ }
+
private SqlDialect getSqlDialect() {
SqlParser.Config parserConfig =
flinkPlanner.config().getParserConfig();
return
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 3609545be94..eb29049dfb2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -151,13 +151,17 @@ import java.util.stream.Collectors;
public class SqlNodeToOperationConversion {
private final FlinkPlannerImpl flinkPlanner;
private final CatalogManager catalogManager;
+ @Nullable private final String originalSql;
// ~ Constructors
-----------------------------------------------------------
private SqlNodeToOperationConversion(
- FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager) {
+ FlinkPlannerImpl flinkPlanner,
+ CatalogManager catalogManager,
+ @Nullable String originalSql) {
this.flinkPlanner = flinkPlanner;
this.catalogManager = catalogManager;
+ this.originalSql = originalSql;
}
/**
@@ -168,21 +172,39 @@ public class SqlNodeToOperationConversion {
* @param flinkPlanner FlinkPlannerImpl to convertCreateTable sql node to
rel node
* @param catalogManager CatalogManager to resolve full path for operations
* @param sqlNode SqlNode to execute on
+ * @param originalSql original SQL statement text, or {@code null} when
the node has no source
+ * text (e.g. a synthesized node)
*/
public static Optional<Operation> convert(
- FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager,
SqlNode sqlNode) {
- // validate the query
+ FlinkPlannerImpl flinkPlanner,
+ CatalogManager catalogManager,
+ SqlNode sqlNode,
+ @Nullable String originalSql) {
final SqlNode validated = flinkPlanner.validate(sqlNode);
- return convertValidatedSqlNode(flinkPlanner, catalogManager,
validated);
+ return convertValidatedSqlNode(flinkPlanner, catalogManager,
validated, originalSql);
+ }
+
+ /**
+ * Converts the given {@link SqlNode} without an original statement text. Used for nested
+ * conversions where the node was reached recursively (e.g. the {@code AS} query of {@code
+ * CREATE TABLE AS}) and the verbatim source text is neither available nor needed.
+ *
+ * <p>This is a convenience overload: it delegates to the four-argument {@code convert} and
+ * passes {@code null} as the original SQL statement text.
+ *
+ * @param flinkPlanner planner handed through to the four-argument overload
+ * @param catalogManager catalog manager handed through to the four-argument overload
+ * @param sqlNode the node to convert
+ * @return whatever the four-argument overload returns for a {@code null} statement text
+ */
+ public static Optional<Operation> convert(
+ FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager,
SqlNode sqlNode) {
+ return convert(flinkPlanner, catalogManager, sqlNode, null);
+ }
/** Convert a validated sql node to Operation. */
private static Optional<Operation> convertValidatedSqlNode(
- FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager,
SqlNode validated) {
+ FlinkPlannerImpl flinkPlanner,
+ CatalogManager catalogManager,
+ SqlNode validated,
+ @Nullable String statement) {
beforeConversion();
// delegate conversion to the registered converters first
- SqlNodeConvertContext context = new
SqlNodeConvertContext(flinkPlanner, catalogManager);
+ SqlNodeConvertContext context =
+ new SqlNodeConvertContext(flinkPlanner, catalogManager,
statement);
Optional<Operation> operation =
SqlNodeConverters.convertSqlNode(validated, context);
if (operation.isPresent()) {
return operation;
@@ -190,7 +212,7 @@ public class SqlNodeToOperationConversion {
// TODO: all the below conversion logic should be migrated to
SqlNodeConverters
SqlNodeToOperationConversion converter =
- new SqlNodeToOperationConversion(flinkPlanner, catalogManager);
+ new SqlNodeToOperationConversion(flinkPlanner, catalogManager,
statement);
if (validated instanceof SqlDropCatalog) {
return Optional.of(converter.convertDropCatalog((SqlDropCatalog)
validated));
} else if (validated instanceof SqlLoadModule) {
@@ -258,7 +280,10 @@ public class SqlNodeToOperationConversion {
return
Optional.of(converter.convertSqlStatementSet((SqlStatementSet) validated));
} else if (validated instanceof SqlExecute) {
return convertValidatedSqlNode(
- flinkPlanner, catalogManager, ((SqlExecute)
validated).getStatement());
+ flinkPlanner,
+ catalogManager,
+ ((SqlExecute) validated).getStatement(),
+ statement);
} else if (validated instanceof SqlExecutePlan) {
return Optional.of(converter.convertExecutePlan((SqlExecutePlan)
validated));
} else if (validated instanceof SqlCompilePlan) {
@@ -277,9 +302,8 @@ public class SqlNodeToOperationConversion {
}
}
- private static Operation convertValidatedSqlNodeOrFail(
- FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager,
SqlNode validated) {
- return convertValidatedSqlNode(flinkPlanner, catalogManager, validated)
+ private Operation convertValidatedSqlNodeOrFail(SqlNode validated) {
+ return convertValidatedSqlNode(flinkPlanner, catalogManager,
validated, originalSql)
.orElseThrow(
() ->
new TableException(
@@ -330,9 +354,7 @@ public class SqlNodeToOperationConversion {
catalogManager.getTableOrError(identifier).toCatalogTable();
PlannerQueryOperation query =
- (PlannerQueryOperation)
- convertValidatedSqlNodeOrFail(
- flinkPlanner, catalogManager,
insert.getSource());
+ (PlannerQueryOperation)
convertValidatedSqlNodeOrFail(insert.getSource());
// TODO calc target column list to index array, currently only simple
SqlIdentifiers are
// available, this should be updated after FLINK-31301 fixed
int[][] columnIndices =
@@ -650,17 +672,13 @@ public class SqlNodeToOperationConversion {
return new CompilePlanOperation(
compilePlan.getPlanFile(),
compilePlan.isIfNotExists(),
- convertValidatedSqlNodeOrFail(
- flinkPlanner, catalogManager,
compilePlan.getOperandList().get(0)));
+
convertValidatedSqlNodeOrFail(compilePlan.getOperandList().get(0)));
}
private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan
compileAndExecutePlan) {
return new CompileAndExecutePlanOperation(
compileAndExecutePlan.getPlanFile(),
- convertValidatedSqlNodeOrFail(
- flinkPlanner,
- catalogManager,
- compileAndExecutePlan.getOperandList().get(0)));
+
convertValidatedSqlNodeOrFail(compileAndExecutePlan.getOperandList().get(0)));
}
private Operation convertShowJobs(SqlShowJobs sqlStopJob) {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
index f4702f05086..8a3924077ee 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewAsConverter.java
@@ -46,6 +46,7 @@ public class SqlAlterViewAsConverter implements
SqlNodeConverter<SqlAlterViewAs>
CatalogView newView =
toCatalogView(
newQuery,
+ alterView.getAsQueryKeywordPos(),
Collections.emptyList(),
oldView.getOptions(),
oldView.getComment(),
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
index 5a58b075a66..35ec8dcbb9d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
@@ -47,7 +47,12 @@ public class SqlCreateViewConverter implements
SqlNodeConverter<SqlCreateView> {
Map<String, String> viewOptions = sqlCreateView.getProperties();
CatalogView catalogView =
SqlNodeConvertUtils.toCatalogView(
- query, viewFields, viewOptions, viewComment, context);
+ query,
+ sqlCreateView.getAsQueryKeywordPos(),
+ viewFields,
+ viewOptions,
+ viewComment,
+ context);
return new CreateViewOperation(
identifier,
catalogView,
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
index 5300144d80a..f2a6e2595a1 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
@@ -34,6 +34,8 @@ import
org.apache.flink.table.operations.utils.ValidationUtils;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
+import org.apache.flink.shaded.guava33.com.google.common.base.Splitter;
+
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
@@ -46,14 +48,58 @@ import
org.apache.calcite.sql.validate.SqlValidatorNamespace;
import javax.annotation.Nullable;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.stream.Collectors;
/** Utilities for SqlNode conversions. */
-class SqlNodeConvertUtils {
+public class SqlNodeConvertUtils {
+
+    /**
+     * Returns the {@code AS}-query of a VIEW or MATERIALIZED TABLE DDL in verbatim shape: the
+     * statement text from just after {@code AS} to its end, with surrounding whitespace
+     * stripped. Slicing to the statement end (the {@code AS}-query is the last clause) keeps a
+     * comment after {@code AS} and queries whose node position is narrower than their text,
+     * e.g. {@code WITH ... SELECT}.
+     *
+     * <p>An empty result means "no verbatim text could be derived"; callers are expected to
+     * fall back to an unparsed form of the query node in that case. This method never throws
+     * for a position that does not match the statement text.
+     *
+     * <p>NOTE(review): because the slice runs to the very end of the statement, anything that
+     * trails the query in the statement text (for example a terminating {@code ;}) would be
+     * part of the result - confirm that callers never pass such text.
+     *
+     * @param context conversion context supplying the statement text, which may be {@code null}
+     * @param asQueryKeywordPos parser position of the {@code AS} keyword; {@code null} is
+     *     treated as "position unknown"
+     * @return the verbatim AS-query, or empty when the statement text or the keyword position
+     *     is missing, or when the position does not fall inside the statement text
+     */
+    public static Optional<String> extractOriginalAsQueryText(
+            ConvertContext context, @Nullable SqlParserPos asQueryKeywordPos) {
+        final String statementText = context.getStatementText();
+        if (statementText == null || asQueryKeywordPos == null) {
+            return Optional.empty();
+        }
+        final OptionalInt start = offsetAfter(statementText, asQueryKeywordPos);
+        if (!start.isPresent()) {
+            return Optional.empty();
+        }
+        return Optional.of(statementText.substring(start.getAsInt()).strip());
+    }
+
+    /**
+     * Returns the offset of the first character after the given parser position, or empty if
+     * the position does not lie inside {@code statementText}: either its end line is not one of
+     * the {@code '\n'}-separated lines of the text, or its end column is negative or reaches
+     * past the end of that line.
+     *
+     * <p>Lines are counted starting at 1. The returned offset is the start offset of the
+     * matching line plus the end column, which relies on the end column being 1-based and
+     * inclusive.
+     */
+    private static OptionalInt offsetAfter(String statementText, SqlParserPos pos) {
+        final int endLine = pos.getEndLineNum();
+        final int endCol = pos.getEndColumnNum();
+
+        final Iterator<String> iterator = Splitter.on('\n').split(statementText).iterator();
+        int currentLine = 0;
+        int lineStartOffset = 0;
+        while (iterator.hasNext()) {
+            final String lineText = iterator.next();
+            currentLine++;
+            if (currentLine == endLine) {
+                // A column outside the line means the position was not produced from this
+                // text. Report "not found" so that callers use their fallback instead of
+                // failing in substring() or slicing from an unrelated place.
+                if (endCol < 0 || endCol > lineText.length()) {
+                    return OptionalInt.empty();
+                }
+                return OptionalInt.of(lineStartOffset + endCol);
+            }
+            // +1 accounts for the '\n' separator consumed by the splitter.
+            lineStartOffset += lineText.length() + 1;
+        }
+        return OptionalInt.empty();
+    }
static PlannerQueryOperation toQueryOperation(SqlNode validated,
ConvertContext context) {
// transform to a relational tree
@@ -65,17 +111,11 @@ class SqlNodeConvertUtils {
/** convert the query part of a VIEW statement into a {@link CatalogView}.
*/
static CatalogView toCatalogView(
SqlNode query,
+ SqlParserPos asQueryKeywordPos,
List<SqlNode> viewFields,
Map<String, String> viewOptions,
String viewComment,
ConvertContext context) {
- // Put the sql string unparse (getQuotedSqlString()) in front of
- // the node conversion (toQueryOperation()),
- // because before Calcite 1.22.0, during sql-to-rel conversion, the
SqlWindow
- // bounds state would be mutated as default when they are null (not
specified).
-
- // This bug is fixed in CALCITE-3877 of Calcite 1.23.0.
- String originalQuery = context.toQuotedSqlString(query);
SqlNode validateQuery = context.getSqlValidator().validate(query);
// FLINK-38950: SqlValidator.validate() mutates its input parameter.
Always use the
// returned validateQuery instead of the mutated query for all
subsequent operations.
@@ -110,6 +150,10 @@ class SqlNodeConvertUtils {
schema = ResolvedSchema.physical(aliasFieldNames,
schema.getColumnDataTypes());
}
+ final String originalQuery =
+ extractOriginalAsQueryText(context, asQueryKeywordPos)
+ .orElse(context.toQuotedSqlString(query));
+
return new ResolvedCatalogView(
CatalogView.of(
Schema.newBuilder().fromResolvedSchema(schema).build(),
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index 23f8f1716b2..360d6c38e01 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -100,6 +100,13 @@ public interface SqlNodeConverter<S extends SqlNode> {
/** Convert the given {@param sqlNode} into a quoted SQL string. */
String toQuotedSqlString(SqlNode sqlNode);
+ /**
+ * Returns the original SQL statement text being converted, or {@code
null} when the node
+ * was synthesized (e.g. produced by an internal rewrite) and has no
source text.
+ */
+ @Nullable
+ String getStatementText();
+
/**
* Expands identifiers in a given SQL string.
*
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index b788f0560e3..3ed73870835 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.StartMode;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import
org.apache.flink.table.planner.operations.converters.SqlNodeConvertUtils;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
import org.apache.flink.table.planner.utils.MaterializedTableUtils;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
@@ -141,10 +142,17 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
return
MaterializedTableUtils.fromLogicalRefreshModeToRefreshMode(logicalRefreshMode);
}
+ /**
+ * Returns the user's original {@code AS} query text, sliced verbatim from
the statement so
+ * formatting and identifier casing are preserved (e.g. {@code int} is not
normalized to {@code
+ * INTEGER}). A comment placed between {@code AS} and the query is kept;
the {@code AS} keyword
+ * itself is excluded.
+ */
protected final String getDerivedOriginalQuery(
T sqlCreateMaterializedTable, ConvertContext context) {
- SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
- return context.toQuotedSqlString(selectQuery);
+ return SqlNodeConvertUtils.extractOriginalAsQueryText(
+ context,
sqlCreateMaterializedTable.getAsQueryKeywordPos())
+
.orElse(context.toQuotedSqlString(sqlCreateMaterializedTable.getAsQuery()));
}
protected final String getDerivedExpandedQuery(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
index b48d472593a..3860a3f93c9 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.operations.Operation;
import
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import
org.apache.flink.table.planner.operations.converters.SqlNodeConvertUtils;
import org.apache.flink.table.planner.utils.MaterializedTableUtils;
import org.apache.calcite.sql.SqlNode;
@@ -54,13 +55,12 @@ public class SqlAlterMaterializedTableAsQueryConverter
SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery,
ConvertContext context) {
return oldTable -> {
// Validate and extract schema from query
- String originalQuery =
context.toQuotedSqlString(sqlAlterTableAsQuery.getAsQuery());
- SqlNode validatedQuery =
-
context.getSqlValidator().validate(sqlAlterTableAsQuery.getAsQuery());
- String definitionQuery = context.toQuotedSqlString(validatedQuery);
+ SqlNode asQuery = sqlAlterTableAsQuery.getAsQuery();
+ SqlNode validatedQuery =
context.getSqlValidator().validate(asQuery);
+ String expandedQuery = context.toQuotedSqlString(validatedQuery);
PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
- context.toRelRoot(validatedQuery).project(), () ->
definitionQuery);
+ context.toRelRoot(validatedQuery).project(), () ->
expandedQuery);
ResolvedSchema oldSchema = oldTable.getResolvedSchema();
ResolvedSchema newSchema = queryOperation.getResolvedSchema();
@@ -78,7 +78,11 @@ public class SqlAlterMaterializedTableAsQueryConverter
+ "consider using CREATE OR ALTER
MATERIALIZED TABLE instead");
}
}
- tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery,
definitionQuery));
+ String originalQuery =
+ SqlNodeConvertUtils.extractOriginalAsQueryText(
+ context,
sqlAlterTableAsQuery.getAsQueryKeywordPos())
+ .orElse(context.toQuotedSqlString(asQuery));
+ tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery,
expandedQuery));
return tableChanges;
};
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
index d610569bf95..9c666397c2d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.operations.CreateTableASOperation;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.parse.CalciteParser;
@@ -422,6 +423,15 @@ class SqlCTASNodeToOperationTest extends
SqlNodeToOperationConversionTestBase {
.build()))));
}
+ @Test
+ void testExplainCreateTableAs() {
+ final Operation operation = parse("EXPLAIN CREATE TABLE myTable123 AS
SELECT 123");
+
+ assertThat(operation).isInstanceOf(ExplainOperation.class);
+ assertThat(((ExplainOperation) operation).getChild())
+ .isInstanceOf(CreateTableASOperation.class);
+ }
+
private Operation parseAndConvert(String sql) {
final FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 3344616c798..2c5dd5d0d71 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1459,7 +1459,7 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
+ " `user_id` INT NOT NULL,\n"
+ " CONSTRAINT `PK_user_id` PRIMARY KEY
(`user_id`) NOT ENFORCED\n"
+ "), comment='null', distribution=null,
partitionKeys=[], "
- + "options={format=debezium-json},
snapshot=null, originalQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`',
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+ + "options={format=debezium-json},
snapshot=null, originalQuery='SELECT 1 as shop_id, 2 as user_id',
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+ "freshness=INTERVAL '30' SECOND,
logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
+ "refreshStatus=INITIALIZING,
refreshHandlerDescription='null', serializedRefreshHandler=null},
resolvedSchema=(\n"
+ " `shop_id` INT,\n"
@@ -1491,7 +1491,7 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
+ " `user_id` INT NOT NULL,\n"
+ " CONSTRAINT `PK_user_id` PRIMARY KEY
(`user_id`) NOT ENFORCED\n"
+ "), comment='null', distribution=DISTRIBUTED
BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
- + "options={format=debezium-json},
snapshot=null, originalQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`',
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+ + "options={format=debezium-json},
snapshot=null, originalQuery='SELECT 1 as shop_id, 2 as user_id',
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+ "freshness=INTERVAL '30' SECOND,
logicalRefreshMode=AUTOMATIC, refreshMode=null, "
+ "refreshStatus=INITIALIZING,
refreshHandlerDescription='null', serializedRefreshHandler=null},
resolvedSchema=(\n"
+ " `shop_id` INT NOT NULL,\n"
@@ -2591,6 +2591,43 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
assertThat(operation).isInstanceOf(CreateViewOperation.class);
}
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("viewOriginalQueryCases")
+ void testCreateViewOriginalQuery(String name, String sql, String
expectedOriginalQuery) {
+ final Operation operation = parse(sql);
+ assertThat(operation).isInstanceOf(CreateViewOperation.class);
+ assertThat(((CreateViewOperation)
operation).getCatalogView().getOriginalQuery())
+ .isEqualTo(expectedOriginalQuery);
+ }
+
+ private static Stream<Arguments> viewOriginalQueryCases() {
+ return Stream.of(
+ // The AS query is sliced verbatim, preserving the user's
wording.
+ viewAsQueryCase(
+ "AS query with a leading comment is kept verbatim",
+ "/* keep me */\nselect 1"),
+ viewAsQueryCase(
+ "trailing comment after the AS query is kept",
+ "select 1 -- trailing comment"),
+ // Everything before AS must be excluded and must not shift
the slice.
+ Arguments.of(
+ "comment before AS is dropped",
+ "CREATE VIEW v1\n-- note before AS\nAS SELECT 1",
+ "SELECT 1"),
+ Arguments.of(
+ "As-query is stripped and comments are kept",
+ "CREATE VIEW v1 AS \n\n\n\n--keep\n--me\nSELECT * FROM
t1\n\n",
+ "--keep\n--me\nSELECT * FROM t1"),
+ Arguments.of(
+ "column list and COMMENT before AS are dropped",
+ "CREATE VIEW v1 (w, x, y, z)\nCOMMENT 'a view'\nAS
SELECT a, b, c, d FROM t1",
+ "SELECT a, b, c, d FROM t1"));
+ }
+
+ private static Arguments viewAsQueryCase(String name, String query) {
+ return Arguments.of(name, "CREATE VIEW v1 AS " + query, query);
+ }
+
@Test
void testAlterTableAddPartitions() throws Exception {
prepareTable("tb1", true, true, 0);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 6bb5fc5503d..6e99aa29ed3 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -66,6 +66,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -320,6 +321,66 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ "LATERAL
TABLE(`builtin`.`default`.`myFunc`(`b`)) AS `T` (`f1`, `f2`)");
}
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("originalQueryCases")
+ void testOriginalQuery(String name, String sql, String
expectedOriginalQuery) {
+ final String originalQuery =
+ createMaterializedTableOperation(sql)
+ .getCatalogMaterializedTable()
+ .getOriginalQuery();
+ assertThat(originalQuery).isEqualTo(expectedOriginalQuery);
+ }
+
+ private static Stream<Arguments> originalQueryCases() {
+ final String query = "SELECT * FROM t1";
+ return Stream.of(
+ // The AS query is sliced verbatim from just after AS to the
end of the statement,
+ // preserving the user's wording (case, formatting, comments).
+ asQueryCase(
+ "AS query with a leading comment is kept verbatim",
+ "/* keep me */\nselect a, b from t1"),
+ asQueryCase(
+ "trailing comment after the AS query is kept",
+ "select a, b from t1 -- trailing comment"),
+ asQueryCase(
+ "WITH query is kept whole (its node position is
narrower than its text)",
+ "WITH q AS (SELECT a FROM t1) SELECT a FROM q"),
+ Arguments.of(
+ "As-query is stripped and comments are kept",
+ "CREATE MATERIALIZED TABLE mtbl1 AS
\n\n\n\n--keep\n--me\nSELECT * FROM t1\n\n",
+ "--keep\n--me\nSELECT * FROM t1"),
+ // Everything before AS must be excluded and must not shift
the slice, regardless of
+ // comments, identifier quoting, or a multi-line clause prefix.
+ Arguments.of(
+ "comment before AS is dropped",
+ "CREATE MATERIALIZED TABLE mtbl1\n-- note before
AS\nAS " + query,
+ query),
+ Arguments.of(
+ "escaped-backtick identifier and COMMENT before AS are
dropped",
+ "CREATE MATERIALIZED TABLE `m``tbl` COMMENT 'a''b' AS
" + query,
+ query),
+ Arguments.of(
+ "multi-line DDL prefix before AS is dropped",
+ "CREATE MATERIALIZED TABLE mtbl1 (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT
ENFORCED\n"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "PARTITIONED BY (a)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem',\n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS "
+ + query,
+ query));
+ }
+
+ private static Arguments asQueryCase(String name, String query) {
+ return Arguments.of(name, "CREATE MATERIALIZED TABLE mtbl1 AS " +
query, query);
+ }
+
@Test
void testCreateMaterializedTableWithUDTFQueryWithoutAlias() {
functionCatalog.registerCatalogFunction(
@@ -572,7 +633,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
@Test
void testAlterMaterializedTableAsQuery() throws TableNotExistException {
String sql =
- "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as
e, cast('123' as string) as f FROM t3";
+ "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as
e, cast('123' as string) as f\n FROM\n t3";
Operation operation = parse(sql);
assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
@@ -584,7 +645,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
TableChange.add(Column.physical("e",
DataTypes.VARCHAR(Integer.MAX_VALUE))),
TableChange.add(Column.physical("f",
DataTypes.VARCHAR(Integer.MAX_VALUE))),
TableChange.modifyDefinitionQuery(
- "SELECT `a`, `b`, `c`, `d`, `d` AS `e`,
CAST('123' AS STRING) AS `f`\nFROM `t3`",
+ "SELECT a, b, c, d, d as e, cast('123' as
string) as f\n FROM\n t3",
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+ "FROM `builtin`.`default`.`t3` AS
`t3`"));
assertThat(operation.asSummaryString())
@@ -638,7 +699,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
.containsExactly(
TableChange.add(Column.physical("a0",
DataTypes.INT())),
TableChange.modifyDefinitionQuery(
- "SELECT `a`, `b`, `c`, `d`, `c` AS `a`\nFROM
`t3`",
+ "SELECT a, b, c, d, c as a FROM t3",
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`c` AS `a`\n"
+ "FROM `builtin`.`default`.`t3` AS
`t3`"));
}
@@ -1373,7 +1434,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
.comment("materialized table comment")
.options(Map.of("connector", "filesystem", "format", "json"))
.partitionKeys(List.of("a", "d"))
- .originalQuery("SELECT *\nFROM `t1`")
+ .originalQuery("SELECT * FROM t1")
.expandedQuery(
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+ "FROM `builtin`.`default`.`t1` AS `t1`");
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index 9ed19411a7a..0814e1556a5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -131,14 +131,14 @@ class SqlNodeToOperationConversionTestBase {
protected Operation parse(String sql, FlinkPlannerImpl planner,
CalciteParser parser) {
SqlNode node = parser.parse(sql);
- return SqlNodeToOperationConversion.convert(planner, catalogManager,
node).get();
+ return SqlNodeToOperationConversion.convert(planner, catalogManager,
node, sql).get();
}
protected Operation parse(String sql) {
FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = parser.parse(sql);
- return SqlNodeToOperationConversion.convert(planner, catalogManager,
node).get();
+ return SqlNodeToOperationConversion.convert(planner, catalogManager,
node, sql).get();
}
protected FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index 4082919ef02..1eeeeafc627 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -92,7 +92,7 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
TableChange.add(Column.physical("f", DataTypes.INT())),
TableChange.dropConstraint("ct1"),
TableChange.modifyDefinitionQuery(
- "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS
`f`\nFROM `t1`",
+ "SELECT a, b, c, d, a as `a1`, 3 as f FROM t1",
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`, `t1`.`a` AS `a1`, 3 AS `f`\n"
+ "FROM `builtin`.`default`.`t1` AS
`t1`"),
TableChange.reset("connector"),
@@ -112,7 +112,7 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
// No explicit schema, so nullable will be used
TableChange.add(Column.physical("a1",
DataTypes.BIGINT())),
TableChange.modifyDefinitionQuery(
- "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM
`t1`",
+ "SELECT a, b, c, d, a as `a1` FROM t1",
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`, `t1`.`a` AS `a1`\n"
+ "FROM `builtin`.`default`.`t1` AS
`t1`"),
TableChange.reset("connector"),
@@ -201,7 +201,7 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
TableChange.add(Column.physical("e",
DataTypes.VARCHAR(Integer.MAX_VALUE))),
TableChange.add(Column.physical("f",
DataTypes.VARCHAR(Integer.MAX_VALUE))),
TableChange.modifyDefinitionQuery(
- "SELECT `a`, `b`, `c`, `d`, `d` AS `e`,
CAST('123' AS STRING) AS `f`\nFROM `t1`",
+ "SELECT a, b, c, d, d as e, cast('123' as
string) as f FROM t1",
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+ "FROM `builtin`.`default`.`t1` AS
`t1`"),
TableChange.set("format", "json2"),
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 524bdcce224..8ed698f99d8 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ReplaceTableAsOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -280,6 +281,15 @@ class SqlRTASNodeToOperationConverterTest extends
SqlNodeToOperationConversionTe
testCommonReplaceTableAs(sql, tableName, null, tableSchema, null,
Collections.emptyList());
}
+ @Test
+ void testExplainReplaceTableAs() {
+ final Operation operation = parse("EXPLAIN REPLACE TABLE myTable123 AS
SELECT 123");
+
+ assertThat(operation).isInstanceOf(ExplainOperation.class);
+ assertThat(((ExplainOperation) operation).getChild())
+ .isInstanceOf(ReplaceTableAsOperation.class);
+ }
+
private void testCommonReplaceTableAs(
String sql, String tableName, @Nullable String tableComment) {
testCommonReplaceTableAs(
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index f8546d4a678..b0c0dd4385e 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -1234,7 +1234,7 @@ class CatalogTableITCase(isStreamingMode: Boolean)
extends TableITCaseBase {
.get()
.getTable(objectPath)
.asInstanceOf[CatalogView]
- assertThat(view.getOriginalQuery).isEqualTo("SELECT `b`\nFROM `T`")
+ assertThat(view.getOriginalQuery).isEqualTo("SELECT b FROM T")
}
@TestTemplate