This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 3ff7be42e53 [FLINK-36026][table] Options from `OPTIONS` hint should be 
present in compiled plan
3ff7be42e53 is described below

commit 3ff7be42e53366dcb5b18ad63ca4a7a8ef78fac4
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Aug 9 13:52:17 2024 +0200

    [FLINK-36026][table] Options from `OPTIONS` hint should be present in 
compiled plan
    
    This closes #25186
---
 .editorconfig                                      |  3 +
 .../planner/plan/schema/CatalogSourceTable.java    |  6 +-
 .../apache/flink/table/api/CompiledPlanITCase.java | 29 +++++--
 .../jsonplan/testGetJsonPlanWithHints.out          | 89 ++++++++++++++++++++++
 4 files changed, 117 insertions(+), 10 deletions(-)

diff --git a/.editorconfig b/.editorconfig
index 333c2a16580..5aa9a33ccd2 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -263,6 +263,9 @@ ij_java_variable_annotation_wrap = normal
 ij_java_wrap_first_method_in_call_chain = true
 # ij_java_wrap_long_lines = false
 
+[*.out]
+insert_final_newline = false
+
 [*.xml]
 indent_style = tab
 indent_size = 4
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
index dd501487e4e..0c1cb09188f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
@@ -107,19 +107,19 @@ public final class CatalogSourceTable extends 
FlinkPreparingTableBase {
 
         // finalize catalog table with option hints
         final Map<String, String> hintedOptions = 
FlinkHints.getHintedOptions(hints);
-        final ContextResolvedTable catalogTable =
+        final ContextResolvedTable contextTableWithHints =
                 computeContextResolvedTable(context, hintedOptions);
 
         // create table source
         final DynamicTableSource tableSource =
-                createDynamicTableSource(context, 
catalogTable.getResolvedTable());
+                createDynamicTableSource(context, 
contextTableWithHints.getResolvedTable());
 
         // prepare table source and convert to RelNode
         return DynamicSourceUtils.convertSourceToRel(
                 !schemaTable.isStreamingMode(),
                 context.getTableConfig(),
                 relBuilder,
-                schemaTable.getContextResolvedTable(),
+                contextTableWithHints,
                 schemaTable.getStatistic(),
                 hints,
                 tableSource);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 591f0a52e4a..f432b0fa8fe 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -83,14 +83,25 @@ class CompiledPlanITCase extends JsonPlanTestBase {
                 tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM 
MyTable");
         String expected = 
TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
         assertThat(
-                        TableTestUtil.replaceExecNodeId(
-                                TableTestUtil.replaceFlinkVersion(
-                                        TableTestUtil.getFormattedJson(
-                                                compiledPlan.asJsonString()))))
+                        getPreparedToCompareCompiledPlan(
+                                
TableTestUtil.getFormattedJson(compiledPlan.asJsonString())))
                 .isEqualTo(
-                        TableTestUtil.replaceExecNodeId(
-                                TableTestUtil.replaceFlinkVersion(
-                                        
TableTestUtil.getFormattedJson(expected))));
+                        
getPreparedToCompareCompiledPlan(TableTestUtil.getFormattedJson(expected)));
+    }
+
+    @Test
+    void testSourceTableWithHints() {
+        CompiledPlan compiledPlan =
+                tableEnv.compilePlanSql(
+                        "INSERT INTO MySink SELECT * FROM MyTable"
+                                // OPTIONS hints here do not play any 
significant role
+                                // we just have to be sure that these options 
are present in
+                                // compiled plan
+                                + " /*+ OPTIONS('bounded'='true', 
'scan.parallelism'='2') */");
+
+        String expected = 
TableTestUtil.readFromResource("/jsonplan/testGetJsonPlanWithHints.out");
+        
assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString()))
+                .isEqualTo(expected);
     }
 
     @Test
@@ -432,4 +443,8 @@ class CompiledPlanITCase extends JsonPlanTestBase {
         createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
         return createTestCsvSinkTable("sink", COLUMNS_DEFINITION);
     }
+
+    private String getPreparedToCompareCompiledPlan(final String planAsString) 
{
+        return 
TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString));
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
new file mode 100644
index 00000000000..fd7c70323e1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
@@ -0,0 +1,89 @@
+{
+  "flinkVersion": "",
+  "nodes" : [ {
+    "id": 0,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "bounded" : "true",
+            "connector" : "values",
+            "scan.parallelism" : "2"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS 
options:{scan.parallelism=2, bounded=true}]]])",
+    "inputProperties" : [ ]
+  }, {
+    "id": 0,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source": 0,
+    "target": 0,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to