This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ec81a875 [kv] Support 'table.delete.behavior' config to disable or 
ignore deletion on the table (#1783)
3ec81a875 is described below

commit 3ec81a875d90dd87f927b65ed15d3102086f4550
Author: CaoZhen <[email protected]>
AuthorDate: Tue Oct 21 22:55:16 2025 +0800

    [kv] Support 'table.delete.behavior' config to disable or ignore deletion 
on the table (#1783)
---
 .../fluss/client/admin/FlussAdminITCase.java       |  88 ++++++++++++++
 .../org/apache/fluss/config/ConfigOptions.java     |  13 +++
 .../java/org/apache/fluss/config/TableConfig.java  |   6 +
 .../fluss/exception/DeletionDisabledException.java |  35 ++++++
 .../org/apache/fluss/metadata/DeleteBehavior.java  |  62 ++++++++++
 .../fluss/flink/catalog/FlinkTableFactory.java     |   2 +
 .../apache/fluss/flink/sink/FlinkTableSink.java    |  27 +++--
 .../fluss/flink/source/FlinkTableSource.java       |  19 ++-
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 128 +++++++++++++++++++++
 .../java/org/apache/fluss/rpc/protocol/Errors.java |   5 +-
 .../server/coordinator/CoordinatorService.java     |  16 +++
 .../java/org/apache/fluss/server/kv/KvTablet.java  |   9 +-
 .../server/kv/rowmerger/DefaultRowMerger.java      |  22 ++--
 .../server/kv/rowmerger/FirstRowRowMerger.java     |  16 ++-
 .../fluss/server/kv/rowmerger/RowMerger.java       |  19 +--
 .../server/kv/rowmerger/VersionedRowMerger.java    |  16 ++-
 .../server/utils/TableDescriptorValidation.java    |  26 +++++
 .../server/kv/rowmerger/DefaultRowMergerTest.java  |  83 +++++++++++++
 .../kv/rowmerger/VersionedRowMergerTest.java       |   7 +-
 website/docs/engine-flink/options.md               |   1 +
 20 files changed, 568 insertions(+), 32 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 61b8f5f28..860d63f36 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -50,6 +50,7 @@ import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.FsPathAndFileName;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PartitionInfo;
@@ -86,6 +87,7 @@ import java.util.stream.Stream;
 
 import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
 import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -331,6 +333,92 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                         "Table name __internal_table is invalid: '__' is not 
allowed as prefix, since it is reserved for internal databases/internal 
tables/internal partitions in Fluss server");
     }
 
+    @Test
+    void testCreateTableWithDeleteBehavior() {
+        // Test 1: FIRST_ROW merge engine - should set delete behavior to 
IGNORE
+        TablePath tablePath1 = TablePath.of("fluss", 
"test_ignore_delete_for_first_row");
+        Map<String, String> properties1 = new HashMap<>();
+        properties1.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
+
+        TableDescriptor tableDescriptor1 =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("first row merge engine table")
+                        .properties(properties1)
+                        .build();
+        admin.createTable(tablePath1, tableDescriptor1, false).join();
+
+        // Get the table and verify delete behavior is changed to IGNORE
+        TableInfo tableInfo1 = admin.getTableInfo(tablePath1).join();
+        
assertThat(tableInfo1.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
+
+        // Test 2: VERSIONED merge engine - should set delete behavior to 
IGNORE
+        TablePath tablePath2 = TablePath.of("fluss", 
"test_ignore_delete_for_versioned");
+        Map<String, String> properties2 = new HashMap<>();
+        properties2.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
+        properties2.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), 
"age");
+        TableDescriptor tableDescriptor2 =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("versioned merge engine table")
+                        .properties(properties2)
+                        .build();
+        admin.createTable(tablePath2, tableDescriptor2, false).join();
+        // Get the table and verify delete behavior is changed to IGNORE
+        TableInfo tableInfo2 = admin.getTableInfo(tablePath2).join();
+        
assertThat(tableInfo2.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
+
+        // Test 3: FIRST_ROW merge engine with delete behavior explicitly set 
to ALLOW
+        TablePath tablePath3 = TablePath.of("fluss", 
"test_allow_delete_for_first_row");
+        Map<String, String> properties3 = new HashMap<>();
+        properties3.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
+        properties3.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
+        TableDescriptor tableDescriptor3 =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("first row merge engine table")
+                        .properties(properties3)
+                        .build();
+        assertThatThrownBy(() -> admin.createTable(tablePath3, 
tableDescriptor3, false).join())
+                .hasRootCauseInstanceOf(InvalidConfigException.class)
+                .hasMessageContaining(
+                        "Table with 'FIRST_ROW' merge engine does not support 
delete operations. "
+                                + "The 'table.delete.behavior' config must be 
set to 'ignore' or 'disable', but got 'allow'.");
+
+        // Test 4: VERSIONED merge engine with delete behavior explicitly set 
to ALLOW
+        TablePath tablePath4 = TablePath.of("fluss", 
"test_allow_delete_for_versioned");
+        Map<String, String> properties4 = new HashMap<>();
+        properties4.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
+        properties4.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), 
"age");
+        properties4.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
+        TableDescriptor tableDescriptor4 =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("versioned merge engine table")
+                        .properties(properties4)
+                        .build();
+        assertThatThrownBy(() -> admin.createTable(tablePath4, 
tableDescriptor4, false).join())
+                .hasRootCauseInstanceOf(InvalidConfigException.class)
+                .hasMessageContaining(
+                        "Table with 'VERSIONED' merge engine does not support 
delete operations. "
+                                + "The 'table.delete.behavior' config must be 
set to 'ignore' or 'disable', but got 'allow'.");
+
+        // Test 5: Log table - not allowed to set delete behavior
+        TablePath tablePath5 = TablePath.of("fluss", 
"test_set_delete_behavior_for_log_table");
+        Map<String, String> properties5 = new HashMap<>();
+        properties5.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "IGNORE");
+        TableDescriptor tableDescriptor5 =
+                TableDescriptor.builder()
+                        .schema(DATA1_SCHEMA)
+                        .comment("log table")
+                        .properties(properties5)
+                        .build();
+        assertThatThrownBy(() -> admin.createTable(tablePath5, 
tableDescriptor5, false).join())
+                .hasRootCauseInstanceOf(InvalidConfigException.class)
+                .hasMessageContaining(
+                        "The 'table.delete.behavior' configuration is only 
supported for primary key tables.");
+    }
+
     @Test
     void testCreateTableWithInvalidProperty() {
         TablePath tablePath = 
TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test_property");
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 6c75ff3c9..6b045bb38 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.compression.ArrowCompressionType;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.MergeEngineType;
@@ -1381,6 +1382,18 @@ public class ConfigOptions {
                             "The column name of the version column for the 
`versioned` merge engine. "
                                     + "If the merge engine is set to 
`versioned`, the version column must be set.");
 
+    public static final ConfigOption<DeleteBehavior> TABLE_DELETE_BEHAVIOR =
+            key("table.delete.behavior")
+                    .enumType(DeleteBehavior.class)
+                    .defaultValue(DeleteBehavior.ALLOW)
+                    .withDescription(
+                            "Defines the delete behavior for the primary key 
table. "
+                                    + "The supported delete behaviors are 
`allow`, `ignore`, and `disable`. "
+                                    + "The `allow` behavior allows normal 
delete operations (default). "
+                                    + "The `ignore` behavior silently skips 
delete requests without error. "
+                                    + "The `disable` behavior rejects delete 
requests with a clear error message. "
+                                    + "For tables with FIRST_ROW or VERSIONED 
merge engines, this option defaults to `ignore`.");
+
     // ------------------------------------------------------------------------
     //  ConfigOptions for Kv
     // ------------------------------------------------------------------------
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java 
b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index a1f8ac9cc..a1422f460 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -20,6 +20,7 @@ package org.apache.fluss.config;
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.compression.ArrowCompressionInfo;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.MergeEngineType;
@@ -111,6 +112,11 @@ public class TableConfig {
         return 
config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
     }
 
+    /** Gets the delete behavior of the table. */
+    public Optional<DeleteBehavior> getDeleteBehavior() {
+        return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+    }
+
     /** Gets the Arrow compression type and compression level of the table. */
     public ArrowCompressionInfo getArrowCompressionInfo() {
         return ArrowCompressionInfo.fromConf(config);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/DeletionDisabledException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/DeletionDisabledException.java
new file mode 100644
index 000000000..2b69a42ea
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/DeletionDisabledException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Exception thrown when deletion operations are disabled on a table. This 
exception is used when a
+ * table has been configured with delete behavior set to 'disable', indicating 
that deletion
+ * operations are not allowed and should be rejected.
+ *
+ * @see org.apache.fluss.config.ConfigOptions#TABLE_DELETE_BEHAVIOR
+ */
+public class DeletionDisabledException extends ApiException {
+    public DeletionDisabledException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DeletionDisabledException(String message) {
+        this(message, null);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/DeleteBehavior.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/DeleteBehavior.java
new file mode 100644
index 000000000..23ebf641d
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/DeleteBehavior.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/**
+ * The delete behavior for the primary key table.
+ *
+ * <p>This enum defines how delete operations should be handled for primary 
key tables. It provides
+ * different strategies to control whether deletions are allowed, ignored, or 
explicitly disabled.
+ *
+ * @since 0.8
+ */
+public enum DeleteBehavior {
+
+    /**
+     * Allow normal delete operations. This is the default behavior for 
primary key tables without
+     * merge engines.
+     */
+    ALLOW,
+
+    /**
+     * Silently ignore delete requests without error. Delete operations will 
be dropped at the
+     * server side, and no deletion will be performed. This is the default 
behavior for tables with
+     * FIRST_ROW or VERSIONED merge engines.
+     */
+    IGNORE,
+
+    /**
+     * Reject delete requests with a clear error message. Any attempt to 
perform delete operations
+     * will result in an exception being thrown.
+     */
+    DISABLE;
+
+    /** Creates a {@link DeleteBehavior} from the given string, ignoring case. */
+    public static DeleteBehavior fromString(String behavior) {
+        switch (behavior.toUpperCase(java.util.Locale.ROOT)) {
+            case "ALLOW":
+                return ALLOW;
+            case "IGNORE":
+                return IGNORE;
+            case "DISABLE":
+                return DISABLE;
+            default:
+                throw new IllegalArgumentException("Unsupported delete 
behavior: " + behavior);
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index d90b14164..92579e3ac 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -59,6 +59,7 @@ import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR;
 import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
 import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
 import static org.apache.fluss.flink.utils.DataLakeUtils.getDatalakeFormat;
@@ -187,6 +188,7 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
                 
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
                 tableOptions.get(toFlinkOption(TABLE_DATALAKE_FORMAT)),
                 tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
+                tableOptions.get(toFlinkOption(TABLE_DELETE_BEHAVIOR)),
                 tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
                 getBucketKeys(tableOptions),
                 tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
index 1303f8faf..4edd586e4 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
@@ -24,6 +24,7 @@ import org.apache.fluss.flink.utils.PushdownUtils;
 import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.flink.utils.PushdownUtils.ValueConversion;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.GenericRow;
@@ -74,7 +75,8 @@ public class FlinkTableSink
     private final List<String> partitionKeys;
     private final boolean streaming;
     @Nullable private final MergeEngineType mergeEngineType;
-    private final boolean ignoreDelete;
+    private final boolean sinkIgnoreDelete;
+    private final DeleteBehavior tableDeleteBehavior;
     private final int numBucket;
     private final List<String> bucketKeys;
     private final boolean shuffleByBucketId;
@@ -92,7 +94,8 @@ public class FlinkTableSink
             boolean streaming,
             @Nullable MergeEngineType mergeEngineType,
             @Nullable DataLakeFormat lakeFormat,
-            boolean ignoreDelete,
+            boolean sinkIgnoreDelete,
+            DeleteBehavior tableDeleteBehavior,
             int numBucket,
             List<String> bucketKeys,
             boolean shuffleByBucketId) {
@@ -103,7 +106,8 @@ public class FlinkTableSink
         this.partitionKeys = partitionKeys;
         this.streaming = streaming;
         this.mergeEngineType = mergeEngineType;
-        this.ignoreDelete = ignoreDelete;
+        this.sinkIgnoreDelete = sinkIgnoreDelete;
+        this.tableDeleteBehavior = tableDeleteBehavior;
         this.numBucket = numBucket;
         this.bucketKeys = bucketKeys;
         this.shuffleByBucketId = shuffleByBucketId;
@@ -115,7 +119,7 @@ public class FlinkTableSink
         if (!streaming) {
             return ChangelogMode.insertOnly();
         } else {
-            if (primaryKeyIndexes.length > 0 || ignoreDelete) {
+            if (primaryKeyIndexes.length > 0 || sinkIgnoreDelete) {
                 // primary-key table or ignore_delete mode can accept 
RowKind.DELETE
                 ChangelogMode.Builder builder = ChangelogMode.newBuilder();
                 for (RowKind kind : requestedMode.getContainedKinds()) {
@@ -200,7 +204,7 @@ public class FlinkTableSink
                                 partitionKeys,
                                 lakeFormat,
                                 shuffleByBucketId,
-                                new RowDataSerializationSchema(false, 
ignoreDelete))
+                                new RowDataSerializationSchema(false, 
sinkIgnoreDelete))
                         : new FlinkSink.AppendSinkWriterBuilder<>(
                                 tablePath,
                                 flussConfig,
@@ -210,7 +214,7 @@ public class FlinkTableSink
                                 partitionKeys,
                                 lakeFormat,
                                 shuffleByBucketId,
-                                new RowDataSerializationSchema(true, 
ignoreDelete));
+                                new RowDataSerializationSchema(true, 
sinkIgnoreDelete));
 
         return new FlinkSink<>(flinkSinkWriterBuilder);
     }
@@ -235,7 +239,8 @@ public class FlinkTableSink
                         streaming,
                         mergeEngineType,
                         lakeFormat,
-                        ignoreDelete,
+                        sinkIgnoreDelete,
+                        tableDeleteBehavior,
                         numBucket,
                         bucketKeys,
                         shuffleByBucketId);
@@ -360,6 +365,14 @@ public class FlinkTableSink
                             "Table %s uses the '%s' merge engine which does 
not support DELETE or UPDATE statements.",
                             tablePath, mergeEngineType));
         }
+
+        // Check table-level delete behavior configuration
+        if (tableDeleteBehavior == DeleteBehavior.DISABLE) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Table %s has delete behavior set to 'disable' 
which does not support DELETE statements.",
+                            tablePath));
+        }
     }
 
     private Map<Integer, LogicalType> getPrimaryKeyTypes() {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index c278e1572..e7c7357e3 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
@@ -30,6 +31,7 @@ import org.apache.fluss.flink.utils.PushdownUtils;
 import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.GreaterOrEqual;
@@ -74,6 +76,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -206,7 +209,21 @@ public class FlinkTableSource
                 if (mergeEngineType == MergeEngineType.FIRST_ROW) {
                     return ChangelogMode.insertOnly();
                 } else {
-                    return ChangelogMode.all();
+                    // Check delete behavior configuration
+                    Configuration tableConf = 
Configuration.fromMap(tableOptions);
+                    DeleteBehavior deleteBehavior =
+                            tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+                    if (deleteBehavior == DeleteBehavior.ALLOW) {
+                        return ChangelogMode.all();
+                    } else {
+                        // If delete operations are ignored or disabled, only 
insert and update are
+                        // relevant
+                        return ChangelogMode.newBuilder()
+                                .addContainedKind(RowKind.INSERT)
+                                .addContainedKind(RowKind.UPDATE_BEFORE)
+                                .addContainedKind(RowKind.UPDATE_AFTER)
+                                .build();
+                    }
                 }
             } else {
                 // append only
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index b01a7e4b2..0ea8ed94f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -1095,4 +1096,131 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
             this.expectedRows = expectedRows;
         }
     }
+
+    @Test
+    void testDeleteBehaviorDisabledForDeleteStmt() {
+        String tableName = "delete_behavior_disable_table";
+        tBatchEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a int not null,"
+                                + " b bigint null, "
+                                + " c string null, "
+                                + " primary key (a) not enforced"
+                                + ") with ('table.delete.behavior' = 
'disable')",
+                        tableName));
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        assertThatThrownBy(
+                        () ->
+                                tBatchEnv
+                                        .executeSql("DELETE FROM " + tableName 
+ " WHERE a = 1")
+                                        .await())
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        String.format(
+                                "Table %s has delete behavior set to 'disable' 
which does not support DELETE statements.",
+                                tablePath));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"ignore", "disable", "allow"})
+    void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws 
Exception {
+        String tableName = "delete_behavior_ignore_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a int not null primary key not enforced,"
+                                + " b string"
+                                + ") with ('table.delete.behavior' = '%s')",
+                        tableName, deleteBehavior));
+
+        // 1. Verify the changelog mode of the table
+        String changelogModePlan =
+                tEnv.explainSql("SELECT * FROM " + tableName, 
ExplainDetail.CHANGELOG_MODE);
+        if (deleteBehavior.equals("allow")) {
+            assertThat(changelogModePlan)
+                    .contains(
+                            "TableSourceScan(table=[[testcatalog, defaultdb, 
delete_behavior_ignore_table]], fields=[a, b], "
+                                    + "changelogMode=[I,UB,UA,D])");
+        } else {
+            // For 'ignore' and 'disable', delete operations are not emitted 
in the changelog
+            assertThat(changelogModePlan)
+                    .contains(
+                            "TableSourceScan(table=[[testcatalog, defaultdb, 
delete_behavior_ignore_table]], fields=[a, b], "
+                                    + "changelogMode=[I,UB,UA])");
+        }
+
+        // 2. Write data including delete operations and verify the final 
table state
+
+        // Insert some data
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s VALUES (1, 'test1'), (2, 
'test2'), (3, 'test3')",
+                                tableName))
+                .await();
+
+        // Create a changelog stream with deletes that should be ignored
+        org.apache.flink.table.api.Table changelogData =
+                tEnv.fromChangelogStream(
+                        env.fromCollection(
+                                Arrays.asList(
+                                        Row.ofKind(RowKind.INSERT, 4, "test4"),
+                                        Row.ofKind(RowKind.DELETE, 1, 
"test1"), // Should be ignored
+                                        Row.ofKind(RowKind.UPDATE_AFTER, 2, 
"updated_test2"))));
+        tEnv.createTemporaryView("changelog_source", changelogData);
+
+        // Disable upsert materialization to avoid generating a SinkMaterializer 
operator,
+        // because we want to see the original delete messages in the sink
+        tEnv.getConfig().set("table.exec.sink.upsert-materialize", "NONE");
+        String plan =
+                tEnv.explainSql(
+                        String.format("INSERT INTO %s SELECT * FROM 
changelog_source", tableName));
+        assertThat(plan).doesNotContain("upsertMaterialize=[true]");
+
+        // Insert changelog data
+        TableResult tableResult =
+                tEnv.executeSql(
+                        String.format("INSERT INTO %s SELECT * FROM 
changelog_source", tableName));
+
+        // 3. Verify the final table state based on delete behavior
+        if (deleteBehavior.equals("disable")) {
+            // For 'disable', the delete operation is not supported, so we 
expect an exception
+            assertThatThrownBy(tableResult::await)
+                    .hasStackTraceContaining(
+                            "DeletionDisabledException: Delete operations are 
disabled for this table."
+                                    + " The table.delete.behavior is set to 
'disable'.");
+        } else {
+            // For 'ignore' and 'allow', the job is expected to succeed, so we just 
wait for the
+            // writes to be applied
+            tableResult.await();
+            CloseableIterator<Row> rowIter =
+                    tEnv.executeSql(String.format("select * from %s", 
tableName)).collect();
+
+            final List<String> expectedRows;
+            if (deleteBehavior.equals("ignore")) {
+                // Row with a=1 should still exist (delete was ignored)
+                expectedRows =
+                        Arrays.asList(
+                                "+I[1, test1]", // Delete was ignored
+                                "+I[2, test2]",
+                                "-U[2, test2]",
+                                "+U[2, updated_test2]",
+                                "+I[3, test3]",
+                                "+I[4, test4]");
+            } else {
+                // For 'allow', the delete operation should be reflected in 
the final state
+                expectedRows =
+                        Arrays.asList(
+                                "+I[1, test1]",
+                                "-D[1, test1]", // a=1 was deleted
+                                "+I[2, test2]",
+                                "-U[2, test2]",
+                                "+U[2, updated_test2]",
+                                "+I[3, test3]",
+                                "+I[4, test4]");
+            }
+            assertResultsIgnoreOrder(rowIter, expectedRows, true);
+        }
+    }
 }
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 3b899baee..4b49a5847 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -25,6 +25,7 @@ import org.apache.fluss.exception.CorruptRecordException;
 import org.apache.fluss.exception.DatabaseAlreadyExistException;
 import org.apache.fluss.exception.DatabaseNotEmptyException;
 import org.apache.fluss.exception.DatabaseNotExistException;
+import org.apache.fluss.exception.DeletionDisabledException;
 import org.apache.fluss.exception.DuplicateSequenceException;
 import org.apache.fluss.exception.FencedLeaderEpochException;
 import org.apache.fluss.exception.FencedTieringEpochException;
@@ -225,7 +226,9 @@ public enum Errors {
             "The new ISR contains at least one ineligible replica.",
             IneligibleReplicaException::new),
     INVALID_ALTER_TABLE_EXCEPTION(
-            56, "The alter table is invalid.", 
InvalidAlterTableException::new);
+            56, "The alter table is invalid.", 
InvalidAlterTableException::new),
+    DELETION_DISABLED_EXCEPTION(
+            57, "Deletion operations are disabled on this table.", 
DeletionDisabledException::new);
 
     private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index cebc937bf..8fe9285bb 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -35,6 +35,8 @@ import 
org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.DeleteBehavior;
+import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableChange;
@@ -404,6 +406,20 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                             ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
         }
 
+        // For tables with first_row or versioned merge engines, automatically 
set to IGNORE if
+        // delete behavior is not set
+        Configuration tableConf = 
Configuration.fromMap(tableDescriptor.getProperties());
+        MergeEngineType mergeEngine =
+                
tableConf.getOptional(ConfigOptions.TABLE_MERGE_ENGINE).orElse(null);
+        if (mergeEngine == MergeEngineType.FIRST_ROW || mergeEngine == 
MergeEngineType.VERSIONED) {
+            if (tableDescriptor.hasPrimaryKey()
+                    && 
!tableConf.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR).isPresent()) {
+                Map<String, String> newProperties = new 
HashMap<>(newDescriptor.getProperties());
+                newProperties.put(
+                        ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), 
DeleteBehavior.IGNORE.name());
+                newDescriptor = newDescriptor.withProperties(newProperties);
+            }
+        }
         return newDescriptor;
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 0437053af..f91d5cc59 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -21,8 +21,10 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.compression.ArrowCompressionInfo;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.DeletionDisabledException;
 import org.apache.fluss.exception.KvStorageException;
 import org.apache.fluss.memory.MemorySegmentPool;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
