This is an automated email from the ASF dual-hosted git repository.

jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b83ebdde32 [core] Introduce 'ignore-update-before' to ignore UD only (#6235)
b83ebdde32 is described below

commit b83ebdde325b064fd52501863bc8776f483bd5a0
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Sep 11 13:46:27 2025 +0800

    [core] Introduce 'ignore-update-before' to ignore UD only (#6235)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 ++++
 .../org/apache/paimon/utils/RowKindFilter.java     | 64 ++++++++++++++++++++++
 .../paimon/table/AppendOnlyFileStoreTable.java     |  3 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  3 +-
 .../apache/paimon/table/sink/TableWriteImpl.java   |  9 +--
 .../paimon/flink/sink/LocalMergeOperator.java      |  9 ++-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 18 +++++-
 .../paimon/flink/sink/LocalMergeOperatorTest.java  | 47 ++++++++++++++++
 9 files changed, 160 insertions(+), 10 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 4273751caf..2ca6b6c7bc 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -518,6 +518,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to ignore delete records.</td>
         </tr>
+        <tr>
+            <td><h5>ignore-update-before</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to ignore update-before records.</td>
+        </tr>
         <tr>
             <td><h5>incremental-between</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 05ff57a617..e2e141f141 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -489,6 +489,13 @@ public class CoreOptions implements Serializable {
                             "partial-update.ignore-delete")
                     .withDescription("Whether to ignore delete records.");
 
+    @Immutable
+    public static final ConfigOption<Boolean> IGNORE_UPDATE_BEFORE =
+            key("ignore-update-before")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to ignore update-before 
records.");
+
     public static final ConfigOption<SortEngine> SORT_ENGINE =
             key("sort-engine")
                     .enumType(SortEngine.class)
@@ -2288,6 +2295,10 @@ public class CoreOptions implements Serializable {
         return options.get(IGNORE_DELETE);
     }
 
+    public boolean ignoreUpdateBefore() {
+        return options.get(IGNORE_UPDATE_BEFORE);
+    }
+
     public SortEngine sortEngine() {
         return options.get(SORT_ENGINE);
     }
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/utils/RowKindFilter.java 
b/paimon-api/src/main/java/org/apache/paimon/utils/RowKindFilter.java
new file mode 100644
index 0000000000..fb6372ae0c
--- /dev/null
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/RowKindFilter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.types.RowKind;
+
+import javax.annotation.Nullable;
+
+/** A class to filter row kinds. */
+public class RowKindFilter {
+
+    private final boolean ignoreAllRetracts;
+    private final boolean ignoreUpdateBefore;
+
+    public RowKindFilter(boolean ignoreAllRetracts, boolean 
ignoreUpdateBefore) {
+        this.ignoreAllRetracts = ignoreAllRetracts;
+        this.ignoreUpdateBefore = ignoreUpdateBefore;
+    }
+
+    @Nullable
+    public static RowKindFilter of(CoreOptions options) {
+        boolean ignoreAllRetracts = options.ignoreDelete();
+        boolean ignoreUpdateBefore = options.ignoreUpdateBefore();
+        if (!ignoreAllRetracts && !ignoreUpdateBefore) {
+            return null;
+        }
+        return new RowKindFilter(ignoreAllRetracts, ignoreUpdateBefore);
+    }
+
+    public boolean test(RowKind rowKind) {
+        switch (rowKind) {
+            case DELETE:
+                if (ignoreAllRetracts) {
+                    return false;
+                }
+                break;
+            case UPDATE_BEFORE:
+                if (ignoreUpdateBefore || ignoreAllRetracts) {
+                    return false;
+                }
+                break;
+            default:
+                break;
+        }
+        return true;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 367eeb714b..4ac41361ac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -40,6 +40,7 @@ import 
org.apache.paimon.table.source.splitread.DataEvolutionSplitReadProvider;
 import org.apache.paimon.table.source.splitread.SplitReadConfig;
 import org.apache.paimon.table.source.splitread.SplitReadProvider;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowKindFilter;
 
 import javax.annotation.Nullable;
 
@@ -139,7 +140,7 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                     return record.row();
                 },
                 rowKindGenerator(),
-                CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
+                RowKindFilter.of(coreOptions()));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index b17f307f8d..62631aaf80 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.MergeTreeSplitGenerator;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowKindFilter;
 
 import javax.annotation.Nullable;
 
@@ -170,7 +171,7 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
                                 rowKind,
                                 record.row()),
                 rowKindGenerator(),
-                CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
+                RowKindFilter.of(coreOptions()));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index ac1499c75d..99e30bcccf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -38,6 +38,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Restorable;
+import org.apache.paimon.utils.RowKindFilter;
 
 import javax.annotation.Nullable;
 
@@ -59,7 +60,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
     private final RecordExtractor<T> recordExtractor;
     @Nullable private final RowKindGenerator rowKindGenerator;
-    private final boolean ignoreDelete;
+    @Nullable private final RowKindFilter rowKindFilter;
 
     private boolean batchCommitted = false;
     private BucketMode bucketMode;
@@ -73,13 +74,13 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
             KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
             RecordExtractor<T> recordExtractor,
             @Nullable RowKindGenerator rowKindGenerator,
