This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 3ff7be42e53 [FLINK-36026][table] Options from `OPTIONS` hint should be present in compiled plan
3ff7be42e53 is described below
commit 3ff7be42e53366dcb5b18ad63ca4a7a8ef78fac4
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Aug 9 13:52:17 2024 +0200
[FLINK-36026][table] Options from `OPTIONS` hint should be present in compiled plan
This closes #25186
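As a quick illustration of the fixed behaviour (a minimal sketch, not taken from the patch; the datagen/blackhole tables and the 'number-of-rows' option are illustrative choices for a Flink 1.20 Table API program), options passed through an `OPTIONS` hint now end up in the "options" block of the serialized source table, which is what the new testGetJsonPlanWithHints.out resource asserts:

    import org.apache.flink.table.api.CompiledPlan;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class OptionsHintInCompiledPlan {
        public static void main(String[] args) {
            // Compiled plans are only supported in streaming mode.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            tEnv.executeSql(
                    "CREATE TABLE MyTable (a BIGINT, b INT, c STRING) WITH ('connector' = 'datagen')");
            tEnv.executeSql(
                    "CREATE TABLE MySink (a BIGINT, b INT, c STRING) WITH ('connector' = 'blackhole')");

            // OPTIONS hints require table.dynamic-table-options.enabled
            // (enabled by default in recent Flink versions).
            CompiledPlan plan =
                    tEnv.compilePlanSql(
                            "INSERT INTO MySink SELECT * FROM MyTable"
                                    + " /*+ OPTIONS('number-of-rows'='10') */");

            // With this fix the hinted option is serialized alongside the
            // source table's other options, so this prints "true".
            System.out.println(plan.asJsonString().contains("number-of-rows"));
        }
    }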
---
.editorconfig | 3 +
.../planner/plan/schema/CatalogSourceTable.java | 6 +-
.../apache/flink/table/api/CompiledPlanITCase.java | 29 +++++--
.../jsonplan/testGetJsonPlanWithHints.out | 89 ++++++++++++++++++++++
4 files changed, 117 insertions(+), 10 deletions(-)
diff --git a/.editorconfig b/.editorconfig
index 333c2a16580..5aa9a33ccd2 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -263,6 +263,9 @@ ij_java_variable_annotation_wrap = normal
ij_java_wrap_first_method_in_call_chain = true
# ij_java_wrap_long_lines = false
+[*.out]
+insert_final_newline = false
+
[*.xml]
indent_style = tab
indent_size = 4
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
index dd501487e4e..0c1cb09188f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
@@ -107,19 +107,19 @@ public final class CatalogSourceTable extends FlinkPreparingTableBase {
// finalize catalog table with option hints
final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
- final ContextResolvedTable catalogTable =
+ final ContextResolvedTable contextTableWithHints =
computeContextResolvedTable(context, hintedOptions);
// create table source
final DynamicTableSource tableSource =
- createDynamicTableSource(context, catalogTable.getResolvedTable());
+ createDynamicTableSource(context, contextTableWithHints.getResolvedTable());
// prepare table source and convert to RelNode
return DynamicSourceUtils.convertSourceToRel(
!schemaTable.isStreamingMode(),
context.getTableConfig(),
relBuilder,
- schemaTable.getContextResolvedTable(),
+ contextTableWithHints,
schemaTable.getStatistic(),
hints,
tableSource);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 591f0a52e4a..f432b0fa8fe 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -83,14 +83,25 @@ class CompiledPlanITCase extends JsonPlanTestBase {
tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable");
String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
assertThat(
- TableTestUtil.replaceExecNodeId(
- TableTestUtil.replaceFlinkVersion(
- TableTestUtil.getFormattedJson(
- compiledPlan.asJsonString()))))
+ getPreparedToCompareCompiledPlan(
+ TableTestUtil.getFormattedJson(compiledPlan.asJsonString())))
.isEqualTo(
- TableTestUtil.replaceExecNodeId(
- TableTestUtil.replaceFlinkVersion(
- TableTestUtil.getFormattedJson(expected))));
+ getPreparedToCompareCompiledPlan(TableTestUtil.getFormattedJson(expected)));
+ }
+
+ @Test
+ void testSourceTableWithHints() {
+ CompiledPlan compiledPlan =
+ tableEnv.compilePlanSql(
+ "INSERT INTO MySink SELECT * FROM MyTable"
+ // OPTIONS hints here do not play any significant role
+ // we just have to be sure that these options are present in
+ // compiled plan
+ + " /*+ OPTIONS('bounded'='true', 'scan.parallelism'='2') */");
+
+ String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlanWithHints.out");
+ assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString()))
+ .isEqualTo(expected);
}
@Test
@@ -432,4 +443,8 @@ class CompiledPlanITCase extends JsonPlanTestBase {
createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
return createTestCsvSinkTable("sink", COLUMNS_DEFINITION);
}
+
+ private String getPreparedToCompareCompiledPlan(final String planAsString) {
+ return TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString));
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
new file mode 100644
index 00000000000..fd7c70323e1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
@@ -0,0 +1,89 @@
+{
+ "flinkVersion": "",
+ "nodes" : [ {
+ "id": 0,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "bounded" : "true",
+ "connector" : "values",
+ "scan.parallelism" : "2"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS
options:{scan.parallelism=2, bounded=true}]]])",
+ "inputProperties" : [ ]
+ }, {
+ "id": 0,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "table-sink-class" : "DEFAULT"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[a, b, c])"
+ } ],
+ "edges" : [ {
+ "source": 0,
+ "target": 0,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file