This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5919251d7a9 [FLINK-33823] Make PlannerQueryOperation SQL serializable
5919251d7a9 is described below

commit 5919251d7a94264a6a72c31de0716b3f72d65437
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Dec 18 12:20:12 2023 +0100

    [FLINK-33823] Make PlannerQueryOperation SQL serializable
---
 .../expressions/ExpressionSerializationTest.java   |  4 ++--
 .../flink/table/test/program/TableApiTestStep.java |  8 ++++++++
 .../functions/BuiltInFunctionDefinitions.java      |  2 +-
 .../planner/operations/PlannerQueryOperation.java  | 16 ++++++++++++++-
 .../operations/SqlNodeToOperationConversion.java   | 16 ++++++++++++---
 .../operations/converters/SqlNodeConvertUtils.java |  3 ++-
 .../operations/converters/SqlQueryConverter.java   |  3 ++-
 .../converters/SqlReplaceTableAsConverter.java     |  4 +++-
 .../table/planner/delegation/PlannerBase.scala     |  4 +++-
 .../table/api/QueryOperationSqlExecutionTest.java  |  3 ++-
 .../api/QueryOperationSqlSerializationTest.java    |  3 ++-
 .../table/api/QueryOperationTestPrograms.java      | 24 ++++++++++++++++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |  7 ++++++-
 13 files changed, 83 insertions(+), 14 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
index 2693cb38517..ea5d0318b55 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
@@ -183,10 +183,10 @@ public class ExpressionSerializationTest {
                         .expectStr("OVERLAY(`f0` PLACING 'ABC' FROM 2 FOR 5)"),
                 TestSpec.forExpr($("f0").substr(2))
                         .withField("f0", DataTypes.STRING())
-                        .expectStr("SUBSTR(`f0` FROM 2)"),
+                        .expectStr("SUBSTR(`f0`, 2)"),
                 TestSpec.forExpr($("f0").substr(2, 5))
                         .withField("f0", DataTypes.STRING())
-                        .expectStr("SUBSTR(`f0` FROM 2 FOR 5)"),
+                        .expectStr("SUBSTR(`f0`, 2, 5)"),
                 TestSpec.forExpr($("f0").substring(2))
                         .withField("f0", DataTypes.STRING())
                         .expectStr("SUBSTRING(`f0` FROM 2)"),
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java
index 06c16c70931..07e147ae208 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java
@@ -57,6 +57,11 @@ public class TableApiTestStep implements TestStep {
                     public Table fromValues(AbstractDataType<?> dataType, Object... values) {
                         return env.fromValues(dataType, values);
                     }
+
+                    @Override
+                    public Table sqlQuery(String query) {
+                        return env.sqlQuery(query);
+                    }
                 });
     }
 
@@ -83,5 +88,8 @@ public class TableApiTestStep implements TestStep {
 
         /** See {@link TableEnvironment#fromValues(AbstractDataType, Object...)}. */
         Table fromValues(AbstractDataType<?> dataType, Object... values);
+
+        /** See {@link TableEnvironment#sqlQuery(String)}. */
+        Table sqlQuery(String query);
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index b65afdc4284..669f4012003 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -807,7 +807,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition SUBSTR =
             BuiltInFunctionDefinition.newBuilder()
                     .name("substr")
-                    .callSyntax("SUBSTR", SqlCallSyntax.SUBSTRING)
+                    .sqlName("SUBSTR")
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java
index fdb0dd53660..0fce3fb8f46 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerQueryOperation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
@@ -33,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataType;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Supplier;
 
 /** Wrapper for valid logical plans generated by Planner. */
 @Internal
@@ -41,8 +43,11 @@ public class PlannerQueryOperation implements QueryOperation {
     private final RelNode calciteTree;
     private final ResolvedSchema resolvedSchema;
 
-    public PlannerQueryOperation(RelNode calciteTree) {
+    private final Supplier<String> toSqlString;
+
+    public PlannerQueryOperation(RelNode calciteTree, Supplier<String> toSqlString) {
         this.calciteTree = calciteTree;
+        this.toSqlString = toSqlString;
 
         RelDataType rowType = calciteTree.getRowType();
         String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
@@ -72,6 +77,15 @@ public class PlannerQueryOperation implements QueryOperation {
                 "PlannerNode", Collections.emptyMap(), getChildren(), Operation::asSummaryString);
     }
 
+    @Override
+    public String asSerializableString() {
+        try {
+            return toSqlString.get();
+        } catch (Exception e) {
+            throw new TableException("Given plan is not serializable into SQL", e);
+        }
+    }
+
     @Override
     public List<QueryOperation> getChildren() {
         return Collections.emptyList();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 497aca5076b..947724e1076 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -1319,7 +1319,12 @@ public class SqlNodeToOperationConversion {
             }
         }
         // delete push down is not applicable, use row-level delete
-        PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        tableModify,
+                        () -> {
+                            throw new TableException("Delete statements are not SQL serializable.");
+                        });
         return new SinkModifyOperation(
                 contextResolvedTable,
                 queryOperation,
@@ -1340,7 +1345,12 @@ public class SqlNodeToOperationConversion {
                 catalogManager.getTableOrError(
                         catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
         // get query
-        PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        tableModify,
+                        () -> {
+                            throw new TableException("Update statements are not SQL serializable.");
+                        });
 
         // TODO calc target column list to index array, currently only simple SqlIdentifiers are
         // available, this should be updated after FLINK-31344 fixed
@@ -1379,6 +1389,6 @@ public class SqlNodeToOperationConversion {
     private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
         // transform to a relational tree
         RelRoot relational = planner.rel(validated);
-        return new PlannerQueryOperation(relational.project());
+        return new PlannerQueryOperation(relational.project(), () -> getQuotedSqlString(validated));
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
index 28d030af707..06cdceea9ad 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
@@ -45,7 +45,8 @@ class SqlNodeConvertUtils {
     static PlannerQueryOperation toQueryOperation(SqlNode validated, ConvertContext context) {
         // transform to a relational tree
         RelRoot relational = context.toRelRoot(validated);
-        return new PlannerQueryOperation(relational.project());
+        return new PlannerQueryOperation(
+                relational.project(), () -> context.toQuotedSqlString(validated));
     }
 
     /** convert the query part of a VIEW statement into a {@link CatalogView}. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java
index b38a7cfc03d..14da6e6fc87 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlQueryConverter.java
@@ -46,6 +46,7 @@ public class SqlQueryConverter implements SqlNodeConverter<SqlNode> {
     public Operation convertSqlNode(SqlNode node, ConvertContext context) {
         // transform to a relational tree
         RelRoot relational = context.toRelRoot(node);
-        return new PlannerQueryOperation(relational.project());
+        return new PlannerQueryOperation(
+                relational.project(), () -> context.toQuotedSqlString(node));
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index 1ace0be66c6..396d50bccb5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -51,7 +51,9 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
         SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
         context.getSqlValidator().validate(asQuerySqlNode);
         QueryOperation query =
-                new PlannerQueryOperation(context.toRelRoot(asQuerySqlNode).project());
+                new PlannerQueryOperation(
+                        context.toRelRoot(asQuerySqlNode).project(),
+                        () -> context.toQuotedSqlString(asQuerySqlNode));
 
         // get table comment
         String tableComment =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 59e44941a8e..45788e6278e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -519,7 +519,9 @@ abstract class PlannerBase(
             val contextResolvedTable = catalogManager.getTableOrError(objectIdentifier)
             val modifyOperation = new SinkModifyOperation(
               contextResolvedTable,
-              new PlannerQueryOperation(modify.getInput)
+              new PlannerQueryOperation(
+                modify.getInput,
+                () => queryOperation.asSerializableString())
             )
             translateToRel(modifyOperation)
           case _ =>
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
index fde5b71f33f..f686ba1f283 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
@@ -69,7 +69,8 @@ public class QueryOperationSqlExecutionTest implements TableTestProgramRunner {
                 QueryOperationTestPrograms.GROUP_HOP_WINDOW_EVENT_TIME,
                 QueryOperationTestPrograms.SORT_LIMIT_DESC,
                 QueryOperationTestPrograms.GROUP_BY_UDF_WITH_MERGE,
-                QueryOperationTestPrograms.NON_WINDOW_INNER_JOIN);
+                QueryOperationTestPrograms.NON_WINDOW_INNER_JOIN,
+                QueryOperationTestPrograms.SQL_QUERY_OPERATION);
     }
 
     @ParameterizedTest
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index e2545867eac..acd38d6adc8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -52,7 +52,8 @@ public class QueryOperationSqlSerializationTest implements TableTestProgramRunne
                 QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
                 QueryOperationTestPrograms.WINDOW_AGGREGATE_QUERY_OPERATION,
                 QueryOperationTestPrograms.UNION_ALL_QUERY_OPERATION,
-                QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION);
+                QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION,
+                QueryOperationTestPrograms.SQL_QUERY_OPERATION);
     }
 
     @ParameterizedTest
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
index b983f6bd25e..a77b4f481ff 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
@@ -315,6 +315,30 @@ public class QueryOperationTestPrograms {
                                     + ") ORDER BY `a` ASC, `b` DESC OFFSET 1 
ROWS FETCH NEXT 2 ROWS ONLY")
                     .build();
 
+    static final TableTestProgram SQL_QUERY_OPERATION =
+            TableTestProgram.of("sql-query-operation", "verifies sql serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(Row.of(1L, "abc"), Row.of(2L, "cde"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a bigint", "b string")
+                                    .consumedValues(Row.of(3L, "bc"), Row.of(4L, "de"))
+                                    .build())
+                    .runTableApi(
+                            t ->
+                                    t.sqlQuery("SELECT a, b FROM s")
+                                            .select($("a").plus(2), $("b").substr(2, 3)),
+                            "sink")
+                    .runSql(
+                            "SELECT (`a` + 2) AS `_c0`, (SUBSTR(`b`, 2, 3)) AS `_c1` FROM (\n"
+                                    + "    SELECT `s`.`a`, `s`.`b`\n"
+                                    + "    FROM `default_catalog`.`default_database`.`s` AS `s`\n"
+                                    + ")")
+                    .build();
+
     static final TableTestProgram GROUP_HOP_WINDOW_EVENT_TIME =
             TableTestProgram.of(
                             "group-window-aggregate-hop-event-time",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 86533006bbe..1e006f3d94b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -77,7 +77,9 @@ import _root_.scala.collection.JavaConversions._
 import _root_.scala.io.Source
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.rel2sql.RelToSqlConverter
 import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier}
+import org.apache.calcite.sql.dialect.AnsiSqlDialect
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.assertj.core.api.Assertions.{assertThat, assertThatExceptionOfType, fail}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -1282,7 +1284,10 @@ case class StreamTableTestUtil(
       rowtimeFieldIdx,
       expr
     )
-    val queryOperation = new PlannerQueryOperation(watermarkAssigner)
+    val queryOperation = new PlannerQueryOperation(
+      watermarkAssigner,
+      () =>
+        throw new TableException("Cannot convert a LogicalWatermarkAssigner back to a SQL string."))
     testingTableEnv.createTemporaryView(tableName, testingTableEnv.createTable(queryOperation))
   }
 

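The user-facing effect of this change: query operations created from SQL text via TableEnvironment#sqlQuery can now be serialized back to SQL through QueryOperation#asSerializableString, which is what QueryOperationTestPrograms.SQL_QUERY_OPERATION exercises above. Below is a minimal sketch of that usage; the table `s`, the datagen-backed DDL and the class name are illustrative placeholders, not part of this commit.

    import static org.apache.flink.table.api.Expressions.$;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlSerializationSketch {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical source table; any registered table with columns `a` and `b` would do.
            env.executeSql(
                    "CREATE TABLE s (a BIGINT, b STRING) WITH ('connector' = 'datagen')");

            // sqlQuery(...) yields a PlannerQueryOperation that, after this commit, keeps a
            // Supplier<String> producing the quoted SQL of the validated statement.
            Table t =
                    env.sqlQuery("SELECT a, b FROM s")
                            .select($("a").plus(2), $("b").substr(2, 3));

            // Expected to print a statement along the lines of the one asserted in
            // SQL_QUERY_OPERATION, e.g.
            //   SELECT (`a` + 2) AS `_c0`, (SUBSTR(`b`, 2, 3)) AS `_c1` FROM (...)
            System.out.println(t.getQueryOperation().asSerializableString());
        }
    }

Note that the SQL text is produced lazily: for operations that cannot be expressed as SQL (the row-level DELETE/UPDATE paths in SqlNodeToOperationConversion, the watermark assigner in TableTestBase), the supplier throws a TableException only when asSerializableString() is actually called.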