This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fc5f3b5290d [FLINK-36000][table-planner] Fix
DynamicTableSink#Context's getTargetColumns to return Optional#empty
instead of int[0] for insert statements without a column list
fc5f3b5290d is described below
commit fc5f3b5290dfb0b39682f29d0fc8e851dda5dd31
Author: lincoln lee <[email protected]>
AuthorDate: Thu Aug 8 09:25:10 2024 +0800
[FLINK-36000][table-planner] Fix DynamicTableSink#Context's
getTargetColumns to return Optional#empty instead of int[0] for insert
statements without a column list
This closes #25165
---
.../table/planner/operations/SqlNodeToOperationConversion.java | 5 ++++-
.../org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala | 2 +-
.../flink/table/planner/factories/TestValuesTableFactory.java | 6 ++++++
3 files changed, 11 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index f73b314d56a..1a7e4b16317 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -1389,8 +1389,11 @@ public class SqlNodeToOperationConversion {
private int[][] getTargetColumnIndices(
@Nonnull ContextResolvedTable contextResolvedTable,
@Nullable SqlNodeList targetColumns) {
+ if (targetColumns == null) {
+ return null;
+ }
List<String> allColumns =
contextResolvedTable.getResolvedSchema().getColumnNames();
- return
Optional.ofNullable(targetColumns).orElse(SqlNodeList.EMPTY).stream()
+ return targetColumns.stream()
.mapToInt(c -> allColumns.indexOf(((SqlIdentifier)
c).getSimple()))
.mapToObj(idx -> new int[] {idx})
.toArray(int[][]::new);
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
index 1934e980d8a..48356801b3d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
@@ -73,7 +73,7 @@ abstract class Sink(
.getOrElse(Array.empty[Array[Int]])
.map(_.mkString("[", ",", "]"))
.mkString(","),
- targetColumns != null && targetColumns.length > 0
+ targetColumns != null
)
.item("fields", getRowType.getFieldNames.mkString(", "))
.itemIf("hints", RelExplainUtil.hintsToString(hints), !hints.isEmpty)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 3ebaa4a3a5c..1054d52ed36 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -2105,6 +2105,12 @@ public final class TestValuesTableFactory
} else {
// we don't support OutputFormat for updating query in the
TestValues connector
assertThat(runtimeSink.equals("SinkFunction")).isTrue();
+ // check that context.getTargetColumns honors its contract: it returns either
+ // an empty Optional or an Optional holding a non-empty array
+ assertThat(
+ !context.getTargetColumns().isPresent()
+ ||
context.getTargetColumns().get().length > 0)
+ .isTrue();
SinkFunction<RowData> sinkFunction;
if (primaryKeyIndices.length > 0) {
// TODO FLINK-31301 currently partial-insert composite
columns are not supported