This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 929bd15daf1 [FLINK-37540][table-planner] Introduce 
SupportsTargetColumnWriting sink ability (#26361)
929bd15daf1 is described below

commit 929bd15daf1648eef57c6b6ac263b81986a75fef
Author: xiangyu0xf <[email protected]>
AuthorDate: Wed Apr 16 10:46:41 2025 +0800

    [FLINK-37540][table-planner] Introduce SupportsTargetColumnWriting sink 
ability (#26361)
---
 .../table/connector/sink/DynamicTableSink.java     |  2 +
 .../abilities/SupportsTargetColumnWriting.java     | 62 +++++++++++++++++
 .../table/planner/connectors/DynamicSinkUtils.java | 24 ++++++-
 .../plan/abilities/sink/SinkAbilitySpec.java       |  3 +-
 .../abilities/sink/TargetColumnWritingSpec.java    | 79 ++++++++++++++++++++++
 .../planner/factories/TestValuesTableFactory.java  | 21 ++++--
 .../exec/serde/DynamicTableSinkSpecSerdeTest.java  | 33 ++++++++-
 7 files changed, 216 insertions(+), 8 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index 6fe87e67720..8cd344ca98a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.connector.RuntimeConverter;
 import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import 
org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -71,6 +72,7 @@ import java.util.Optional;
  *   <li>{@link SupportsPartitioning}
  *   <li>{@link SupportsOverwrite}
  *   <li>{@link SupportsWritingMetadata}
+ *   <li>{@link SupportsTargetColumnWriting}
  * </ul>
  *
  * <p>In the last step, the planner will call {@link 
#getSinkRuntimeProvider(Context)} for obtaining
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTargetColumnWriting.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTargetColumnWriting.java
new file mode 100644
index 00000000000..77fa856fddb
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTargetColumnWriting.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support target column writing.
+ *
+ * <p>The planner will parse target columns from the DML clause and call {@link
+ * #applyTargetColumns(int[][])} to pass an array of column index paths to the 
sink.
+ *
+ * <p>The array indices are 0-based and support composite columns within 
(possibly nested)
+ * structures. This information comes from the column list of the DML clause, 
e.g., for a sink table
+ * t1 whose schema is: {@code a STRING, b ROW < b1 INT, b2 STRING>, c BIGINT}
+ *
+ * <ul>
+ *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, 
b.b2', and will provide
+ *       {@code [[0], [1, 1]]}. The statement 'insert into t1 select ...' will 
provide an empty list
+ *       and will not apply this ability.
+ *   <li>update: 'update t1 set a=1, b.b1=2 where ...', the column list will 
be 'a, b.b1', and will
+ *       provide {@code [[0], [1, 0]]}.
+ * </ul>
+ *
+ * <p>Note: Planner will not apply this ability for the delete statement 
because it has no column
+ * list.
+ *
+ * <p>A sink can use this information to perform target columns writing.
+ *
+ * <p>If this interface is implemented and {@link 
#applyTargetColumns(int[][])} returns true. The
+ * planner will use this information for plan optimization such as sink reuse.
+ */
+@PublicEvolving
+public interface SupportsTargetColumnWriting {
+
+    /**
+     * Provides an array of column index paths related to user specified 
target column list.
+     *
+     * <p>See the documentation of {@link SupportsTargetColumnWriting} for 
more information.
+     *
+     * @param targetColumns column index paths
+     * @return true if the target columns are applied successfully, false 
otherwise.
+     */
+    boolean applyTargetColumns(int[][] targetColumns);
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 5af511c83f3..a52db9763fc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import 
org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
@@ -63,6 +64,7 @@ import 
org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import 
org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
@@ -279,7 +281,8 @@ public final class DynamicSinkUtils {
                 isOverwrite,
                 sink,
                 contextResolvedTable.getResolvedTable(),
-                sinkAbilitySpecs);
+                sinkAbilitySpecs,
+                targetColumns);
 
         // rewrite rel node for delete
         if (isDelete) {
@@ -995,7 +998,8 @@ public final class DynamicSinkUtils {
             boolean isOverwrite,
             DynamicTableSink sink,
             ResolvedCatalogTable table,
-            List<SinkAbilitySpec> sinkAbilitySpecs) {
+            List<SinkAbilitySpec> sinkAbilitySpecs,
+            int[][] targetColumns) {
         table.getDistribution()
                 .ifPresent(
                         distribution ->
@@ -1007,6 +1011,8 @@ public final class DynamicSinkUtils {
         validateAndApplyOverwrite(tableDebugName, isOverwrite, sink, 
sinkAbilitySpecs);
 
         validateAndApplyMetadata(tableDebugName, sink, 
table.getResolvedSchema(), sinkAbilitySpecs);
+
+        validateAndApplyTargetColumns(sink, targetColumns, sinkAbilitySpecs);
     }
 
     /**
@@ -1285,6 +1291,20 @@ public final class DynamicSinkUtils {
                         createConsumedType(schema, sink)));
     }
 
+    private static void validateAndApplyTargetColumns(
+            DynamicTableSink sink, int[][] targetColumns, 
List<SinkAbilitySpec> sinkAbilitySpecs) {
+        if (targetColumns == null || targetColumns.length == 0) {
+            return;
+        }
+
+        if (!(sink instanceof SupportsTargetColumnWriting)) {
+            // Ignore target columns if the sink doesn't support it.
+            return;
+        }
+
+        sinkAbilitySpecs.add(new TargetColumnWritingSpec(targetColumns));
+    }
+
     /**
      * Returns the {@link DataType} that a sink should consume as the output 
from the runtime.
      *
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
index 471145b2a60..0267b4044be 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
@@ -37,7 +37,8 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
     @JsonSubTypes.Type(value = PartitioningSpec.class),
     @JsonSubTypes.Type(value = WritingMetadataSpec.class),
     @JsonSubTypes.Type(value = RowLevelDeleteSpec.class),
-    @JsonSubTypes.Type(value = RowLevelUpdateSpec.class)
+    @JsonSubTypes.Type(value = RowLevelUpdateSpec.class),
+    @JsonSubTypes.Type(value = TargetColumnWritingSpec.class)
 })
 @Internal
 public interface SinkAbilitySpec {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/TargetColumnWritingSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/TargetColumnWritingSpec.java
new file mode 100644
index 00000000000..f25a06a7fbd
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/TargetColumnWritingSpec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.sink;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import 
org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A sub-class of {@link SinkAbilitySpec} that can not only 
serialize/deserialize the writing target
+ * column indices to/from JSON, but also can write the target columns for 
{@link
+ * SupportsTargetColumnWriting}.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("TargetColumnWriting")
+public class TargetColumnWritingSpec implements SinkAbilitySpec {
+    public static final String FIELD_NAME_TARGET_COLUMNS = "targetColumns";
+
+    @JsonProperty(FIELD_NAME_TARGET_COLUMNS)
+    private final int[][] targetColumns;
+
+    @JsonCreator
+    public TargetColumnWritingSpec(@JsonProperty(FIELD_NAME_TARGET_COLUMNS) 
int[][] targetColumns) {
+        this.targetColumns = targetColumns;
+    }
+
+    @Override
+    public void apply(DynamicTableSink tableSink) {
+        if (tableSink instanceof SupportsTargetColumnWriting) {
+            ((SupportsTargetColumnWriting) 
tableSink).applyTargetColumns(targetColumns);
+        } else {
+            throw new TableException(
+                    String.format(
+                            "%s does not support SupportsTargetColumnWriting.",
+                            tableSink.getClass().getName()));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TargetColumnWritingSpec that = (TargetColumnWritingSpec) o;
+        return Objects.deepEquals(targetColumns, that.targetColumns);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.deepHashCode(targetColumns);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index f75cb778597..f0d984209a6 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.table.connector.sink.OutputFormatProvider;
 import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import 
org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
@@ -786,7 +787,8 @@ public final class TestValuesTableFactory
                     rowTimeIndex,
                     tableSchema,
                     requireBucketCount,
-                    supportsDeleteByKey);
+                    supportsDeleteByKey,
+                    null);
         } else {
             try {
                 return InstantiationUtil.instantiate(
@@ -2221,9 +2223,11 @@ public final class TestValuesTableFactory
                     SupportsWritingMetadata,
                     SupportsPartitioning,
                     SupportsOverwrite,
-                    SupportsBucketing {
+                    SupportsBucketing,
+                    SupportsTargetColumnWriting {
 
         private DataType consumedDataType;
+        private int[][] targetColumns;
         private int[] primaryKeyIndices;
         private final String tableName;
         private final boolean isInsertOnly;
@@ -2250,7 +2254,8 @@ public final class TestValuesTableFactory
                 int rowtimeIndex,
                 TableSchema tableSchema,
                 boolean requireBucketCount,
-                boolean supportsDeleteByKey) {
+                boolean supportsDeleteByKey,
+                int[][] targetColumns) {
             this.consumedDataType = consumedDataType;
             this.primaryKeyIndices = primaryKeyIndices;
             this.tableName = tableName;
@@ -2264,6 +2269,7 @@ public final class TestValuesTableFactory
             this.tableSchema = tableSchema;
             this.requireBucketCount = requireBucketCount;
             this.supportsDeleteByKey = supportsDeleteByKey;
+            this.targetColumns = targetColumns;
         }
 
         @Override
@@ -2416,7 +2422,8 @@ public final class TestValuesTableFactory
                     rowtimeIndex,
                     tableSchema,
                     requireBucketCount,
-                    supportsDeleteByKey);
+                    supportsDeleteByKey,
+                    targetColumns);
         }
 
         @Override
@@ -2454,6 +2461,12 @@ public final class TestValuesTableFactory
         public boolean requiresBucketCount() {
             return requireBucketCount;
         }
+
+        @Override
+        public boolean applyTargetColumns(int[][] targetColumns) {
+            this.targetColumns = targetColumns;
+            return true;
+        }
     }
 
     /** A TableSink used for testing the implementation of {@link 
SinkFunction.Context}. */
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
index 2c1eb026cfe..b61c4ced52a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.factories.TestFormatFactory;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.PartitioningSpec;
+import 
org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.planner.utils.PlannerMocks;
@@ -173,7 +174,37 @@ class DynamicTableSinkSpecSerdeTest {
                                         RowType.of(new BigIntType(), new 
IntType()))),
                         null);
 
-        return Stream.of(spec1, spec2, spec3);
+        Map<String, String> options4 = new HashMap<>();
+        options4.put("connector", TestValuesTableFactory.IDENTIFIER);
+        int[][] targetColumnIndices = new int[][] {{0}, {1}};
+
+        // Todo: add test cases for nested columns in schema after FLINK-31301 
is fixed.
+        final ResolvedSchema resolvedSchema4 =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT()),
+                                Column.physical("b", DataTypes.INT()),
+                                Column.metadata("p", DataTypes.STRING(), null, 
false)),
+                        Collections.emptyList(),
+                        null);
+        final CatalogTable catalogTable4 =
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema4).build())
+                        .options(options4)
+                        .build();
+
+        DynamicTableSinkSpec spec4 =
+                new DynamicTableSinkSpec(
+                        ContextResolvedTable.temporary(
+                                ObjectIdentifier.of(
+                                        CatalogManagerMocks.DEFAULT_CATALOG,
+                                        CatalogManagerMocks.DEFAULT_DATABASE,
+                                        "MyTable"),
+                                new ResolvedCatalogTable(catalogTable4, 
resolvedSchema4)),
+                        Collections.singletonList(new 
TargetColumnWritingSpec(targetColumnIndices)),
+                        targetColumnIndices);
+
+        return Stream.of(spec1, spec2, spec3, spec4);
     }
 
     @ParameterizedTest

Reply via email to