This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5919251d7a9 [FLINK-33823] Make PlannerQueryOperation SQL serializable 5919251d7a9 is described below commit 5919251d7a94264a6a72c31de0716b3f72d65437 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Mon Dec 18 12:20:12 2023 +0100 [FLINK-33823] Make PlannerQueryOperation SQL serializable --- .../expressions/ExpressionSerializationTest.java | 4 ++-- .../flink/table/test/program/TableApiTestStep.java | 8 ++++++++ .../functions/BuiltInFunctionDefinitions.java | 2 +- .../planner/operations/PlannerQueryOperation.java | 16 ++++++++++++++- .../operations/SqlNodeToOperationConversion.java | 16 ++++++++++++--- .../operations/converters/SqlNodeConvertUtils.java | 3 ++- .../operations/converters/SqlQueryConverter.java | 3 ++- .../converters/SqlReplaceTableAsConverter.java | 4 +++- .../table/planner/delegation/PlannerBase.scala | 4 +++- .../table/api/QueryOperationSqlExecutionTest.java | 3 ++- .../api/QueryOperationSqlSerializationTest.java | 3 ++- .../table/api/QueryOperationTestPrograms.java | 24 ++++++++++++++++++++++ .../flink/table/planner/utils/TableTestBase.scala | 7 ++++++- 13 files changed, 83 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java index 2693cb38517..ea5d0318b55 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java @@ -183,10 +183,10 @@ public class ExpressionSerializationTest { .expectStr("OVERLAY(`f0` PLACING 'ABC' FROM 2 FOR 5)"), TestSpec.forExpr($("f0").substr(2)) .withField("f0", DataTypes.STRING()) - .expectStr("SUBSTR(`f0` FROM 2)"), + .expectStr("SUBSTR(`f0`, 2)"), TestSpec.forExpr($("f0").substr(2, 5)) .withField("f0", DataTypes.STRING()) - .expectStr("SUBSTR(`f0` FROM 2 FOR 5)"), + .expectStr("SUBSTR(`f0`, 2, 5)"), TestSpec.forExpr($("f0").substring(2)) .withField("f0", DataTypes.STRING()) .expectStr("SUBSTRING(`f0` FROM 2)"), diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java index 06c16c70931..07e147ae208 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java @@ -57,6 +57,11 @@ public class TableApiTestStep implements TestStep { public Table fromValues(AbstractDataType<?> dataType, Object... values) { return env.fromValues(dataType, values); } + + @Override + public Table sqlQuery(String query) { + return env.sqlQuery(query); + } }); } @@ -83,5 +88,8 @@ public class TableApiTestStep implements TestStep { /** See {@link TableEnvironment#fromValues(AbstractDataType, Object...)}. */ Table fromValues(AbstractDataType<?> dataType, Object... values); + + /** See {@link TableEnvironment#sqlQuery(String)}. */ + Table sqlQuery(String query); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index b65afdc4284..669f4012003 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -807,7 +807,7 @@ public final class BuiltInFunctionDefinitions { public static final BuiltInFunctionDefinition SUBSTR = BuiltInFunctionDefinition.newBuilder() .name("substr") - .callSyntax("SUBSTR", SqlCallSyntax.SUBSTRING) + .sqlName("SUBSTR") .kind(SCALAR) .inputTypeStrategy( or( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java index fdb0dd53660..0fce3fb8f46 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.OperationUtils; @@ -33,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataType; import java.util.Collections; import java.util.List; +import java.util.function.Supplier; /** Wrapper for valid logical plans generated by Planner. */ @Internal @@ -41,8 +43,11 @@ public class PlannerQueryOperation implements QueryOperation { private final RelNode calciteTree; private final ResolvedSchema resolvedSchema; - public PlannerQueryOperation(RelNode calciteTree) { + private final Supplier<String> toSqlString; + + public PlannerQueryOperation(RelNode calciteTree, Supplier<String> toSqlString) { this.calciteTree = calciteTree; + this.toSqlString = toSqlString; RelDataType rowType = calciteTree.getRowType(); String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); @@ -72,6 +77,15 @@ public class PlannerQueryOperation implements QueryOperation { "PlannerNode", Collections.emptyMap(), getChildren(), Operation::asSummaryString); } + @Override + public String asSerializableString() { + try { + return toSqlString.get(); + } catch (Exception e) { + throw new TableException("Given plan is not serializable into SQL", e); + } + } + @Override public List<QueryOperation> getChildren() { return Collections.emptyList(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 497aca5076b..947724e1076 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -1319,7 +1319,12 @@ public class SqlNodeToOperationConversion { } } // delete push down is not applicable, use row-level delete - PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + tableModify, + () -> { + throw new TableException("Delete statements are not SQL serializable."); + }); return new SinkModifyOperation( contextResolvedTable, queryOperation, @@ -1340,7 +1345,12 @@ public class SqlNodeToOperationConversion { catalogManager.getTableOrError( catalogManager.qualifyIdentifier(unresolvedTableIdentifier)); // get query - PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + tableModify, + () -> { + throw new TableException("Update statements are not SQL serializable."); + }); // TODO calc target column list to index array, currently only simple SqlIdentifiers are // available, this should be updated after FLINK-31344 fixed @@ -1379,6 +1389,6 @@ public class SqlNodeToOperationConversion { private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) { // transform to a relational tree RelRoot relational = planner.rel(validated); - return new PlannerQueryOperation(relational.project()); + return new PlannerQueryOperation(relational.project(), () -> getQuotedSqlString(validated)); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java index 28d030af707..06cdceea9ad 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java @@ -45,7 +45,8 @@ class SqlNodeConvertUtils { static PlannerQueryOperation toQueryOperation(SqlNode validated, ConvertContext context) { // transform to a relational tree RelRoot relational = context.toRelRoot(validated); - return new PlannerQueryOperation(relational.project()); + return new PlannerQueryOperation( + relational.project(), () -> context.toQuotedSqlString(validated)); } /** convert the query part of a VIEW statement into a {@link CatalogView}. */ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java index b38a7cfc03d..14da6e6fc87 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java @@ -46,6 +46,7 @@ public class SqlQueryConverter implements SqlNodeConverter<SqlNode> { public Operation convertSqlNode(SqlNode node, ConvertContext context) { // transform to a relational tree RelRoot relational = context.toRelRoot(node); - return new PlannerQueryOperation(relational.project()); + return new PlannerQueryOperation( + relational.project(), () -> context.toQuotedSqlString(node)); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java index 1ace0be66c6..396d50bccb5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java @@ -51,7 +51,9 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery(); context.getSqlValidator().validate(asQuerySqlNode); QueryOperation query = - new PlannerQueryOperation(context.toRelRoot(asQuerySqlNode).project()); + new PlannerQueryOperation( + context.toRelRoot(asQuerySqlNode).project(), + () -> context.toQuotedSqlString(asQuerySqlNode)); // get table comment String tableComment = diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 59e44941a8e..45788e6278e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -519,7 +519,9 @@ abstract class PlannerBase( val contextResolvedTable = catalogManager.getTableOrError(objectIdentifier) val modifyOperation = new SinkModifyOperation( contextResolvedTable, - new PlannerQueryOperation(modify.getInput) + new PlannerQueryOperation( + modify.getInput, + () => queryOperation.asSerializableString()) ) translateToRel(modifyOperation) case _ => diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java index fde5b71f33f..f686ba1f283 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java @@ -69,7 +69,8 @@ public class QueryOperationSqlExecutionTest implements TableTestProgramRunner { QueryOperationTestPrograms.GROUP_HOP_WINDOW_EVENT_TIME, QueryOperationTestPrograms.SORT_LIMIT_DESC, QueryOperationTestPrograms.GROUP_BY_UDF_WITH_MERGE, - QueryOperationTestPrograms.NON_WINDOW_INNER_JOIN); + QueryOperationTestPrograms.NON_WINDOW_INNER_JOIN, + QueryOperationTestPrograms.SQL_QUERY_OPERATION); } @ParameterizedTest diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java index e2545867eac..acd38d6adc8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java @@ -52,7 +52,8 @@ public class QueryOperationSqlSerializationTest implements TableTestProgramRunne QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION, QueryOperationTestPrograms.WINDOW_AGGREGATE_QUERY_OPERATION, QueryOperationTestPrograms.UNION_ALL_QUERY_OPERATION, - QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION); + QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION, + QueryOperationTestPrograms.SQL_QUERY_OPERATION); } @ParameterizedTest diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java index b983f6bd25e..a77b4f481ff 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java @@ -315,6 +315,30 @@ public class QueryOperationTestPrograms { + ") ORDER BY `a` ASC, `b` DESC OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY") .build(); + static final TableTestProgram SQL_QUERY_OPERATION = + TableTestProgram.of("sql-query-operation", "verifies sql serialization") + .setupTableSource( + SourceTestStep.newBuilder("s") + .addSchema("a bigint", "b string") + .producedValues(Row.of(1L, "abc"), Row.of(2L, "cde")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("a bigint", "b string") + .consumedValues(Row.of(3L, "bc"), Row.of(4L, "de")) + .build()) + .runTableApi( + t -> + t.sqlQuery("SELECT a, b FROM s") + .select($("a").plus(2), $("b").substr(2, 3)), + "sink") + .runSql( + "SELECT (`a` + 2) AS `_c0`, (SUBSTR(`b`, 2, 3)) AS `_c1` FROM (\n" + + " SELECT `s`.`a`, `s`.`b`\n" + + " FROM `default_catalog`.`default_database`.`s` AS `s`\n" + + ")") + .build(); + static final TableTestProgram GROUP_HOP_WINDOW_EVENT_TIME = TableTestProgram.of( "group-window-aggregate-hop-event-time", diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 86533006bbe..1e006f3d94b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -77,7 +77,9 @@ import _root_.scala.collection.JavaConversions._ import _root_.scala.io.Source import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.rel2sql.RelToSqlConverter import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier} +import org.apache.calcite.sql.dialect.AnsiSqlDialect import org.apache.calcite.sql.parser.SqlParserPos import org.assertj.core.api.Assertions.{assertThat, assertThatExceptionOfType, fail} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} @@ -1282,7 +1284,10 @@ case class StreamTableTestUtil( rowtimeFieldIdx, expr ) - val queryOperation = new PlannerQueryOperation(watermarkAssigner) + val queryOperation = new PlannerQueryOperation( + watermarkAssigner, + () => + throw new TableException("Cannot convert a LogicalWatermarkAssigner back to a SQL string.")) testingTableEnv.createTemporaryView(tableName, testingTableEnv.createTable(queryOperation)) }