-            boolean ignoreDelete) {
+            @Nullable RowKindFilter rowKindFilter) {
         this.rowType = rowType;
         this.write = write;
         this.keyAndBucketExtractor = keyAndBucketExtractor;
         this.recordExtractor = recordExtractor;
         this.rowKindGenerator = rowKindGenerator;
-        this.ignoreDelete = ignoreDelete;
+        this.rowKindFilter = rowKindFilter;
 
         List<String> notNullColumnNames =
                 rowType.getFields().stream()
@@ -183,7 +184,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
         checkNullability(row);
         row = wrapDefaultValue(row);
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
-        if (ignoreDelete && rowKind.isRetract()) {
+        if (rowKindFilter != null && !rowKindFilter.test(rowKind)) {
             return null;
         }
         SinkRecord record = bucket == -1 ? toSinkRecord(row) : 
toSinkRecord(row, bucket);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 0702621476..bb48dbf363 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -41,6 +41,7 @@ import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowKindFilter;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -56,6 +57,8 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
@@ -70,7 +73,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
     private static final long serialVersionUID = 1L;
 
     private final TableSchema schema;
-    private final boolean ignoreDelete;
+    private final @Nullable RowKindFilter rowKindFilter;
 
     private transient Projection keyProjection;
 
@@ -87,7 +90,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
                 schema.primaryKeys().size() > 0,
                 "LocalMergeOperator currently only support tables with primary 
keys");
         this.schema = schema;
-        this.ignoreDelete = 
CoreOptions.fromMap(schema.options()).ignoreDelete();
+        this.rowKindFilter = 
RowKindFilter.of(CoreOptions.fromMap(schema.options()));
         setup(parameters.getContainingTask(), parameters.getStreamConfig(), 
parameters.getOutput());
     }
 
@@ -170,7 +173,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
         InternalRow row = record.getValue();
 
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
-        if (ignoreDelete && rowKind.isRetract()) {
+        if (rowKindFilter != null && !rowKindFilter.test(rowKind)) {
             return;
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 066eeb31ca..13425014f5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -619,7 +619,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
     public void testIgnoreDeleteWithRowKindField() {
         sql(
                 "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
v STRING, kind STRING) "
-                        + "WITH ('merge-engine' = 'deduplicate', 
'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')");
+                        + "WITH ('ignore-delete' = 'true', 'bucket' = '1', 
'rowkind.field' = 'kind')");
 
         sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')");
         assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "A", "+I"));
@@ -631,6 +631,22 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "B", "+I"));
     }
 
+    @Test
+    public void testIgnoreUpdateBeforeWithRowKindField() {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
v STRING, kind STRING) "
+                        + "WITH ('ignore-update-before' = 'true', 'bucket' = 
'1', 'rowkind.field' = 'kind')");
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')");
+        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "A", "+I"));
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', '-U')");
+        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "A", "+I"));
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')");
+        assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
+    }
+
     @Test
     public void testDeleteWithPkLookup() throws Exception {
         sql(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
index 08fe8cb238..eba09e12e5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
@@ -50,10 +50,13 @@ import java.util.Map;
 import java.util.Random;
 import java.util.function.Consumer;
 
+import static org.apache.paimon.CoreOptions.IGNORE_DELETE;
+import static org.apache.paimon.CoreOptions.IGNORE_UPDATE_BEFORE;
 import static org.apache.paimon.CoreOptions.LOCAL_MERGE_BUFFER_SIZE;
 import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.apache.paimon.types.RowKind.DELETE;
+import static org.apache.paimon.types.RowKind.UPDATE_BEFORE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class LocalMergeOperatorTest {
@@ -114,6 +117,50 @@ class LocalMergeOperatorTest {
         result.clear();
     }
 
+    @Test
+    public void testIgnoreUpdateBefore() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(IGNORE_UPDATE_BEFORE.key(), "true");
+        prepareHashOperator(options);
+
+        List<String> result = new ArrayList<>();
+        setOutput(result);
+
+        processElement("a", 1);
+        processElement(UPDATE_BEFORE, "a", 1);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:a->1");
+        result.clear();
+
+        processElement("a", 1);
+        processElement(DELETE, "a", 1);
+        operator.prepareSnapshotPreBarrier(1);
+        assertThat(result).containsExactlyInAnyOrder("-D:a->1");
+        result.clear();
+    }
+
+    @Test
+    public void testIgnoreDelete() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(IGNORE_DELETE.key(), "true");
+        prepareHashOperator(options);
+
+        List<String> result = new ArrayList<>();
+        setOutput(result);
+
+        processElement("a", 1);
+        processElement(UPDATE_BEFORE, "a", 1);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:a->1");
+        result.clear();
+
+        processElement("a", 1);
+        processElement(DELETE, "a", 1);
+        operator.prepareSnapshotPreBarrier(1);
+        assertThat(result).containsExactlyInAnyOrder("+I:a->1");
+        result.clear();
+    }
+
     @Test
     public void testHashSpill() throws Exception {
         Map<String, String> options = new HashMap<>();

Reply via email to