This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 08f3fb88f [FLINK-39289][paimon] Supports executing TRUNCATE schema
change events on tables with deletion-vectors.enabled:true set (#4327)
08f3fb88f is described below
commit 08f3fb88f77d1aabc4c7f991c06b2fe3565028fa
Author: zexin.gong <[email protected]>
AuthorDate: Tue Mar 24 20:31:16 2026 +0800
[FLINK-39289][paimon] Supports executing TRUNCATE schema change events on
tables with deletion-vectors.enabled:true set (#4327)
Co-authored-by: hector.gong <[email protected]>
---
.../cdc/connectors/paimon/sink/PaimonMetadataApplier.java | 5 -----
.../cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java | 15 ++-------------
2 files changed, 2 insertions(+), 18 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index ebe00ed3e..b5de3e62a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -30,7 +30,6 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
-import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
@@ -354,10 +353,6 @@ public class PaimonMetadataApplier implements
MetadataApplier {
private void applyTruncateTable(TruncateTableEvent event) throws
SchemaEvolveException {
try {
Table table = catalog.getTable(tableIdToIdentifier(event));
- if
(table.options().get("deletion-vectors.enabled").equals("true")) {
- throw new UnsupportedSchemaChangeEventException(
- event, "Unable to truncate a table with deletion
vectors enabled.", null);
- }
try (BatchTableCommit batchTableCommit =
table.newBatchWriteBuilder().newCommit()) {
batchTableCommit.truncateTable();
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index e4473d4d0..d3277e3aa 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -45,7 +45,6 @@ import
org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
-import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -489,18 +488,8 @@ public class PaimonSinkITCase {
Row.ofKind(RowKind.INSERT, "6", "6"));
TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1);
- if (enableDeleteVector) {
- Assertions.assertThatThrownBy(
- () ->
metadataApplier.applySchemaChange(truncateTableEvent))
- .isExactlyInstanceOf(SchemaEvolveException.class)
- .cause()
-
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
- .extracting("exceptionMessage")
- .isEqualTo("Unable to truncate a table with deletion
vectors enabled.");
- } else {
- metadataApplier.applySchemaChange(truncateTableEvent);
- Assertions.assertThat(fetchResults(table1)).isEmpty();
- }
+ metadataApplier.applySchemaChange(truncateTableEvent);
+ Assertions.assertThat(fetchResults(table1)).isEmpty();
DropTableEvent dropTableEvent = new DropTableEvent(table1);
metadataApplier.applySchemaChange(dropTableEvent);