This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 0560ae6fdcd [FLINK-37856] Ensure sink option hints are present in
compiled plan
0560ae6fdcd is described below
commit 0560ae6fdcd3534fc99a5ca4ec718ae752febdb0
Author: Arvid Heise <[email protected]>
AuthorDate: Tue May 27 12:09:10 2025 +0200
[FLINK-37856] Ensure sink option hints are present in compiled plan
compilePlan and executeSql use two different code paths. The former uses
DynamicTableSinkSpec to render the JSON plan, while the latter directly creates
the DynamicTableSink. So far, the dynamic options were merged only on the
second code path (executeSql), so sink option hints were missing from the
compiled plan.
This commit eagerly merges the options into the ResolvedCatalogTable so
that the combined options can be used in both code paths. The commit still
retains all hint fields and parameters to ensure that EXPLAIN still outputs
the same plan, with hints explicitly added where available. The hints are no
longer explicitly used while creating the DynamicTableSink because the
ResolvedCatalogTable already contains them.
---
.../table/planner/connectors/DynamicSinkUtils.java | 9 +++++++++
.../flink/table/planner/delegation/PlannerBase.scala | 17 ++++++++++-------
.../org/apache/flink/table/api/CompiledPlanITCase.java | 15 ++++++++++++++-
...JsonPlanWithHints.out => testSinkTableWithHints.out} | 11 ++++++-----
...onPlanWithHints.out => testSourceTableWithHints.out} | 0
5 files changed, 39 insertions(+), 13 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 5af511c83f3..41828b9a454 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -105,6 +105,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.hint.FlinkHints.mergeTableOptions;
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
import static
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
@@ -256,6 +257,14 @@ public final class DynamicSinkUtils {
int[][] targetColumns,
boolean isOverwrite,
DynamicTableSink sink) {
+ if (!dynamicOptions.isEmpty()) {
+ contextResolvedTable =
+ contextResolvedTable.copy(
+ mergeTableOptions(
+ dynamicOptions,
+
contextResolvedTable.getResolvedTable().getOptions()));
+ }
+
final DataTypeFactory dataTypeFactory =
unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder);
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 19f50472cb6..048e8a6aa2b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -457,11 +457,14 @@ abstract class PlannerBase(
}
case regularTable: CatalogTable =>
- val resolvedTable =
contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
- val tableToFind = if (dynamicOptions.nonEmpty) {
- resolvedTable.copy(FlinkHints.mergeTableOptions(dynamicOptions,
resolvedTable.getOptions))
- } else {
- resolvedTable
+ val resolvedTable = {
+ val resolvedTable =
contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
+ if (dynamicOptions.nonEmpty) {
+ resolvedTable.copy(
+ FlinkHints.mergeTableOptions(dynamicOptions,
resolvedTable.getOptions))
+ } else {
+ resolvedTable
+ }
}
val catalog = toScala(contextResolvedTable.getCatalog)
val objectIdentifier = contextResolvedTable.getIdentifier
@@ -479,14 +482,14 @@ abstract class PlannerBase(
) {
val tableSink = TableFactoryUtil.findAndCreateTableSink(
objectIdentifier,
- tableToFind,
+ resolvedTable,
getTableConfig,
isStreamingMode,
isTemporary)
Option(resolvedTable, tableSink)
} else {
val tableSink =
- createDynamicTableSink(objectIdentifier, catalog, tableToFind,
isTemporary)
+ createDynamicTableSink(objectIdentifier, catalog, resolvedTable,
isTemporary)
Option(resolvedTable, tableSink)
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index b0ccbd732ea..0d5c13d48a5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -96,7 +96,20 @@ class CompiledPlanITCase extends JsonPlanTestBase {
// compiled plan
+ " /*+ OPTIONS('bounded'='true',
'scan.parallelism'='2') */");
- String expected =
TableTestUtil.readFromResource("/jsonplan/testGetJsonPlanWithHints.out");
+ String expected =
TableTestUtil.readFromResource("/jsonplan/testSourceTableWithHints.out");
+
assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString()))
+ .isEqualTo(expected);
+ }
+
+ @Test
+ void testSinkTableWithHints() {
+ CompiledPlan compiledPlan =
+ tableEnv.compilePlanSql(
+ "INSERT INTO MySink "
+ + "/*+ OPTIONS('sink.parallelism'='2',
'sink-insert-only'='false') */ "
+ + "SELECT * FROM MyTable");
+
+ String expected =
TableTestUtil.readFromResource("/jsonplan/testSinkTableWithHints.out");
assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString()))
.isEqualTo(expected);
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
similarity index 88%
copy from
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
copy to
flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
index 9b5893b048e..0edb0572182 100644
---
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
+++
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
@@ -22,15 +22,14 @@
},
"partitionKeys" : [ ],
"options" : {
- "bounded" : "true",
- "connector" : "values",
- "scan.parallelism" : "2"
+ "bounded" : "false",
+ "connector" : "values"
}
}
}
},
"outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
- "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS
options:{scan.parallelism=2, bounded=true}]]])",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c])",
"inputProperties" : [ ]
}, {
"id" : 0,
@@ -62,6 +61,8 @@
"partitionKeys" : [ ],
"options" : {
"connector" : "values",
+ "sink-insert-only" : "false",
+ "sink.parallelism" : "2",
"table-sink-class" : "DEFAULT"
}
}
@@ -76,7 +77,7 @@
"priority" : 0
} ],
"outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
- "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[a, b, c])"
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[a, b, c], hints=[[[OPTIONS options:{sink-insert-only=false,
sink.parallelism=2}]]])"
} ],
"edges" : [ {
"source" : 0,
diff --git
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out
similarity index 100%
rename from
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
rename to
flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out