This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 929bd15daf1 [FLINK-37540][table-planner] Introduce SupportsTargetColumnWriting sink ability (#26361)
929bd15daf1 is described below
commit 929bd15daf1648eef57c6b6ac263b81986a75fef
Author: xiangyu0xf <[email protected]>
AuthorDate: Wed Apr 16 10:46:41 2025 +0800
[FLINK-37540][table-planner] Introduce SupportsTargetColumnWriting sink ability (#26361)
---
.../table/connector/sink/DynamicTableSink.java | 2 +
.../abilities/SupportsTargetColumnWriting.java | 62 +++++++++++++++++
.../table/planner/connectors/DynamicSinkUtils.java | 24 ++++++-
.../plan/abilities/sink/SinkAbilitySpec.java | 3 +-
.../abilities/sink/TargetColumnWritingSpec.java | 79 ++++++++++++++++++++++
.../planner/factories/TestValuesTableFactory.java | 21 ++++--
.../exec/serde/DynamicTableSinkSpecSerdeTest.java | 33 ++++++++-
7 files changed, 216 insertions(+), 8 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index 6fe87e67720..8cd344ca98a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -71,6 +72,7 @@ import java.util.Optional;
* <li>{@link SupportsPartitioning}
* <li>{@link SupportsOverwrite}
* <li>{@link SupportsWritingMetadata}
+ * <li>{@link SupportsTargetColumnWriting}
* </ul>
*
* <p>In the last step, the planner will call {@link #getSinkRuntimeProvider(Context)} for obtaining
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTargetColumnWriting.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTargetColumnWriting.java
new file mode 100644
index 00000000000..77fa856fddb
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsTargetColumnWriting.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support target column writing.
+ *
+ * <p>The planner will parse target columns from the DML clause and call {@link
+ * #applyTargetColumns(int[][])} to pass an array of column index paths to the sink.
+ *
+ * <p>The array indices are 0-based and support composite columns within (possibly nested)
+ * structures. This information comes from the column list of the DML clause, e.g., for a sink table
+ * t1 whose schema is: {@code a STRING, b ROW<b1 INT, b2 STRING>, c BIGINT}
+ *
+ * <ul>
+ * <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and will provide
+ * {@code [[0], [1, 1]]}. The statement 'insert into t1 select ...' will provide an empty list
+ * and will not apply this ability.
+ * <li>update: 'update t1 set a=1, b.b1=2 where ...', the column list will be 'a, b.b1', and will
+ * provide {@code [[0], [1, 0]]}.
+ * </ul>
+ *
+ * <p>Note: The planner will not apply this ability for the delete statement because it has no
+ * column list.
+ *
+ * <p>A sink can use this information to perform target column writing.
+ *
+ * <p>If this interface is implemented and {@link #applyTargetColumns(int[][])} returns true, the
+ * planner will use this information for plan optimization such as sink reuse.
+ */
+@PublicEvolving
+public interface SupportsTargetColumnWriting {
+
+ /**
+ * Provides an array of column index paths related to the user-specified target column list.
+ *
+ * <p>See the documentation of {@link SupportsTargetColumnWriting} for more information.
+ *
+ * @param targetColumns column index paths
+ * @return true if the target columns are applied successfully, false otherwise.
+ */
+ boolean applyTargetColumns(int[][] targetColumns);
+}
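For illustration only (not part of this commit): a minimal sketch of a connector implementing the new ability. The class name TargetColumnAwareSink and how it stores the indices are assumptions; the point is that for 'INSERT INTO t1(a, b.b2) ...' the planner would hand {@code [[0], [1, 1]]} to applyTargetColumns.

    // Hypothetical sketch, not code from this commit.
    package com.example.connector;

    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;

    public abstract class TargetColumnAwareSink
            implements DynamicTableSink, SupportsTargetColumnWriting {

        // Empty until the planner applies the ability,
        // e.g. [[0], [1, 1]] for "INSERT INTO t1(a, b.b2) ...".
        private int[][] targetColumns = new int[0][];

        @Override
        public boolean applyTargetColumns(int[][] targetColumns) {
            this.targetColumns = targetColumns;
            // Returning true signals that the sink consumed the information, so the
            // planner may rely on it for optimizations such as sink reuse.
            return true;
        }
    }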
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 5af511c83f3..a52db9763fc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
@@ -63,6 +64,7 @@ import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
@@ -279,7 +281,8 @@ public final class DynamicSinkUtils {
isOverwrite,
sink,
contextResolvedTable.getResolvedTable(),
- sinkAbilitySpecs);
+ sinkAbilitySpecs,
+ targetColumns);
// rewrite rel node for delete
if (isDelete) {
@@ -995,7 +998,8 @@ public final class DynamicSinkUtils {
boolean isOverwrite,
DynamicTableSink sink,
ResolvedCatalogTable table,
- List<SinkAbilitySpec> sinkAbilitySpecs) {
+ List<SinkAbilitySpec> sinkAbilitySpecs,
+ int[][] targetColumns) {
table.getDistribution()
.ifPresent(
distribution ->
@@ -1007,6 +1011,8 @@ public final class DynamicSinkUtils {
validateAndApplyOverwrite(tableDebugName, isOverwrite, sink, sinkAbilitySpecs);
validateAndApplyMetadata(tableDebugName, sink, table.getResolvedSchema(), sinkAbilitySpecs);
+
+ validateAndApplyTargetColumns(sink, targetColumns, sinkAbilitySpecs);
}
/**
@@ -1285,6 +1291,20 @@ public final class DynamicSinkUtils {
createConsumedType(schema, sink)));
}
+ private static void validateAndApplyTargetColumns(
+ DynamicTableSink sink, int[][] targetColumns, List<SinkAbilitySpec> sinkAbilitySpecs) {
+ if (targetColumns == null || targetColumns.length == 0) {
+ return;
+ }
+
+ if (!(sink instanceof SupportsTargetColumnWriting)) {
+ // Ignore target columns if the sink doesn't support it.
+ return;
+ }
+
+ sinkAbilitySpecs.add(new TargetColumnWritingSpec(targetColumns));
+ }
+
/**
* Returns the {@link DataType} that a sink should consume as the output from the runtime.
*
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
index 471145b2a60..0267b4044be 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
@@ -37,7 +37,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@JsonSubTypes.Type(value = PartitioningSpec.class),
@JsonSubTypes.Type(value = WritingMetadataSpec.class),
@JsonSubTypes.Type(value = RowLevelDeleteSpec.class),
- @JsonSubTypes.Type(value = RowLevelUpdateSpec.class)
+ @JsonSubTypes.Type(value = RowLevelUpdateSpec.class),
+ @JsonSubTypes.Type(value = TargetColumnWritingSpec.class)
})
@Internal
public interface SinkAbilitySpec {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/TargetColumnWritingSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/TargetColumnWritingSpec.java
new file mode 100644
index 00000000000..f25a06a7fbd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/TargetColumnWritingSpec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.sink;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the writing
+ * target column indices to/from JSON, but also can apply the target columns to {@link
+ * SupportsTargetColumnWriting}.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("TargetColumnWriting")
+public class TargetColumnWritingSpec implements SinkAbilitySpec {
+ public static final String FIELD_NAME_TARGET_COLUMNS = "targetColumns";
+
+ @JsonProperty(FIELD_NAME_TARGET_COLUMNS)
+ private final int[][] targetColumns;
+
+ @JsonCreator
+ public TargetColumnWritingSpec(@JsonProperty(FIELD_NAME_TARGET_COLUMNS) int[][] targetColumns) {
+ this.targetColumns = targetColumns;
+ }
+
+ @Override
+ public void apply(DynamicTableSink tableSink) {
+ if (tableSink instanceof SupportsTargetColumnWriting) {
+ ((SupportsTargetColumnWriting) tableSink).applyTargetColumns(targetColumns);
+ } else {
+ throw new TableException(
+ String.format(
+ "%s does not support SupportsTargetColumnWriting.",
+ tableSink.getClass().getName()));
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TargetColumnWritingSpec that = (TargetColumnWritingSpec) o;
+ return Objects.deepEquals(targetColumns, that.targetColumns);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.deepHashCode(targetColumns);
+ }
+}
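As a side note (not part of the diff): the planner-side flow can be sketched as below. The helper class TargetColumnReplay is hypothetical and only mirrors the logic added in DynamicSinkUtils#validateAndApplyTargetColumns and TargetColumnWritingSpec#apply above; the real planner carries the specs inside DynamicTableSinkSpec.

    // Hypothetical sketch, not code from this commit.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
    import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
    import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;

    final class TargetColumnReplay {

        // Record the ability as a spec only when target columns exist and the sink supports them.
        static List<SinkAbilitySpec> collectSpecs(DynamicTableSink sink, int[][] targetColumns) {
            List<SinkAbilitySpec> specs = new ArrayList<>();
            if (targetColumns != null
                    && targetColumns.length > 0
                    && sink instanceof SupportsTargetColumnWriting) {
                specs.add(new TargetColumnWritingSpec(targetColumns));
            }
            return specs;
        }

        // Replaying the recorded specs (e.g. after deserializing a compiled plan) calls
        // SupportsTargetColumnWriting#applyTargetColumns on the sink.
        static void replay(DynamicTableSink sink, List<SinkAbilitySpec> specs) {
            specs.forEach(spec -> spec.apply(sink));
        }
    }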
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index f75cb778597..f0d984209a6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
@@ -786,7 +787,8 @@ public final class TestValuesTableFactory
rowTimeIndex,
tableSchema,
requireBucketCount,
- supportsDeleteByKey);
+ supportsDeleteByKey,
+ null);
} else {
try {
return InstantiationUtil.instantiate(
@@ -2221,9 +2223,11 @@ public final class TestValuesTableFactory
SupportsWritingMetadata,
SupportsPartitioning,
SupportsOverwrite,
- SupportsBucketing {
+ SupportsBucketing,
+ SupportsTargetColumnWriting {
private DataType consumedDataType;
+ private int[][] targetColumns;
private int[] primaryKeyIndices;
private final String tableName;
private final boolean isInsertOnly;
@@ -2250,7 +2254,8 @@ public final class TestValuesTableFactory
int rowtimeIndex,
TableSchema tableSchema,
boolean requireBucketCount,
- boolean supportsDeleteByKey) {
+ boolean supportsDeleteByKey,
+ int[][] targetColumns) {
this.consumedDataType = consumedDataType;
this.primaryKeyIndices = primaryKeyIndices;
this.tableName = tableName;
@@ -2264,6 +2269,7 @@ public final class TestValuesTableFactory
this.tableSchema = tableSchema;
this.requireBucketCount = requireBucketCount;
this.supportsDeleteByKey = supportsDeleteByKey;
+ this.targetColumns = targetColumns;
}
@Override
@@ -2416,7 +2422,8 @@ public final class TestValuesTableFactory
rowtimeIndex,
tableSchema,
requireBucketCount,
- supportsDeleteByKey);
+ supportsDeleteByKey,
+ targetColumns);
}
@Override
@@ -2454,6 +2461,12 @@ public final class TestValuesTableFactory
public boolean requiresBucketCount() {
return requireBucketCount;
}
+
+ @Override
+ public boolean applyTargetColumns(int[][] targetColumns) {
+ this.targetColumns = targetColumns;
+ return true;
+ }
}
/** A TableSink used for testing the implementation of {@link SinkFunction.Context}. */
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
index 2c1eb026cfe..b61c4ced52a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
import org.apache.flink.table.planner.plan.abilities.sink.PartitioningSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.planner.utils.PlannerMocks;
@@ -173,7 +174,37 @@ class DynamicTableSinkSpecSerdeTest {
RowType.of(new BigIntType(), new IntType()))),
null);
- return Stream.of(spec1, spec2, spec3);
+ Map<String, String> options4 = new HashMap<>();
+ options4.put("connector", TestValuesTableFactory.IDENTIFIER);
+ int[][] targetColumnIndices = new int[][] {{0}, {1}};
+
+ // Todo: add test cases for nested columns in schema after FLINK-31301 is fixed.
+ final ResolvedSchema resolvedSchema4 =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT()),
+ Column.physical("b", DataTypes.INT()),
Column.metadata("p", DataTypes.STRING(), null, false)),
+ Collections.emptyList(),
+ null);
+ final CatalogTable catalogTable4 =
+ CatalogTable.newBuilder()
+ .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema4).build())
+ .options(options4)
+ .build();
+
+ DynamicTableSinkSpec spec4 =
+ new DynamicTableSinkSpec(
+ ContextResolvedTable.temporary(
+ ObjectIdentifier.of(
+ CatalogManagerMocks.DEFAULT_CATALOG,
+ CatalogManagerMocks.DEFAULT_DATABASE,
+ "MyTable"),
+ new ResolvedCatalogTable(catalogTable4, resolvedSchema4)),
+ Collections.singletonList(new TargetColumnWritingSpec(targetColumnIndices)),
+ targetColumnIndices);
+
+ return Stream.of(spec1, spec2, spec3, spec4);
}
@ParameterizedTest