This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a8a0e28b65 [core] Add option to support reading sequence_number in 
AuditLogTable and BinlogTable (#6933)
a8a0e28b65 is described below

commit a8a0e28b650f98268499c9826e9775184abfbf36
Author: bosiew.tian <[email protected]>
AuthorDate: Thu Jan 22 13:55:13 2026 +0800

    [core] Add option to support reading sequence_number in AuditLogTable and 
BinlogTable (#6933)
---
 docs/content/concepts/system-tables.md             |  61 +++++++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  22 +++
 .../org/apache/paimon/schema/SchemaValidation.java |  13 ++
 .../paimon/table/source/KeyValueTableRead.java     |  16 +-
 .../source/ValueContentRowDataRecordIterator.java  |  20 +++
 .../IncrementalChangelogReadProvider.java          |   2 +-
 .../source/splitread/IncrementalDiffSplitRead.java |   2 +-
 .../splitread/MergeFileSplitReadProvider.java      |   3 +-
 .../apache/paimon/table/system/AuditLogTable.java  | 142 +++++++++------
 .../apache/paimon/table/system/BinlogTable.java    |  36 ++--
 .../paimon/table/system/AuditLogTableTest.java     | 101 ++++++++---
 .../paimon/table/system/BinlogTableTest.java       | 191 +++++++++++++++++++++
 .../paimon/flink/lookup/LookupCompactDiffRead.java |   9 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 122 +++++++++++++
 15 files changed, 653 insertions(+), 93 deletions(-)

diff --git a/docs/content/concepts/system-tables.md 
b/docs/content/concepts/system-tables.md
index fd412f3fa5..0b8feed61c 100644
--- a/docs/content/concepts/system-tables.md
+++ b/docs/content/concepts/system-tables.md
@@ -136,6 +136,49 @@ SELECT * FROM my_table$audit_log;
 */
 ```
 
+For primary key tables, you can enable the 
`table-read.sequence-number.enabled` option to include the `_SEQUENCE_NUMBER` 
field in the output.
+
+{{< tabs "audit-log-sequence-number" >}}
+
+{{< tab "Enable via ALTER TABLE" >}}
+```sql
+ALTER TABLE my_table SET ('table-read.sequence-number.enabled' = 'true');
+```
+{{< /tab >}}
+
+{{< tab "Enable via CREATE TABLE" >}}
+```sql
+CREATE TABLE my_table (
+    ...
+) WITH (
+    'table-read.sequence-number.enabled' = 'true'
+);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+```sql
+SELECT * FROM my_table$audit_log;
+
+/*
++------------------+--------------------+-----------------+-----------------+
+|     rowkind      | _SEQUENCE_NUMBER   |     column_0    |     column_1    |
++------------------+--------------------+-----------------+-----------------+
+|        +I        |                 0  |      ...        |      ...        |
++------------------+--------------------+-----------------+-----------------+
+|        -U        |                 0  |      ...        |      ...        |
++------------------+--------------------+-----------------+-----------------+
+|        +U        |                 1  |      ...        |      ...        |
++------------------+--------------------+-----------------+-----------------+
+3 rows in set
+*/
+```
+
+{{< hint info >}}
+The `table-read.sequence-number.enabled` option cannot be set via SQL hints.
+{{< /hint >}}
+
 ### Binlog Table
 
 You can query the binlog through binlog table. In the binlog system table, the 
update before and update after will be packed in one row.
@@ -158,6 +201,24 @@ SELECT * FROM T$binlog;
 */
 ```
 
+Similar to the audit_log table, you can also enable 
`table-read.sequence-number.enabled` to include `_SEQUENCE_NUMBER` in the 
binlog table output:
+
+```sql
+SELECT * FROM T$binlog;
+
+/*
++------------------+--------------------+----------------------+-----------------------+
+|     rowkind      | _SEQUENCE_NUMBER   |       column_0       |       
column_1        |
++------------------+--------------------+----------------------+-----------------------+
+|        +I        |                 0  |       [col_0]        |       [col_1] 
        |
++------------------+--------------------+----------------------+-----------------------+
+|        +U        |                 1  | [col_0_ub, col_0_ua] | [col_1_ub, 
col_1_ua]  |
++------------------+--------------------+----------------------+-----------------------+
+|        -D        |                 2  |       [col_0]        |       [col_1] 
        |
++------------------+--------------------+----------------------+-----------------------+
+*/
+```
+
 ### Read-optimized Table
 
 If you require extreme reading performance and can accept reading slightly old 
data,
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 12d45a6a00..724e04cd5c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -152,6 +152,12 @@ under the License.
             <td>String</td>
             <td>Fields that are ignored for comparison while generating -U, +U 
changelog for the same record. This configuration is only valid for the 
changelog-producer.row-deduplicate is true.</td>
         </tr>
+        <tr>
+            <td><h5>table-read.sequence-number.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to include the _SEQUENCE_NUMBER field when reading the 
audit_log or binlog system tables. This is only valid for primary key 
tables.</td>
+        </tr>
         <tr>
             <td><h5>changelog.num-retained.max</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 5033eb2fa5..1b8170a9e2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -879,6 +879,24 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Fields that are ignored for comparison while 
generating -U, +U changelog for the same record. This configuration is only 
valid for the changelog-producer.row-deduplicate is true.");
 
+    public static final ConfigOption<Boolean> 
TABLE_READ_SEQUENCE_NUMBER_ENABLED =
+            key("table-read.sequence-number.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include the _SEQUENCE_NUMBER field 
when reading the audit_log or binlog "
+                                    + "system tables. This is only valid for 
primary key tables.");
+
+    @ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<Boolean> 
KEY_VALUE_SEQUENCE_NUMBER_ENABLED =
+            key("key-value.sequence_number.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include the _SEQUENCE_NUMBER field 
when reading key-value data. "
+                                    + "This is an internal option used by 
AuditLogTable and BinlogTable "
+                                    + "when table.read_sequence_number_enabled 
is set to true.");
+
     public static final ConfigOption<String> SEQUENCE_FIELD =
             key("sequence.field")
                     .stringType()
@@ -2810,6 +2828,10 @@ public class CoreOptions implements Serializable {
                 .orElse(Collections.emptyList());
     }
 
+    public boolean tableReadSequenceNumberEnabled() {
+        return options.get(TABLE_READ_SEQUENCE_NUMBER_ENABLED);
+    }
+
     public boolean scanPlanSortPartition() {
         return options.get(SCAN_PLAN_SORT_PARTITION);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index cceae0567e..062f71f057 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -239,6 +239,8 @@ public class SchemaValidation {
         validateIncrementalClustering(schema, options);
 
         validateChainTable(schema, options);
+
+        validateChangelogReadSequenceNumber(schema, options);
     }
 
     public static void validateFallbackBranch(SchemaManager schemaManager, 
TableSchema schema) {
@@ -672,4 +674,15 @@ public class SchemaValidation {
                     "Partition timestamp formatter is required for chain 
table.");
         }
     }
+
+    private static void validateChangelogReadSequenceNumber(
+            TableSchema schema, CoreOptions options) {
+        if (options.tableReadSequenceNumberEnabled()) {
+            checkArgument(
+                    !schema.primaryKeys().isEmpty(),
+                    "Cannot enable '%s' for non-primary-key table. "
+                            + "Sequence number is only available for primary 
key tables.",
+                    CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key());
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 3b257e6f3a..fda7d70ffd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 
 /**
@@ -150,14 +152,24 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         throw new RuntimeException("Should not happen.");
     }
 
-    public static RecordReader<InternalRow> unwrap(RecordReader<KeyValue> 
reader) {
+    public static RecordReader<InternalRow> unwrap(
+            RecordReader<KeyValue> reader, Map<String, String> schemaOptions) {
         return new RecordReader<InternalRow>() {
 
             @Nullable
             @Override
             public RecordIterator<InternalRow> readBatch() throws IOException {
+                boolean keyValueSequenceNumberEnabled =
+                        Boolean.parseBoolean(
+                                schemaOptions.getOrDefault(
+                                        
CoreOptions.KEY_VALUE_SEQUENCE_NUMBER_ENABLED.key(),
+                                        "false"));
+
                 RecordIterator<KeyValue> batch = reader.readBatch();
-                return batch == null ? null : new 
ValueContentRowDataRecordIterator(batch);
+                return batch == null
+                        ? null
+                        : new ValueContentRowDataRecordIterator(
+                                batch, keyValueSequenceNumberEnabled);
             }
 
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
index adcf504a5d..7c892b23c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
@@ -19,7 +19,9 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.reader.RecordReader;
 
 import java.io.IOException;
@@ -27,8 +29,17 @@ import java.io.IOException;
 /** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its 
value. */
 public class ValueContentRowDataRecordIterator extends 
ResetRowKindRecordIterator {
 
+    private final boolean keyValueSequenceNumberEnabled;
+
     public 
ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> 
kvIterator) {
+        this(kvIterator, false);
+    }
+
+    public ValueContentRowDataRecordIterator(
+            RecordReader.RecordIterator<KeyValue> kvIterator,
+            boolean keyValueSequenceNumberEnabled) {
         super(kvIterator);
+        this.keyValueSequenceNumberEnabled = keyValueSequenceNumberEnabled;
     }
 
     @Override
@@ -40,6 +51,15 @@ public class ValueContentRowDataRecordIterator extends 
ResetRowKindRecordIterato
 
         InternalRow rowData = kv.value();
         rowData.setRowKind(kv.valueKind());
+
+        if (keyValueSequenceNumberEnabled) {
+            JoinedRow joinedRow = new JoinedRow();
+            GenericRow systemFieldsRow = new GenericRow(1);
+            systemFieldsRow.setField(0, kv.sequenceNumber());
+            joinedRow.replace(systemFieldsRow, rowData);
+            joinedRow.setRowKind(kv.valueKind());
+            return joinedRow;
+        }
         return rowData;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index d7d9866a9c..56646dab84 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -73,7 +73,7 @@ public class IncrementalChangelogReadProvider implements 
SplitReadProvider {
                                                     
incrementalSplit.afterFiles(),
                                                     
incrementalSplit.afterDeletionFiles(),
                                                     false));
-                    return unwrap(reader);
+                    return unwrap(reader, read.tableSchema().options());
                 };
 
         return SplitRead.convert(read, convertedFactory);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index d00a8a4fd9..1e93f148b7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -110,7 +110,7 @@ public class IncrementalDiffSplitRead implements 
SplitRead<InternalRow> {
                     ProjectedRow.from(readType, 
mergeRead.tableSchema().logicalRowType());
             reader = reader.transform(kv -> 
kv.replaceValue(projectedRow.replaceRow(kv.value())));
         }
-        return KeyValueTableRead.unwrap(reader);
+        return KeyValueTableRead.unwrap(reader, 
mergeRead.tableSchema().options());
     }
 
     private static RecordReader<KeyValue> readDiff(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index fb3b850812..2e6c127c7f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -49,7 +49,8 @@ public class MergeFileSplitReadProvider implements 
SplitReadProvider {
 
     private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> 
supplier) {
         final MergeFileSplitRead read = 
supplier.get().withReadKeyType(RowType.of());
-        return SplitRead.convert(read, split -> 
unwrap(read.createReader(split)));
+        return SplitRead.convert(
+                read, split -> unwrap(read.createReader(split), 
read.tableSchema().options()));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 001e1543ee..d613d9455f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -82,6 +82,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
 /** A {@link Table} for reading audit log of table. */
@@ -89,29 +90,43 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
 
     public static final String AUDIT_LOG = "audit_log";
 
-    public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
-            p -> {
-                Optional<FieldRef> fieldRefOptional = p.fieldRefOptional();
-                if (!fieldRefOptional.isPresent()) {
-                    return Optional.empty();
-                }
-                FieldRef fieldRef = fieldRefOptional.get();
-                if (fieldRef.index() == 0) {
-                    return Optional.empty();
-                }
-                return Optional.of(
-                        new LeafPredicate(
-                                p.function(),
-                                fieldRef.type(),
-                                fieldRef.index() - 1,
-                                fieldRef.name(),
-                                p.literals()));
-            };
+    protected final FileStoreTable wrapped;
 
-    private final FileStoreTable wrapped;
+    protected final List<DataField> specialFields;
 
     public AuditLogTable(FileStoreTable wrapped) {
         this.wrapped = wrapped;
+        this.specialFields = new ArrayList<>();
+        specialFields.add(SpecialFields.ROW_KIND);
+
+        boolean includeSequenceNumber =
+                
CoreOptions.fromMap(wrapped.options()).tableReadSequenceNumberEnabled();
+
+        if (includeSequenceNumber) {
+            
this.wrapped.options().put(CoreOptions.KEY_VALUE_SEQUENCE_NUMBER_ENABLED.key(), 
"true");
+            specialFields.add(SpecialFields.SEQUENCE_NUMBER);
+        }
+    }
+
+    /** Creates a PredicateReplaceVisitor that adjusts field indices by 
systemFieldCount. */
+    private PredicateReplaceVisitor createPredicateConverter() {
+        return p -> {
+            Optional<FieldRef> fieldRefOptional = p.fieldRefOptional();
+            if (!fieldRefOptional.isPresent()) {
+                return Optional.empty();
+            }
+            FieldRef fieldRef = fieldRefOptional.get();
+            if (fieldRef.index() < specialFields.size()) {
+                return Optional.empty();
+            }
+            return Optional.of(
+                    new LeafPredicate(
+                            p.function(),
+                            fieldRef.type(),
+                            fieldRef.index() - specialFields.size(),
+                            fieldRef.name(),
+                            p.literals()));
+        };
     }
 
     @Override
@@ -146,8 +161,7 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
 
     @Override
     public RowType rowType() {
-        List<DataField> fields = new ArrayList<>();
-        fields.add(SpecialFields.ROW_KIND);
+        List<DataField> fields = new ArrayList<>(specialFields);
         fields.addAll(wrapped.rowType().getFields());
         return new RowType(fields);
     }
@@ -234,6 +248,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
+        if (Boolean.parseBoolean(
+                
dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"false"))) {
+            throw new UnsupportedOperationException(
+                    "table-read.sequence-number.enabled is not supported by 
hint.");
+        }
         return new AuditLogTable(wrapped.copy(dynamicOptions));
     }
 
@@ -244,9 +263,10 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
 
     /** Push down predicate to dataScan and dataRead. */
     private Optional<Predicate> convert(Predicate predicate) {
+        PredicateReplaceVisitor converter = createPredicateConverter();
         List<Predicate> result =
                 PredicateBuilder.splitAnd(predicate).stream()
-                        .map(p -> p.visit(PREDICATE_CONVERTER))
+                        .map(p -> p.visit(converter))
                         .filter(Optional::isPresent)
                         .map(Optional::get)
                         .collect(Collectors.toList());
@@ -636,6 +656,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
 
     class AuditLogRead implements InnerTableRead {
 
+        // Special index for rowkind field
+        protected static final int ROW_KIND_INDEX = -1;
+        // _SEQUENCE_NUMBER is at index 0 by setting: 
KEY_VALUE_SEQUENCE_NUMBER_ENABLED
+        protected static final int SEQUENCE_NUMBER_INDEX = 0;
+
         protected final InnerTableRead dataRead;
 
         protected int[] readProjection;
@@ -645,52 +670,61 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             this.readProjection = defaultProjection();
         }
 
-        /** Default projection, just add row kind to the first. */
+        /** Default projection, add system fields (rowkind, and optionally 
_SEQUENCE_NUMBER). */
         private int[] defaultProjection() {
             int dataFieldCount = wrapped.rowType().getFieldCount();
-            int[] projection = new int[dataFieldCount + 1];
-            projection[0] = -1;
+            int[] projection = new int[dataFieldCount + specialFields.size()];
+            projection[0] = ROW_KIND_INDEX;
+            if (specialFields.contains(SpecialFields.SEQUENCE_NUMBER)) {
+                projection[1] = SEQUENCE_NUMBER_INDEX;
+            }
             for (int i = 0; i < dataFieldCount; i++) {
-                projection[i + 1] = i;
+                projection[specialFields.size() + i] = i + 
specialFields.size() - 1;
             }
             return projection;
         }
 
-        @Override
-        public InnerTableRead withFilter(Predicate predicate) {
-            convert(predicate).ifPresent(dataRead::withFilter);
-            return this;
-        }
-
-        @Override
-        public InnerTableRead withReadType(RowType readType) {
-            // data projection to push down to dataRead
-            List<DataField> dataReadFields = new ArrayList<>();
-
-            // read projection to handle record returned by dataRead
+        /** Build projection array from readType. */
+        private int[] buildProjection(RowType readType) {
             List<DataField> fields = readType.getFields();
-            int[] readProjection = new int[fields.size()];
+            int[] projection = new int[fields.size()];
+            int dataFieldIndex = 0;
 
-            boolean rowKindAppeared = false;
             for (int i = 0; i < fields.size(); i++) {
                 String fieldName = fields.get(i).name();
                 if (fieldName.equals(SpecialFields.ROW_KIND.name())) {
-                    rowKindAppeared = true;
-                    readProjection[i] = -1;
+                    projection[i] = ROW_KIND_INDEX;
+                } else if 
(fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) {
+                    projection[i] = SEQUENCE_NUMBER_INDEX;
                 } else {
-                    dataReadFields.add(fields.get(i));
-                    // There is no row kind field. Keep it as it is
-                    // Row kind field has occurred, and the following fields 
are offset by 1
-                    // position
-                    readProjection[i] = rowKindAppeared ? i - 1 : i;
+                    projection[i] = dataFieldIndex + specialFields.size() - 1;
+                    dataFieldIndex++;
                 }
             }
+            return projection;
+        }
 
-            this.readProjection = readProjection;
-            dataRead.withReadType(new RowType(readType.isNullable(), 
dataReadFields));
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            convert(predicate).ifPresent(dataRead::withFilter);
             return this;
         }
 
+        @Override
+        public InnerTableRead withReadType(RowType readType) {
+            this.readProjection = buildProjection(readType);
+            List<DataField> dataFields = extractDataFields(readType);
+            dataRead.withReadType(new RowType(readType.isNullable(), 
dataFields));
+            return this;
+        }
+
+        /** Extract data fields (non-system fields) from readType. */
+        private List<DataField> extractDataFields(RowType readType) {
+            return readType.getFields().stream()
+                    .filter(f -> !SpecialFields.isSystemField(f.name()))
+                    .collect(Collectors.toList());
+        }
+
         @Override
         public TableRead withIOManager(IOManager ioManager) {
             this.dataRead.withIOManager(ioManager);
@@ -707,7 +741,10 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
     }
 
-    /** A {@link ProjectedRow} which returns row kind when mapping index is 
negative. */
+    /**
+     * A {@link ProjectedRow} which returns row kind and sequence number when 
mapping index is
+     * negative.
+     */
     static class AuditLogRow extends ProjectedRow {
 
         AuditLogRow(int[] indexMapping, InternalRow row) {
@@ -729,7 +766,7 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         @Override
         public boolean isNullAt(int pos) {
             if (indexMapping[pos] < 0) {
-                // row kind is always not null
+                // row kind and sequence num are always not null
                 return false;
             }
             return super.isNullAt(pos);
@@ -737,7 +774,8 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
 
         @Override
         public BinaryString getString(int pos) {
-            if (indexMapping[pos] < 0) {
+            int index = indexMapping[pos];
+            if (index == AuditLogRead.ROW_KIND_INDEX) {
                 return BinaryString.fromString(row.getRowKind().shortString());
             }
             return super.getString(pos);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index cc8d1621a3..5c1268f963 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
 /**
@@ -56,11 +57,8 @@ public class BinlogTable extends AuditLogTable {
 
     public static final String BINLOG = "binlog";
 
-    private final FileStoreTable wrapped;
-
     public BinlogTable(FileStoreTable wrapped) {
         super(wrapped);
-        this.wrapped = wrapped;
     }
 
     @Override
@@ -70,8 +68,7 @@ public class BinlogTable extends AuditLogTable {
 
     @Override
     public RowType rowType() {
-        List<DataField> fields = new ArrayList<>();
-        fields.add(SpecialFields.ROW_KIND);
+        List<DataField> fields = new ArrayList<>(specialFields);
         for (DataField field : wrapped.rowType().getFields()) {
             // convert to nullable
             fields.add(field.newType(new ArrayType(field.type().nullable())));
@@ -86,6 +83,11 @@ public class BinlogTable extends AuditLogTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
+        if (Boolean.parseBoolean(
+                
dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"false"))) {
+            throw new UnsupportedOperationException(
+                    "table-read.sequence-number.enabled is not supported by 
hint.");
+        }
         return new BinlogTable(wrapped.copy(dynamicOptions));
     }
 
@@ -102,7 +104,7 @@ public class BinlogTable extends AuditLogTable {
             List<DataField> fields = new ArrayList<>();
             List<DataField> wrappedReadFields = new ArrayList<>();
             for (DataField field : readType.getFields()) {
-                if (field.name().equals(SpecialFields.ROW_KIND.name())) {
+                if (SpecialFields.isSystemField(field.name())) {
                     fields.add(field);
                 } else {
                     DataField origin = field.newType(((ArrayType) 
field.type()).getElementType());
@@ -117,12 +119,16 @@ public class BinlogTable extends AuditLogTable {
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             DataSplit dataSplit = (DataSplit) split;
+            // When sequence number is enabled, the underlying data layout is:
+            // [_SEQUENCE_NUMBER, pk, pt, col1, ...]
+            // We need to offset the field index to skip the sequence number 
field.
+            int offset = specialFields.size() - 1;
             InternalRow.FieldGetter[] fieldGetters =
                     IntStream.range(0, wrappedReadType.getFieldCount())
                             .mapToObj(
                                     i ->
                                             InternalRow.createFieldGetter(
-                                                    
wrappedReadType.getTypeAt(i), i))
+                                                    
wrappedReadType.getTypeAt(i), i + offset))
                             .toArray(InternalRow.FieldGetter[]::new);
 
             if (dataSplit.isStreaming()) {
@@ -146,15 +152,23 @@ public class BinlogTable extends AuditLogTable {
                 InternalRow row1,
                 @Nullable InternalRow row2,
                 InternalRow.FieldGetter[] fieldGetters) {
-            GenericRow row = new GenericRow(row1.getFieldCount());
-            for (int i = 0; i < row1.getFieldCount(); i++) {
+            // seqOffset is 1 if sequence number is enabled, 0 otherwise
+            int seqOffset = specialFields.size() - 1;
+            GenericRow row = new GenericRow(fieldGetters.length + seqOffset);
+
+            // Copy sequence number if enabled (it's at index 0 in input row)
+            if (seqOffset > 0) {
+                row.setField(0, row1.getLong(0));
+            }
+
+            for (int i = 0; i < fieldGetters.length; i++) {
                 Object o1 = fieldGetters[i].getFieldOrNull(row1);
                 Object o2;
                 if (row2 != null) {
                     o2 = fieldGetters[i].getFieldOrNull(row2);
-                    row.setField(i, new GenericArray(new Object[] {o1, o2}));
+                    row.setField(i + seqOffset, new GenericArray(new Object[] 
{o1, o2}));
                 } else {
-                    row.setField(i, new GenericArray(new Object[] {o1}));
+                    row.setField(i + seqOffset, new GenericArray(new Object[] 
{o1}));
                 }
             }
             // If no row2 provided, then follow the row1 kind.
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
index 41facd1d83..cbba947bcb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
@@ -36,7 +37,6 @@ import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -48,15 +48,62 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Unit tests for {@link AuditLogTable}. */
 public class AuditLogTableTest extends TableTestBase {
 
-    private static final String tableName = "MyTable";
-    private AuditLogTable auditLogTable;
+    @Test
+    public void testReadAuditLogFromLatest() throws Exception {
+        AuditLogTable auditLogTable = createAuditLogTable("audit_table", 
false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+        List<InternalRow> expectRow = getExpectedResult();
+        List<InternalRow> result = read(auditLogTable);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithTableOption() throws Exception {
+        AuditLogTable auditLogTable = 
createAuditLogTable("audit_table_with_seq", true);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(auditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithAlterTable() throws Exception {
+        String tableName = "audit_table_alter_seq";
+        // Create table without sequence-number option
+        AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+
+        // Add sequence-number option via alterTable
+        catalog.alterTable(
+                identifier(tableName),
+                SchemaChange.setOption(
+                        CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"true"),
+                false);
 
-    @BeforeEach
-    public void before() throws Exception {
+        // Re-fetch the audit_log table to get updated schema
+        Identifier auditLogTableId =
+                identifier(tableName + SYSTEM_TABLE_SPLITTER + 
AuditLogTable.AUDIT_LOG);
+        AuditLogTable updatedAuditLogTable = (AuditLogTable) 
catalog.getTable(auditLogTableId);
+
+        // Verify schema now includes _SEQUENCE_NUMBER
+        assertThat(updatedAuditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(updatedAuditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    private AuditLogTable createAuditLogTable(String tableName, boolean 
enableSequenceNumber)
+            throws Exception {
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, tableName));
         FileIO fileIO = LocalFileIO.create();
 
-        Schema schema =
+        Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("pk", DataTypes.INT())
                         .column("pt", DataTypes.INT())
@@ -64,44 +111,54 @@ public class AuditLogTableTest extends TableTestBase {
                         .partitionKeys("pt")
                         .primaryKey("pk", "pt")
                         .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
-                        .option("bucket", "1")
-                        .build();
+                        .option("bucket", "1");
+        if (enableSequenceNumber) {
+            
schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"true");
+        }
 
         TableSchema tableSchema =
-                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), 
schema);
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath), 
schemaBuilder.build());
         FileStoreTable table =
                 FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
-        Identifier filesTableId =
+
+        writeTestData(table);
+
+        Identifier auditLogTableId =
                 identifier(tableName + SYSTEM_TABLE_SPLITTER + 
AuditLogTable.AUDIT_LOG);
-        auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+        return (AuditLogTable) catalog.getTable(auditLogTableId);
+    }
 
+    private void writeTestData(FileStoreTable table) throws Exception {
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
         write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
-        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
         write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
     }
 
-    @Test
-    public void testReadAuditLogFromLatest() throws Exception {
-        List<InternalRow> expectRow = getExpectedResult();
-        List<InternalRow> result = read(auditLogTable);
-        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
-    }
-
     private List<InternalRow> getExpectedResult() {
         List<InternalRow> expectedRow = new ArrayList<>();
         expectedRow.add(
                 
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
         expectedRow.add(
                 GenericRow.of(
-                        
BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5));
+                        
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6));
+        expectedRow.add(
+                
GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+        return expectedRow;
+    }
+
+    private List<InternalRow> getExpectedResultWithSequenceNumber() {
+        List<InternalRow> expectedRow = new ArrayList<>();
+        expectedRow.add(
+                
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1L, 1, 1, 
1));
         expectedRow.add(
                 GenericRow.of(
-                        
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 4, 6));
+                        
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 2L, 1, 2, 6));
         expectedRow.add(
-                
GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+                
GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 0L, 2, 3, 
1));
         return expectedRow;
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
new file mode 100644
index 0000000000..6f187fbffd
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link BinlogTable}. */
+public class BinlogTableTest extends TableTestBase {
+
+    @Test
+    public void testReadBinlogFromLatest() throws Exception {
+        BinlogTable binlogTable = createBinlogTable("binlog_table", false);
+        assertThat(binlogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+
+        List<InternalRow> result = read(binlogTable);
+        List<InternalRow> expectRow = getExpectedResult();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithTableOption() throws Exception {
+        BinlogTable binlogTable = createBinlogTable("binlog_table_with_seq", 
true);
+        assertThat(binlogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(binlogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithAlterTable() throws Exception {
+        String tableName = "binlog_table_alter_seq";
+        // Create table without sequence-number option
+        BinlogTable binlogTable = createBinlogTable(tableName, false);
+        assertThat(binlogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+
+        // Add sequence-number option via alterTable
+        catalog.alterTable(
+                identifier(tableName),
+                SchemaChange.setOption(
+                        CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"true"),
+                false);
+
+        // Re-fetch the binlog table to get updated schema
+        Identifier binlogTableId =
+                identifier(tableName + SYSTEM_TABLE_SPLITTER + 
BinlogTable.BINLOG);
+        BinlogTable updatedBinlogTable = (BinlogTable) 
catalog.getTable(binlogTableId);
+
+        // Verify schema now includes _SEQUENCE_NUMBER
+        assertThat(updatedBinlogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(updatedBinlogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    private BinlogTable createBinlogTable(String tableName, boolean 
enableSequenceNumber)
+            throws Exception {
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, tableName));
+        FileIO fileIO = LocalFileIO.create();
+
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+                        .option("bucket", "1");
+        if (enableSequenceNumber) {
+            
schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"true");
+        }
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath), 
schemaBuilder.build());
+        FileStoreTable table =
+                FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
+
+        writeTestData(table);
+
+        Identifier binlogTableId =
+                identifier(tableName + SYSTEM_TABLE_SPLITTER + 
BinlogTable.BINLOG);
+        return (BinlogTable) catalog.getTable(binlogTableId);
+    }
+
+    private void writeTestData(FileStoreTable table) throws Exception {
+        write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
+        write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
+        write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
+        write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
+    }
+
+    private List<InternalRow> getExpectedResult() {
+        List<InternalRow> expectedRow = new ArrayList<>();
+        expectedRow.add(
+                GenericRow.of(
+                        BinaryString.fromString(RowKind.DELETE.shortString()),
+                        new GenericArray(new Object[] {1}),
+                        new GenericArray(new Object[] {1}),
+                        new GenericArray(new Object[] {1})));
+        expectedRow.add(
+                GenericRow.of(
+                        
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+                        new GenericArray(new Object[] {1}),
+                        new GenericArray(new Object[] {2}),
+                        new GenericArray(new Object[] {6})));
+        expectedRow.add(
+                GenericRow.of(
+                        BinaryString.fromString(RowKind.INSERT.shortString()),
+                        new GenericArray(new Object[] {2}),
+                        new GenericArray(new Object[] {3}),
+                        new GenericArray(new Object[] {1})));
+        return expectedRow;
+    }
+
+    private List<InternalRow> getExpectedResultWithSequenceNumber() {
+        List<InternalRow> expectedRow = new ArrayList<>();
+        expectedRow.add(
+                GenericRow.of(
+                        BinaryString.fromString(RowKind.DELETE.shortString()),
+                        1L,
+                        new GenericArray(new Object[] {1}),
+                        new GenericArray(new Object[] {1}),
+                        new GenericArray(new Object[] {1})));
+        expectedRow.add(
+                GenericRow.of(
+                        
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+                        2L,
+                        new GenericArray(new Object[] {1}),
+                        new GenericArray(new Object[] {2}),
+                        new GenericArray(new Object[] {6})));
+        expectedRow.add(
+                GenericRow.of(
+                        BinaryString.fromString(RowKind.INSERT.shortString()),
+                        0L,
+                        new GenericArray(new Object[] {2}),
+                        new GenericArray(new Object[] {3}),
+                        new GenericArray(new Object[] {1})));
+        return expectedRow;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index dfdd9dd207..e4870de583 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -28,14 +28,13 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.AbstractDataTableRead;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
 
-import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
-
 /** An {@link InnerTableRead} that reads the data changed before and after 
compaction. */
 public class LookupCompactDiffRead extends AbstractDataTableRead {
 
@@ -46,7 +45,11 @@ public class LookupCompactDiffRead extends 
AbstractDataTableRead {
         super(schema);
         this.incrementalDiffRead = new 
IncrementalCompactDiffSplitRead(mergeRead);
         this.fullPhaseMergeRead =
-                SplitRead.convert(mergeRead, split -> 
unwrap(mergeRead.createReader(split)));
+                SplitRead.convert(
+                        mergeRead,
+                        split ->
+                                KeyValueTableRead.unwrap(
+                                        mergeRead.createReader(split), 
schema.options()));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 22271c8a7e..48fcbbda71 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -56,6 +56,7 @@ import static java.util.Collections.singletonList;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** ITCase for batch file store. */
 public class BatchFileStoreITCase extends CatalogITCaseBase {
@@ -1053,6 +1054,127 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                 .containsExactly(Row.of("+I", new String[] {"A"}));
     }
 
+    @Test
+    public void testAuditLogTableWithSequenceNumberEnabled() {
+        // Creating an append-only table (no primary key) with
+        // table-read.sequence-number.enabled option should throw an exception
+        assertThrows(
+                RuntimeException.class,
+                () ->
+                        sql(
+                                "CREATE TABLE test_table_err (a int, b int, c 
AS a + b) "
+                                        + "WITH 
('table-read.sequence-number.enabled'='true')"));
+
+        // Selecting an auditlog table with
+        // table-read.sequence-number.enabled option should throw an exception
+        sql("CREATE TABLE test_table_err2 (a int PRIMARY KEY NOT ENFORCED, b 
int, c AS a + b);");
+        assertThrows(
+                RuntimeException.class,
+                () ->
+                        sql(
+                                "SELECT * FROM `test_table_err2$audit_log`"
+                                        + " /*+ 
OPTIONS('table-read.sequence-number.enabled' = 'true') */"));
+
+        // Create primary key table with table-read.sequence-number.enabled 
option
+        sql(
+                "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, 
b int, c AS a + b) "
+                        + "WITH 
('table-read.sequence-number.enabled'='true');");
+        sql("INSERT INTO test_table_seq VALUES (1, 2)");
+        sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+        // Test SELECT * from original table
+        assertThat(sql("SELECT * FROM `test_table_seq`"))
+                .containsExactlyInAnyOrder(Row.of(1, 2, 3), Row.of(3, 4, 7));
+
+        // Test SELECT * includes _SEQUENCE_NUMBER
+        assertThat(sql("SELECT * FROM `test_table_seq$audit_log`"))
+                .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), 
Row.of("+I", 1L, 3, 4, 7));
+
+        // Test out-of-order select with _SEQUENCE_NUMBER
+        assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM 
`test_table_seq$audit_log`"))
+                .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+        // Test selecting only _SEQUENCE_NUMBER, rowkind
+        assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM 
`test_table_seq$audit_log`"))
+                .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+    }
+
+    @Test
+    public void testAuditLogTableWithSequenceNumberAlterTable() {
+        // Create primary key table without sequence-number option
+        sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b 
int, c AS a + b);");
+        sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+        sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+        // Add table-read.sequence-number.enabled option via ALTER TABLE
+        sql("ALTER TABLE test_table_dyn SET 
('table-read.sequence-number.enabled'='true')");
+
+        // Test SELECT * includes _SEQUENCE_NUMBER (same as
+        // testAuditLogTableWithSequenceNumberEnabled)
+        assertThat(sql("SELECT * FROM `test_table_dyn$audit_log`"))
+                .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), 
Row.of("+I", 1L, 3, 4, 7));
+
+        // Test out-of-order select with _SEQUENCE_NUMBER
+        assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM 
`test_table_dyn$audit_log`"))
+                .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+        // Test selecting only _SEQUENCE_NUMBER, rowkind
+        assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM 
`test_table_dyn$audit_log`"))
+                .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+    }
+
+    @Test
+    public void testBinlogTableWithSequenceNumberEnabled() {
+        // Create primary key table with table-read.sequence-number.enabled 
option
+        sql(
+                "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, 
b int) "
+                        + "WITH 
('table-read.sequence-number.enabled'='true');");
+        sql("INSERT INTO test_table_seq VALUES (1, 2)");
+        sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+        // Test SELECT * includes _SEQUENCE_NUMBER
+        assertThat(sql("SELECT * FROM `test_table_seq$binlog`"))
+                .containsExactlyInAnyOrder(
+                        Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+                        Row.of("+I", 1L, new Integer[] {3}, new Integer[] 
{4}));
+
+        // Test out-of-order select with _SEQUENCE_NUMBER
+        assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM 
`test_table_seq$binlog`"))
+                .containsExactlyInAnyOrder(
+                        Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] 
{4}, 1L));
+
+        // Test selecting only _SEQUENCE_NUMBER
+        assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM 
`test_table_seq$binlog`"))
+                .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+    }
+
+    @Test
+    public void testBinlogTableWithSequenceNumberAlterTable() {
+        // Create primary key table without sequence-number option
+        sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b 
int);");
+        sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+        sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+        // Add table-read.sequence-number.enabled option via ALTER TABLE
+        sql("ALTER TABLE test_table_dyn SET 
('table-read.sequence-number.enabled'='true')");
+
+        // Test SELECT * includes _SEQUENCE_NUMBER (same as
+        // testBinlogTableWithSequenceNumberEnabled)
+        assertThat(sql("SELECT * FROM `test_table_dyn$binlog`"))
+                .containsExactlyInAnyOrder(
+                        Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+                        Row.of("+I", 1L, new Integer[] {3}, new Integer[] 
{4}));
+
+        // Test out-of-order select with _SEQUENCE_NUMBER
+        assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM 
`test_table_dyn$binlog`"))
+                .containsExactlyInAnyOrder(
+                        Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] 
{4}, 1L));
+
+        // Test selecting only _SEQUENCE_NUMBER
+        assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM 
`test_table_dyn$binlog`"))
+                .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+    }
+
     @Test
     public void testBatchReadSourceWithSnapshot() {
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222), (3, 33, 
333), (4, 44, 444)");

Reply via email to