This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 42cc2a04459 [FLINK-36000][table-planner] Fix 
DynamicTableSink#Context's getTargetColumns should return an Optional#empty 
instead of int[0] for insert stmt without column list
42cc2a04459 is described below

commit 42cc2a04459bb4365e6a4b444ccd8e6b328a369f
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Fri Aug 9 16:11:40 2024 +0800

    [FLINK-36000][table-planner] Fix DynamicTableSink#Context's 
getTargetColumns should return an Optional#empty instead of int[0] for insert 
stmt without column list
    
    This closes #25172
---
 .../table/planner/operations/SqlNodeToOperationConversion.java      | 5 ++++-
 .../org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala    | 2 +-
 .../flink/table/planner/factories/TestValuesTableFactory.java       | 6 ++++++
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 0e3aac1f060..4abf47205a4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -1391,8 +1391,11 @@ public class SqlNodeToOperationConversion {
     private int[][] getTargetColumnIndices(
             @Nonnull ContextResolvedTable contextResolvedTable,
             @Nullable SqlNodeList targetColumns) {
+        if (targetColumns == null) {
+            return null;
+        }
         List<String> allColumns = 
contextResolvedTable.getResolvedSchema().getColumnNames();
-        return 
Optional.ofNullable(targetColumns).orElse(SqlNodeList.EMPTY).stream()
+        return targetColumns.stream()
                 .mapToInt(c -> allColumns.indexOf(((SqlIdentifier) 
c).getSimple()))
                 .mapToObj(idx -> new int[] {idx})
                 .toArray(int[][]::new);
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
index 1934e980d8a..48356801b3d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
@@ -73,7 +73,7 @@ abstract class Sink(
           .getOrElse(Array.empty[Array[Int]])
           .map(_.mkString("[", ",", "]"))
           .mkString(","),
-        targetColumns != null && targetColumns.length > 0
+        targetColumns != null
       )
       .item("fields", getRowType.getFieldNames.mkString(", "))
       .itemIf("hints", RelExplainUtil.hintsToString(hints), !hints.isEmpty)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 3ebaa4a3a5c..1054d52ed36 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -2105,6 +2105,12 @@ public final class TestValuesTableFactory
             } else {
                 // we don't support OutputFormat for updating query in the 
TestValues connector
                 assertThat(runtimeSink.equals("SinkFunction")).isTrue();
+                // check that context.getTargetColumns honors its contract: it returns either
+                // an empty Optional or a non-empty Optional holding a non-empty array
+                assertThat(
+                                !context.getTargetColumns().isPresent()
+                                        || 
context.getTargetColumns().get().length > 0)
+                        .isTrue();
                 SinkFunction<RowData> sinkFunction;
                 if (primaryKeyIndices.length > 0) {
                     // TODO FLINK-31301 currently partial-insert composite 
columns are not supported

Reply via email to