This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 42cc2a04459 [FLINK-36000][table-planner] Fix DynamicTableSink#Context's getTargetColumns should return an Optional#empty instead of int[0] for insert stmt without column list
42cc2a04459 is described below

commit 42cc2a04459bb4365e6a4b444ccd8e6b328a369f
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Fri Aug 9 16:11:40 2024 +0800

    [FLINK-36000][table-planner] Fix DynamicTableSink#Context's getTargetColumns should return an Optional#empty instead of int[0] for insert stmt without column list

    This closes #25172
---
 .../table/planner/operations/SqlNodeToOperationConversion.java | 5 ++++-
 .../org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala | 2 +-
 .../flink/table/planner/factories/TestValuesTableFactory.java | 6 ++++++
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 0e3aac1f060..4abf47205a4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -1391,8 +1391,11 @@ public class SqlNodeToOperationConversion {
     private int[][] getTargetColumnIndices(
             @Nonnull ContextResolvedTable contextResolvedTable,
             @Nullable SqlNodeList targetColumns) {
+        if (targetColumns == null) {
+            return null;
+        }
         List<String> allColumns = contextResolvedTable.getResolvedSchema().getColumnNames();
-        return Optional.ofNullable(targetColumns).orElse(SqlNodeList.EMPTY).stream()
+        return targetColumns.stream()
                 .mapToInt(c -> allColumns.indexOf(((SqlIdentifier) c).getSimple()))
                 .mapToObj(idx -> new int[] {idx})
                 .toArray(int[][]::new);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
index 1934e980d8a..48356801b3d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
@@ -73,7 +73,7 @@ abstract class Sink(
           .getOrElse(Array.empty[Array[Int]])
           .map(_.mkString("[", ",", "]"))
           .mkString(","),
-        targetColumns != null && targetColumns.length > 0
+        targetColumns != null
       )
       .item("fields", getRowType.getFieldNames.mkString(", "))
       .itemIf("hints", RelExplainUtil.hintsToString(hints), !hints.isEmpty)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 3ebaa4a3a5c..1054d52ed36 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -2105,6 +2105,12 @@ public final class TestValuesTableFactory
             } else {
                 // we don't support OutputFormat for updating query in the TestValues connector
                 assertThat(runtimeSink.equals("SinkFunction")).isTrue();
+                // check the contract of the context.getTargetColumns method returns the expected
+                // empty Option or non-empty Option with a non-empty array
+                assertThat(
+                                !context.getTargetColumns().isPresent()
+                                        || context.getTargetColumns().get().length > 0)
+                        .isTrue();
                 SinkFunction<RowData> sinkFunction;
                 if (primaryKeyIndices.length > 0) {
                     // TODO FLINK-31301 currently partial-insert composite columns are not supported