@@ -277,9 +279,14 @@ public final class KvTablet {
                             byte[] keyBytes = 
BytesUtils.toArray(kvRecord.getKey());
                             KvPreWriteBuffer.Key key = 
KvPreWriteBuffer.Key.of(keyBytes);
                             if (kvRecord.getRow() == null) {
-                                if (!rowMerger.supportsDelete()) {
+                                DeleteBehavior deleteBehavior = 
rowMerger.deleteBehavior();
+                                if (deleteBehavior == DeleteBehavior.IGNORE) {
                                     // skip delete rows if the merger doesn't 
support yet
                                     continue;
+                                } else if (deleteBehavior == 
DeleteBehavior.DISABLE) {
+                                    throw new DeletionDisabledException(
+                                            "Delete operations are disabled 
for this table. "
+                                                    + "The 
table.delete.behavior is set to 'disable'.");
                                 }
                                 // it's for deletion
                                 byte[] oldValue = getFromBufferOrKv(key);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
index 4cce34d09..049d2859a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.server.kv.rowmerger;
 
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.row.BinaryRow;
@@ -34,10 +35,14 @@ public class DefaultRowMerger implements RowMerger {
     private final PartialUpdaterCache partialUpdaterCache;
     private final KvFormat kvFormat;
     private final Schema schema;
+    private final DeleteBehavior deleteBehavior;
 
-    public DefaultRowMerger(Schema schema, KvFormat kvFormat) {
+    public DefaultRowMerger(
+            Schema schema, KvFormat kvFormat, @Nullable DeleteBehavior 
deleteBehavior) {
         this.schema = schema;
         this.kvFormat = kvFormat;
+        // for compatibility, default to ALLOW if not specified
+        this.deleteBehavior = deleteBehavior != null ? deleteBehavior : 
DeleteBehavior.ALLOW;
         // TODO: share cache in server level when PartialUpdater is thread-safe
         this.partialUpdaterCache = new PartialUpdaterCache();
     }
@@ -57,8 +62,8 @@ public class DefaultRowMerger implements RowMerger {
     }
 
     @Override
-    public boolean supportsDelete() {
-        return true;
+    public DeleteBehavior deleteBehavior() {
+        return deleteBehavior;
     }
 
     @Override
@@ -69,7 +74,7 @@ public class DefaultRowMerger implements RowMerger {
             // this also sanity checks the validity of the partial update
             PartialUpdater partialUpdater =
                     partialUpdaterCache.getOrCreatePartialUpdater(kvFormat, 
schema, targetColumns);
-            return new PartialUpdateRowMerger(partialUpdater);
+            return new PartialUpdateRowMerger(partialUpdater, deleteBehavior);
         }
     }
 
@@ -77,9 +82,12 @@ public class DefaultRowMerger implements RowMerger {
     private static class PartialUpdateRowMerger implements RowMerger {
 
         private final PartialUpdater partialUpdater;
+        private final DeleteBehavior deleteBehavior;
 
-        public PartialUpdateRowMerger(PartialUpdater partialUpdater) {
+        public PartialUpdateRowMerger(
+                PartialUpdater partialUpdater, DeleteBehavior deleteBehavior) {
             this.partialUpdater = partialUpdater;
+            this.deleteBehavior = deleteBehavior;
         }
 
         @Override
@@ -101,8 +109,8 @@ public class DefaultRowMerger implements RowMerger {
         }
 
         @Override
-        public boolean supportsDelete() {
-            return true;
+        public DeleteBehavior deleteBehavior() {
+            return deleteBehavior;
         }
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
index 502552af3..3271f0920 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.server.kv.rowmerger;
 
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.row.BinaryRow;
 
@@ -29,6 +30,17 @@ import javax.annotation.Nullable;
  */
 public class FirstRowRowMerger implements RowMerger {
 
+    private final DeleteBehavior deleteBehavior;
+
+    public FirstRowRowMerger(@Nullable DeleteBehavior deleteBehavior) {
+        if (deleteBehavior == DeleteBehavior.ALLOW) {
+            throw new IllegalArgumentException(
+                    "DELETE is not supported for the first_row merge engine.");
+        }
+        // for compatibility, default to IGNORE if not specified
+        this.deleteBehavior = deleteBehavior != null ? deleteBehavior : 
DeleteBehavior.IGNORE;
+    }
+
     @Nullable
     @Override
     public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
@@ -44,8 +56,8 @@ public class FirstRowRowMerger implements RowMerger {
     }
 
     @Override
-    public boolean supportsDelete() {
-        return false;
+    public DeleteBehavior deleteBehavior() {
+        return deleteBehavior;
     }
 
     @Override
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
index 402ff2974..b942cf181 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.rowmerger;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.Schema;
@@ -44,7 +45,8 @@ public interface RowMerger {
     /**
      * Merge the old row with a delete row.
      *
-     * <p>This method will be invoked only when {@link #supportsDelete()} 
returns true.
+     * <p>This method will be invoked only when {@link #deleteBehavior()} 
returns {@link
+     * DeleteBehavior#ALLOW}.
      *
      * @param oldRow the old row.
      * @return the merged row, or null if the row is deleted.
@@ -53,11 +55,11 @@ public interface RowMerger {
     BinaryRow delete(BinaryRow oldRow);
 
     /**
-     * Whether the merger supports to merge delete rows.
+     * The behavior of delete operations on primary key tables.
      *
-     * @return true if the merger supports delete operation.
+     * @return {@link DeleteBehavior}
      */
-    boolean supportsDelete();
+    DeleteBehavior deleteBehavior();
 
     /** Dynamically configure the target columns to merge and return the 
effective merger. */
     RowMerger configureTargetColumns(@Nullable int[] targetColumns);
@@ -65,10 +67,12 @@ public interface RowMerger {
     /** Create a row merger based on the given configuration. */
     static RowMerger create(TableConfig tableConf, Schema schema, KvFormat 
kvFormat) {
         Optional<MergeEngineType> mergeEngineType = 
tableConf.getMergeEngineType();
+        @Nullable DeleteBehavior deleteBehavior = 
tableConf.getDeleteBehavior().orElse(null);
+
         if (mergeEngineType.isPresent()) {
             switch (mergeEngineType.get()) {
                 case FIRST_ROW:
-                    return new FirstRowRowMerger();
+                    return new FirstRowRowMerger(deleteBehavior);
                 case VERSIONED:
                     Optional<String> versionColumn = 
tableConf.getMergeEngineVersionColumn();
                     if (!versionColumn.isPresent()) {
@@ -77,13 +81,14 @@ public interface RowMerger {
                                         "'%s' must be set for versioned merge 
engine.",
                                         
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key()));
                     }
-                    return new VersionedRowMerger(schema.getRowType(), 
versionColumn.get());
+                    return new VersionedRowMerger(
+                            schema.getRowType(), versionColumn.get(), 
deleteBehavior);
                 default:
                     throw new IllegalArgumentException(
                             "Unsupported merge engine type: " + 
mergeEngineType.get());
             }
         } else {
-            return new DefaultRowMerger(schema, kvFormat);
+            return new DefaultRowMerger(schema, kvFormat, deleteBehavior);
         }
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
index 91ae0fd09..11901b34b 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.server.kv.rowmerger;
 
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.TimestampLtz;
@@ -46,8 +47,17 @@ public class VersionedRowMerger implements RowMerger {
 
     private final Comparator<BinaryRow> versionComparator;
 
-    public VersionedRowMerger(RowType schema, String versionColumnName) {
+    private final DeleteBehavior deleteBehavior;
+
+    public VersionedRowMerger(
+            RowType schema, String versionColumnName, @Nullable DeleteBehavior 
deleteBehavior) {
         this.versionComparator = createVersionComparator(schema, 
versionColumnName);
+        if (deleteBehavior == DeleteBehavior.ALLOW) {
+            throw new IllegalArgumentException(
+                    "DELETE is not supported for the versioned merge engine.");
+        }
+        // for compatibility, default to IGNORE if not specified
+        this.deleteBehavior = deleteBehavior != null ? deleteBehavior : 
DeleteBehavior.IGNORE;
     }
 
     @Nullable
@@ -65,8 +75,8 @@ public class VersionedRowMerger implements RowMerger {
     }
 
     @Override
-    public boolean supportsDelete() {
-        return false;
+    public DeleteBehavior deleteBehavior() {
+        return deleteBehavior;
     }
 
     @Override
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 76e8adc82..446e8b505 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -26,6 +26,7 @@ import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.TooManyBucketsException;
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.MergeEngineType;
@@ -100,6 +101,7 @@ public class TableDescriptorValidation {
         checkLogFormat(tableConf, hasPrimaryKey);
         checkArrowCompression(tableConf);
         checkMergeEngine(tableConf, hasPrimaryKey, schema);
+        checkDeleteBehavior(tableConf, hasPrimaryKey);
         checkTieredLog(tableConf);
         checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema);
         checkSystemColumns(schema);
@@ -316,6 +318,30 @@ public class TableDescriptorValidation {
         }
     }
 
+    private static void checkDeleteBehavior(Configuration tableConf, boolean 
hasPrimaryKey) {
+        Optional<DeleteBehavior> deleteBehaviorOptional =
+                tableConf.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+        if (!hasPrimaryKey && deleteBehaviorOptional.isPresent()) {
+            throw new InvalidConfigException(
+                    "The 'table.delete.behavior' configuration is only 
supported for primary key tables.");
+        }
+
+        // For first_row and versioned merge engines, validate the configured delete behavior
+        MergeEngineType mergeEngine = 
tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE);
+        if (mergeEngine == MergeEngineType.FIRST_ROW || mergeEngine == 
MergeEngineType.VERSIONED) {
+            // For FIRST_ROW and VERSIONED merge engines, delete operations 
are not supported
+            // If user explicitly sets delete behavior to ALLOW, throw an 
exception
+            if (deleteBehaviorOptional.isPresent()
+                    && deleteBehaviorOptional.get() == DeleteBehavior.ALLOW) {
+                throw new InvalidConfigException(
+                        String.format(
+                                "Table with '%s' merge engine does not support 
delete operations. "
+                                        + "The 'table.delete.behavior' config 
must be set to 'ignore' or 'disable', but got 'allow'.",
+                                mergeEngine));
+            }
+        }
+    }
+
     private static void validateOptionValue(ReadableConfig options, 
ConfigOption<?> option) {
         try {
             options.get(option);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
new file mode 100644
index 000000000..a7eea5fa9
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger;
+
+import org.apache.fluss.metadata.DeleteBehavior;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultRowMerger} delete behavior functionality. */
+class DefaultRowMergerTest {
+
+    private static final Schema SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.INT())
+                    .column("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .build();
+
+    private static final RowType ROW_TYPE = SCHEMA.getRowType();
+
+    private BinaryRow createBinaryRow(int id, String name) {
+        return compactedRow(ROW_TYPE, new Object[] {id, name});
+    }
+
+    @ParameterizedTest
+    @EnumSource(DeleteBehavior.class)
+    void testDefaultRowMerger(DeleteBehavior deleteBehavior) {
+        DefaultRowMerger merger = new DefaultRowMerger(SCHEMA, 
KvFormat.COMPACTED, deleteBehavior);
+
+        BinaryRow oldRow = createBinaryRow(1, "old");
+        BinaryRow newRow = createBinaryRow(1, "new");
+
+        // Test merge operation - should return new row
+        BinaryRow mergedRow = merger.merge(oldRow, newRow);
+        assertThat(mergedRow).isSameAs(newRow);
+
+        // Test delete operation - should return null (deleted)
+        BinaryRow deletedRow = merger.delete(oldRow);
+        assertThat(deletedRow).isNull();
+
+        // Test deleteBehavior - should return the configured behavior
+        assertThat(merger.deleteBehavior()).isEqualTo(deleteBehavior);
+    }
+
+    @ParameterizedTest
+    @EnumSource(DeleteBehavior.class)
+    void testPartialUpdateRowMergerDeleteBehavior(DeleteBehavior 
deleteBehavior) {
+        DefaultRowMerger merger = new DefaultRowMerger(SCHEMA, 
KvFormat.COMPACTED, deleteBehavior);
+
+        // Configure for partial update (targeting both the id and name columns)
+        RowMerger partialMerger = merger.configureTargetColumns(new int[] {0, 
1}); // id + name
+
+        BinaryRow oldRow = createBinaryRow(1, "old");
+
+        BinaryRow ignoredRow = partialMerger.delete(oldRow);
+        assertThat(ignoredRow).isNull();
+        assertThat(partialMerger.deleteBehavior()).isEqualTo(deleteBehavior);
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
index 989af9385..45166fd0b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.server.kv.rowmerger;
 
+import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.TimestampLtz;
@@ -128,7 +129,7 @@ class VersionedRowMergerTest {
                         .column("b", DataTypes.STRING())
                         .build()
                         .getRowType();
-        VersionedRowMerger merger = new VersionedRowMerger(schema, "a");
+        VersionedRowMerger merger = new VersionedRowMerger(schema, "a", 
DeleteBehavior.DISABLE);
 
         for (TestSpec testSpec : testSpecs) {
             BinaryRow oldRow = compactedRow(schema, new Object[] 
{testSpec.oldValue, "dummy"});
@@ -152,9 +153,9 @@ class VersionedRowMergerTest {
                         .column("b", DataTypes.STRING())
                         .build()
                         .getRowType();
-        VersionedRowMerger merger = new VersionedRowMerger(schema, "a");
+        VersionedRowMerger merger = new VersionedRowMerger(schema, "a", 
DeleteBehavior.DISABLE);
 
-        assertThat(merger.supportsDelete()).isFalse();
+        assertThat(merger.deleteBehavior()).isEqualTo(DeleteBehavior.DISABLE);
         assertThat(merger.configureTargetColumns(null)).isSameAs(merger);
     }
 
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index d53a468e5..9c71e7077 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -83,6 +83,7 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');
 | table.datalake.auto-compaction          | Boolean | false                    
            | If true, compaction will be triggered automatically when tiering 
service writes to the datalake. It is disabled by default.                      
                                                                                
                                                                                
                                                                                
               [...]
 | table.merge-engine                      | Enum     | (None)                  
            | Defines the merge engine for the primary key table. By default, 
primary key table uses the [default merge 
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). 
It also supports two merge engines are `first_row` and `versioned`. The 
[first_row merge 
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep 
the first row of the same primary key. The [v [...]
 | table.merge-engine.versioned.ver-column | String   | (None)                  
            | The column name of the version column for the `versioned` merge 
engine. If the merge engine is set to `versioned`, the version column must be 
set.                                                                            
                                                                                
                                                                                
                  [...]
+| table.delete.behavior                   | Enum     | ALLOW                   
            | Controls the behavior of delete operations on primary key tables. 
Three modes are supported: `ALLOW` (default) - allows normal delete operations; 
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects 
delete requests and throws explicit errors. This configuration provides 
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) 
that must not receive  [...]
 
 ## Read Options
 

Reply via email to