This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 986ab8660f7 [FLINK-37856] Ensure sink option hints are present in 
compiled plan
986ab8660f7 is described below

commit 986ab8660f7b5877b316bbea5fa857bce348580b
Author: Arvid Heise <[email protected]>
AuthorDate: Tue May 27 12:09:10 2025 +0200

    [FLINK-37856] Ensure sink option hints are present in compiled plan
    
    compilePlan and executeSql use two different code paths: the former uses
    DynamicTableSinkSpec to render the JSON, while the latter directly creates
    the DynamicTableSink. So far, the dynamic options were merged only on the
    second code path.
    
    This commit eagerly merges the options into the ResolvedCatalogTable so
    that the combined options can be used in both code paths. The commit still
    retains all hint fields and parameters to ensure that EXPLAIN still outputs
    the same plan, with hints explicitly listed when they are present. The
    hints are no longer explicitly used while creating the DynamicTableSink
    because the ResolvedCatalogTable already contains them.
---
 .../table/planner/connectors/DynamicSinkUtils.java      |  9 +++++++++
 .../flink/table/planner/delegation/PlannerBase.scala    | 17 ++++++++++-------
 .../org/apache/flink/table/api/CompiledPlanITCase.java  | 15 ++++++++++++++-
 ...JsonPlanWithHints.out => testSinkTableWithHints.out} | 11 ++++++-----
 ...onPlanWithHints.out => testSourceTableWithHints.out} |  0
 5 files changed, 39 insertions(+), 13 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index a52db9763fc..e5bcfa075ce 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -107,6 +107,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.planner.hint.FlinkHints.mergeTableOptions;
 import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
 import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
@@ -258,6 +259,14 @@ public final class DynamicSinkUtils {
             int[][] targetColumns,
             boolean isOverwrite,
             DynamicTableSink sink) {
+        if (!dynamicOptions.isEmpty()) {
+            contextResolvedTable =
+                    contextResolvedTable.copy(
+                            mergeTableOptions(
+                                    dynamicOptions,
+                                    
contextResolvedTable.getResolvedTable().getOptions()));
+        }
+
         final DataTypeFactory dataTypeFactory =
                 
unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
         final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder);
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 702ffc76650..75e6826aebd 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -466,11 +466,14 @@ abstract class PlannerBase(
         }
 
       case regularTable: CatalogTable =>
-        val resolvedTable = 
contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
-        val tableToFind = if (dynamicOptions.asScala.nonEmpty) {
-          resolvedTable.copy(FlinkHints.mergeTableOptions(dynamicOptions, 
resolvedTable.getOptions))
-        } else {
-          resolvedTable
+        val resolvedTable = {
+          val resolvedTable = 
contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
+          if (dynamicOptions.asScala.nonEmpty) {
+            resolvedTable.copy(
+              FlinkHints.mergeTableOptions(dynamicOptions, 
resolvedTable.getOptions))
+          } else {
+            resolvedTable
+          }
         }
         val catalog = toScala(contextResolvedTable.getCatalog)
         val objectIdentifier = contextResolvedTable.getIdentifier
@@ -488,14 +491,14 @@ abstract class PlannerBase(
         ) {
           val tableSink = TableFactoryUtil.findAndCreateTableSink(
             objectIdentifier,
-            tableToFind,
+            resolvedTable,
             getTableConfig,
             isStreamingMode,
             isTemporary)
           Option(resolvedTable, tableSink)
         } else {
           val tableSink =
-            createDynamicTableSink(objectIdentifier, catalog, tableToFind, 
isTemporary)
+            createDynamicTableSink(objectIdentifier, catalog, resolvedTable, 
isTemporary)
           Option(resolvedTable, tableSink)
         }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index b0ccbd732ea..0d5c13d48a5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -96,7 +96,20 @@ class CompiledPlanITCase extends JsonPlanTestBase {
                                 // compiled plan
                                 + " /*+ OPTIONS('bounded'='true', 
'scan.parallelism'='2') */");
 
-        String expected = 
TableTestUtil.readFromResource("/jsonplan/testGetJsonPlanWithHints.out");
+        String expected = 
TableTestUtil.readFromResource("/jsonplan/testSourceTableWithHints.out");
+        
assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString()))
+                .isEqualTo(expected);
+    }
+
+    @Test
+    void testSinkTableWithHints() {
+        CompiledPlan compiledPlan =
+                tableEnv.compilePlanSql(
+                        "INSERT INTO MySink "
+                                + "/*+ OPTIONS('sink.parallelism'='2', 
'sink-insert-only'='false') */ "
+                                + "SELECT * FROM MyTable");
+
+        String expected = 
TableTestUtil.readFromResource("/jsonplan/testSinkTableWithHints.out");
         
assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString()))
                 .isEqualTo(expected);
     }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
similarity index 88%
copy from 
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
copy to 
flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
index 9b5893b048e..0edb0572182 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
@@ -22,15 +22,14 @@
           },
           "partitionKeys" : [ ],
           "options" : {
-            "bounded" : "true",
-            "connector" : "values",
-            "scan.parallelism" : "2"
+            "bounded" : "false",
+            "connector" : "values"
           }
         }
       }
     },
     "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS 
options:{scan.parallelism=2, bounded=true}]]])",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])",
     "inputProperties" : [ ]
   }, {
     "id" : 0,
@@ -62,6 +61,8 @@
           "partitionKeys" : [ ],
           "options" : {
             "connector" : "values",
+            "sink-insert-only" : "false",
+            "sink.parallelism" : "2",
             "table-sink-class" : "DEFAULT"
           }
         }
@@ -76,7 +77,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c], hints=[[[OPTIONS options:{sink-insert-only=false, 
sink.parallelism=2}]]])"
   } ],
   "edges" : [ {
     "source" : 0,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out
similarity index 100%
rename from 
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out
rename to 
flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out

Reply via email to