This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 81de170ddc9 [FLINK-36000][table-planner] Fix DynamicTableSink#Context's getTargetColumns should return an Optional#empty instead of int[0] for insert stmt without column list 81de170ddc9 is described below commit 81de170ddc9311ef1bc74d10841530a6eb60e5bd Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Fri Aug 9 16:12:34 2024 +0800 [FLINK-36000][table-planner] Fix DynamicTableSink#Context's getTargetColumns should return an Optional#empty instead of int[0] for insert stmt without column list This closes #25173 --- .../table/planner/operations/SqlNodeToOperationConversion.java | 5 ++++- .../org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala | 2 +- .../flink/table/planner/factories/TestValuesTableFactory.java | 6 ++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 63bae2ecd44..cd7fef40274 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -1358,8 +1358,11 @@ public class SqlNodeToOperationConversion { private int[][] getTargetColumnIndices( @Nonnull ContextResolvedTable contextResolvedTable, @Nullable SqlNodeList targetColumns) { + if (targetColumns == null) { + return null; + } List<String> allColumns = contextResolvedTable.getResolvedSchema().getColumnNames(); - return Optional.ofNullable(targetColumns).orElse(SqlNodeList.EMPTY).stream() + return targetColumns.stream() .mapToInt(c -> allColumns.indexOf(((SqlIdentifier) c).getSimple())) .mapToObj(idx -> new int[] {idx}) .toArray(int[][]::new); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala index 1934e980d8a..48356801b3d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala @@ -73,7 +73,7 @@ abstract class Sink( .getOrElse(Array.empty[Array[Int]]) .map(_.mkString("[", ",", "]")) .mkString(","), - targetColumns != null && targetColumns.length > 0 + targetColumns != null ) .item("fields", getRowType.getFieldNames.mkString(", ")) .itemIf("hints", RelExplainUtil.hintsToString(hints), !hints.isEmpty) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index 4ed16ba684a..7cea30b132a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -2088,6 +2088,12 @@ public final class TestValuesTableFactory } else { // we don't support OutputFormat for updating query in the TestValues connector assertThat(runtimeSink.equals("SinkFunction")).isTrue(); + // check the contract of the context.getTargetColumns method returns the expected + // empty Option or non-empty Option with a non-empty array + assertThat( + !context.getTargetColumns().isPresent() + || context.getTargetColumns().get().length > 0) + .isTrue(); SinkFunction<RowData> sinkFunction; if (primaryKeyIndices.length > 0) { // TODO FLINK-31301 currently partial-insert composite columns are not supported