This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 08f3fb88f [FLINK-39289][paimon] Supports executing TRUNCATE schema 
change events on tables with deletion-vectors.enabled:true set (#4327)
08f3fb88f is described below

commit 08f3fb88f77d1aabc4c7f991c06b2fe3565028fa
Author: zexin.gong <[email protected]>
AuthorDate: Tue Mar 24 20:31:16 2026 +0800

    [FLINK-39289][paimon] Supports executing TRUNCATE schema change events on 
tables with deletion-vectors.enabled:true set (#4327)
    
    Co-authored-by: hector.gong <[email protected]>
---
 .../cdc/connectors/paimon/sink/PaimonMetadataApplier.java |  5 -----
 .../cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java   | 15 ++-------------
 2 files changed, 2 insertions(+), 18 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index ebe00ed3e..b5de3e62a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -30,7 +30,6 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
 import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
-import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
@@ -354,10 +353,6 @@ public class PaimonMetadataApplier implements 
MetadataApplier {
     private void applyTruncateTable(TruncateTableEvent event) throws 
SchemaEvolveException {
         try {
             Table table = catalog.getTable(tableIdToIdentifier(event));
-            if 
(table.options().get("deletion-vectors.enabled").equals("true")) {
-                throw new UnsupportedSchemaChangeEventException(
-                        event, "Unable to truncate a table with deletion 
vectors enabled.", null);
-            }
             try (BatchTableCommit batchTableCommit = 
table.newBatchWriteBuilder().newCommit()) {
                 batchTableCommit.truncateTable();
             }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index e4473d4d0..d3277e3aa 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
-import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.factories.DataSinkFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -489,18 +488,8 @@ public class PaimonSinkITCase {
                         Row.ofKind(RowKind.INSERT, "6", "6"));
 
         TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1);
-        if (enableDeleteVector) {
-            Assertions.assertThatThrownBy(
-                            () -> 
metadataApplier.applySchemaChange(truncateTableEvent))
-                    .isExactlyInstanceOf(SchemaEvolveException.class)
-                    .cause()
-                    
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
-                    .extracting("exceptionMessage")
-                    .isEqualTo("Unable to truncate a table with deletion 
vectors enabled.");
-        } else {
-            metadataApplier.applySchemaChange(truncateTableEvent);
-            Assertions.assertThat(fetchResults(table1)).isEmpty();
-        }
+        metadataApplier.applySchemaChange(truncateTableEvent);
+        Assertions.assertThat(fetchResults(table1)).isEmpty();
 
         DropTableEvent dropTableEvent = new DropTableEvent(table1);
         metadataApplier.applySchemaChange(dropTableEvent);

Reply via email to