wuchong commented on code in PR #2525:
URL: https://github.com/apache/fluss/pull/2525#discussion_r2746993624


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java:
##########
@@ -932,6 +942,65 @@ void testGetChangelogVirtualTable() throws Exception {
         
assertThat(logChangelogTable.getUnresolvedSchema()).isEqualTo(expectedLogSchema);
     }
 
+    @Test
+    void testGetBinlogVirtualTable() throws Exception {
+        // Create a primary key table with partition
+        tEnv.executeSql(
+                "CREATE TABLE pk_table_for_binlog ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING NOT NULL,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id, name) NOT ENFORCED"
+                        + ") PARTITIONED BY (name) "
+                        + "WITH ('bucket.num' = '1')");
+
+        // Get the $binlog virtual table via catalog API
+        CatalogTable binlogTable =
+                (CatalogTable)
+                        catalog.getTable(new ObjectPath(DEFAULT_DB, 
"pk_table_for_binlog$binlog"));
+
+        // Verify binlog schema has 5 columns: _change_type, _log_offset, 
_commit_timestamp,
+        // before, after
+        Schema binlogSchema = binlogTable.getUnresolvedSchema();
+        List<String> columnNames =
+                binlogSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        assertThat(columnNames)
+                .containsExactly(
+                        "_change_type", "_log_offset", "_commit_timestamp", 
"before", "after");
+
+        // Verify before and after columns are ROW types containing the 
original columns
+        String schemaString = binlogSchema.toString();
+        assertThat(schemaString).contains("before");
+        assertThat(schemaString).contains("after");
+        assertThat(schemaString).contains("ROW<");
+        // Verify nested columns exist in the ROW type
+        assertThat(schemaString).contains("id INT NOT NULL");
+        assertThat(schemaString).contains("name STRING NOT NULL");
+        assertThat(schemaString).contains("amount BIGINT");

Review Comment:
   Could you follow the approach used in `testGetChangelogVirtualTable` and 
assert against an expected schema directly? Using `contains` for schema 
validation is less readable and prone to errors.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Integration test for $binlog virtual table functionality. */
+abstract class BinlogVirtualTableITCase extends AbstractTestBase {
+
+    protected static final ManualClock CLOCK = new ManualClock();
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(new Configuration())
+                    .setNumOfTabletServers(1)
+                    .setClock(CLOCK)
+                    .build();
+
+    static final String CATALOG_NAME = "testcatalog";
+    static final String DEFAULT_DB = "test_binlog_db";
+    protected StreamExecutionEnvironment execEnv;
+    protected StreamTableEnvironment tEnv;
+    protected static Connection conn;
+    protected static Admin admin;
+
+    protected static Configuration clientConf;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @BeforeEach
+    void before() {
+        // Initialize Flink environment
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+
+        // Initialize catalog and database
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        tEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        tEnv.executeSql("use catalog " + CATALOG_NAME);
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.useDatabase(DEFAULT_DB);
+        // reset clock before each test
+        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    @AfterEach
+    void after() {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    }
+
+    /** Deletes rows from a primary key table using the proper delete API. */
+    protected static void deleteRows(
+            Connection connection, TablePath tablePath, List<InternalRow> 
rows) throws Exception {
+        try (Table table = connection.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            for (InternalRow row : rows) {
+                writer.delete(row);
+            }
+            writer.flush();
+        }
+    }
+
+    @Test
+    public void testDescribeBinlogTable() throws Exception {
+        // Create a table with various data types to test complex schema
+        tEnv.executeSql(
+                "CREATE TABLE describe_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        // Test DESCRIBE on binlog virtual table
+        CloseableIterator<Row> describeResult =
+                tEnv.executeSql("DESCRIBE describe_test$binlog").collect();
+
+        List<String> schemaRows = new ArrayList<>();
+        while (describeResult.hasNext()) {
+            schemaRows.add(describeResult.next().toString());
+        }
+
+        // Should have 5 columns: _change_type, _log_offset, 
_commit_timestamp, before, after
+        assertThat(schemaRows).hasSize(5);
+
+        // Verify metadata columns are listed first
+        assertThat(schemaRows.get(0))
+                .isEqualTo("+I[_change_type, STRING, false, null, null, 
null]");
+        assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, 
false, null, null, null]");
+        assertThat(schemaRows.get(2))
+                .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, 
null, null, null]");
+
+        // Verify before and after are ROW types with original columns
+        assertThat(schemaRows.get(3))
+                .isEqualTo(
+                        "+I[before, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+        assertThat(schemaRows.get(4))
+                .isEqualTo(
+                        "+I[after, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+    }
+
+    @Test
+    public void testBinlogUnsupportedForLogTable() throws Exception {
+        // Create a log table (no primary key)
+        tEnv.executeSql(
+                "CREATE TABLE log_table ("
+                        + "  event_id INT,"
+                        + "  event_type STRING"
+                        + ") WITH ('bucket.num' = '1')");
+
+        // $binlog should fail for log tables
+        assertThatThrownBy(() -> tEnv.executeSql("DESCRIBE 
log_table$binlog").collect())
+                .hasMessageContaining("only supported for primary key tables");
+    }
+
+    @Test
+    public void testBinlogWithAllChangeTypes() throws Exception {
+        // Create a primary key table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "binlog_test");
+
+        // Start binlog scan
+        String query =
+                "SELECT _change_type, _log_offset, "
+                        + "before.id, before.name, before.amount, "
+                        + "after.id, after.name, after.amount "
+                        + "FROM binlog_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Test INSERT
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(
+                conn,
+                tablePath,
+                Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)),
+                false);
+
+        // Collect inserts - each INSERT produces one binlog row
+        List<String> insertResults = collectRowsWithTimeout(rowIter, 2, false);
+        assertThat(insertResults).hasSize(2);
+
+        // INSERT: before=null, after=row data
+        // Format: +I[_change_type, _log_offset, before.id, before.name, 
before.amount,
+        //            after.id, after.name, after.amount]
+        assertThat(insertResults.get(0)).isEqualTo("+I[I, 0, null, null, null, 
1, Item-1, 100]");
+        assertThat(insertResults.get(1)).isEqualTo("+I[I, 1, null, null, null, 
2, Item-2, 200]");
+
+        // Test UPDATE - should merge -U and +U into single binlog row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 
150L)), false);
+
+        // UPDATE produces ONE binlog row (not two like changelog)
+        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, false);
+        assertThat(updateResults).hasSize(1);
+
+        // UPDATE: before=old row, after=new row, offset=from -U record
+        assertThat(updateResults.get(0))
+                .isEqualTo("+I[U, 2, 1, Item-1, 100, 1, Item-1-Updated, 150]");
+
+        // Test DELETE
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L)));
+
+        // DELETE produces one binlog row
+        List<String> deleteResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(deleteResults).hasSize(1);
+
+        // DELETE: before=row data, after=null
+        assertThat(deleteResults.get(0)).isEqualTo("+I[D, 4, 2, Item-2, 200, 
null, null, null]");
+    }
+
+    @Test
+    public void testBinlogSelectStar() throws Exception {
+        // Test SELECT * which returns the full binlog structure
+        tEnv.executeSql(
+                "CREATE TABLE star_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "star_test");
+
+        String query = "SELECT * FROM star_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Insert a row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Alice")), false);
+
+        List<String> results = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(results).hasSize(1);
+
+        // SELECT * returns: _change_type, _log_offset, _commit_timestamp, 
before, after
+        // before is null for INSERT, after contains the row
+        assertThat(results.get(0)).startsWith("+I[I, 0, ");
+        assertThat(results.get(0)).contains("null, +I[1, Alice]");
+    }
+
+    @Test
+    public void testBinlogWithPartitionedTable() throws Exception {
+        // Create a partitioned primary key table
+        tEnv.executeSql(
+                "CREATE TABLE partitioned_binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  region STRING NOT NULL,"
+                        + "  PRIMARY KEY (id, region) NOT ENFORCED"
+                        + ") PARTITIONED BY (region) WITH ('bucket.num' = 
'1')");
+
+        // Insert data into different partitions using Flink SQL
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        tEnv.executeSql(
+                        "INSERT INTO partitioned_binlog_test VALUES "
+                                + "(1, 'Item-1', 'us'), "
+                                + "(2, 'Item-2', 'eu')")
+                .await();
+
+        // Query binlog with nested field access
+        String query =
+                "SELECT _change_type, after.id, after.name, after.region "
+                        + "FROM partitioned_binlog_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        List<String> results = collectRowsWithTimeout(rowIter, 2, false);
+        // Sort results for deterministic assertion (partitions may return in 
any order)
+        Collections.sort(results);
+        assertThat(results)
+                .isEqualTo(Arrays.asList("+I[I, 1, Item-1, us]", "+I[I, 2, 
Item-2, eu]"));
+
+        // Update a record in a specific partition
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        tEnv.executeSql("INSERT INTO partitioned_binlog_test VALUES (1, 
'Item-1-Updated', 'us')")
+                .await();
+
+        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(updateResults).hasSize(1);
+        assertThat(updateResults.get(0)).isEqualTo("+I[U, 1, Item-1-Updated, 
us]");
+    }
+
+    @Test
+    public void testBinlogScanStartupMode() throws Exception {
+        // Create a primary key table with 1 bucket
+        tEnv.executeSql(
+                "CREATE TABLE startup_binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "startup_binlog_test");
+
+        // Write first batch
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3")), false);
+
+        // Write second batch
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(4, "v4"), row(5, "v5")), 
false);
+
+        // Test scan.startup.mode='earliest' - should read all records from 
beginning
+        String optionsEarliest = " /*+ OPTIONS('scan.startup.mode' = 
'earliest') */";
+        String queryEarliest =
+                "SELECT _change_type, after.id, after.name FROM 
startup_binlog_test$binlog"
+                        + optionsEarliest;
+        CloseableIterator<Row> rowIterEarliest = 
tEnv.executeSql(queryEarliest).collect();
+        List<String> earliestResults = collectRowsWithTimeout(rowIterEarliest, 
5, true);
+        assertThat(earliestResults).hasSize(5);
+        // All should be INSERT change types
+        for (String result : earliestResults) {
+            assertThat(result).startsWith("+I[I,");
+        }
+
+        // Test scan.startup.mode='timestamp' - should read from specific 
timestamp
+        String optionsTimestamp =
+                " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 
'scan.startup.timestamp' = '150') */";
+        String queryTimestamp =
+                "SELECT _change_type, after.id, after.name FROM 
startup_binlog_test$binlog"
+                        + optionsTimestamp;
+        CloseableIterator<Row> rowIterTimestamp = 
tEnv.executeSql(queryTimestamp).collect();
+        List<String> timestampResults = 
collectRowsWithTimeout(rowIterTimestamp, 2, true);
+        assertThat(timestampResults).hasSize(2);
+        // Sort results for deterministic assertion
+        Collections.sort(timestampResults);

Review Comment:
   ditto. You can change the `TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM` config 
to `1` in the `before()` method. 



##########
fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java:
##########
@@ -65,6 +65,10 @@ public final class TableDescriptor implements Serializable {
     public static final String LOG_OFFSET_COLUMN = "_log_offset";
     public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp";
 
+    // Reserved column names for $binlog virtual table nested row fields

Review Comment:
   ```suggestion
       // column names for $binlog virtual table nested row fields
   ```
   
   They are not "Reserved" names. "Reserved" means it is not allowed users to 
use. 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.deserializer;
+
+import org.apache.fluss.flink.utils.BinlogRowConverter;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import javax.annotation.Nullable;
+
+import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
+
+/**
+ * A deserialization schema that converts {@link LogRecord} objects to Flink's 
{@link RowData}
+ * format with nested before/after row structure for the $binlog virtual table.
+ *
+ * <p>This schema is stateful: it buffers UPDATE_BEFORE (-U) records and 
returns {@code null} for
+ * them. When the subsequent UPDATE_AFTER (+U) record arrives, it merges both 
into a single binlog
+ * row. The {@link org.apache.fluss.flink.source.emitter.FlinkRecordEmitter} 
handles null returns by
+ * skipping emission.

Review Comment:
   I noticed a potential data consistency issue in the current implementation 
of `FlinkRecordEmitter` regarding how it handles offsets for `$binlog` tables.
   
   **The Problem:**
   Currently, `FlinkRecordEmitter` advances the log offset in `LogSplitState` 
by 1 immediately after a record is processed, regardless of whether the record 
was fully emitted as a complete transaction (e.g., an `UPDATE` pair).
   
   Specifically, for `$binlog` tables, an `UPDATE` operation is split into two 
logical events: `UPDATE_BEFORE` and `UPDATE_AFTER`. If a checkpoint occurs 
exactly between these two events and the job subsequently fails:
   
   1. The `LogSplitState` will have already been updated to `offset + 1`.
   2. Upon recovery, the source will skip the original log record and start 
reading from the next offset.
   3. The `UPDATE_AFTER` event is lost, or more critically, 
`BinlogRowConverter#toBinlogRowData` will throw an `IllegalStateException` 
because `pendingUpdateBefore` is null when it expects to complete an update 
sequence.
   
   **Suggested Solution:**
   We should modify `FlinkRecordEmitter` to only advance the log offset in the 
state when `deserializationSchema.deserialize` actually returns a non-null 
record (signaling that the logical processing for that physical record is 
complete and emitted).
   
   ---
   
   ### Revised Code Implementation
   
   I suggest refining the logic to ensure the `emitted` flag correctly dictates 
the state update. Note that I've added a comment to clarify that "emitted" here 
implies the full logical unit has been sent downstream.
   
   ```java
   } else if (splitState.isLogSplitState()) {
       // Attempt to process and emit the record. 
       // For $binlog, this returns true only when a complete row (or the final 
part of a split) is emitted.
       boolean emitted = processAndEmitRecord(recordAndPosition.record(), 
sourceOutput);
       
       if (emitted) {
           // Only advance the offset in state if the record was successfully 
emitted.
           // This ensures that if a crash occurs mid-update (between BEFORE 
and AFTER),
           // the source will re-read the same log offset upon recovery, 
           // allowing the BinlogDeserializationSchema to correctly reconstruct 
the state.
           
splitState.asLogSplitState().setNextOffset(recordAndPosition.record().logOffset()
 + 1);
       }
   }
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
+import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
+import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** A Flink table source for the $binlog virtual table. */
+public class BinlogFlinkTableSource implements ScanTableSource {
+
+    private final TablePath tablePath;
+    private final Configuration flussConfig;
+    // The binlog output type (includes metadata + nested before/after ROW 
columns)
+    private final org.apache.flink.table.types.logical.RowType 
binlogOutputType;
+    // The data columns type extracted from the 'before' nested ROW
+    private final org.apache.flink.table.types.logical.RowType dataColumnsType;
+    private final int[] partitionKeyIndexes;
+    private final boolean streaming;
+    private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
+    private final long scanPartitionDiscoveryIntervalMs;
+    private final Map<String, String> tableOptions;
+
+    // Projection pushdown
+    @Nullable private int[] projectedFields;
+    private LogicalType producedDataType;
+
+    @Nullable private Predicate partitionFilters;
+
+    public BinlogFlinkTableSource(
+            TablePath tablePath,
+            Configuration flussConfig,
+            org.apache.flink.table.types.logical.RowType binlogOutputType,
+            int[] partitionKeyIndexes,
+            boolean streaming,
+            FlinkConnectorOptionsUtils.StartupOptions startupOptions,
+            long scanPartitionDiscoveryIntervalMs,
+            Map<String, String> tableOptions) {
+        this.tablePath = tablePath;
+        this.flussConfig = flussConfig;
+        this.binlogOutputType = binlogOutputType;
+        this.partitionKeyIndexes = partitionKeyIndexes;
+        this.streaming = streaming;
+        this.startupOptions = startupOptions;
+        this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
+        this.tableOptions = tableOptions;
+
+        // Extract data columns from the 'before' nested ROW type (index 3)
+        // The binlog schema is: [_change_type, _log_offset, 
_commit_timestamp, before, after]
+        this.dataColumnsType =
+                (org.apache.flink.table.types.logical.RowType) 
binlogOutputType.getTypeAt(3);
+        this.producedDataType = binlogOutputType;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) 
{
+        // Create the Fluss row type for the data columns (the original table 
columns)
+        RowType flussRowType = 
FlinkConversions.toFlussRowType(dataColumnsType);
+        if (projectedFields != null) {
+            flussRowType = flussRowType.project(projectedFields);
+        }
+
+        // Determine the offsets initializer based on startup mode
+        OffsetsInitializer offsetsInitializer;
+        switch (startupOptions.startupMode) {
+            case EARLIEST:
+            case FULL:
+                // For binlog, read all log records from the beginning
+                offsetsInitializer = OffsetsInitializer.earliest();
+                break;
+            case LATEST:
+                offsetsInitializer = OffsetsInitializer.latest();
+                break;
+            case TIMESTAMP:
+                offsetsInitializer =
+                        
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported startup mode: " + 
startupOptions.startupMode);
+        }
+
+        // Create the source with the binlog deserialization schema
+        FlinkSource<RowData> source =
+                new FlinkSource<>(
+                        flussConfig,
+                        tablePath,
+                        false,
+                        isPartitioned(),
+                        flussRowType,
+                        projectedFields,
+                        offsetsInitializer,
+                        scanPartitionDiscoveryIntervalMs,
+                        new BinlogDeserializationSchema(),
+                        streaming,
+                        partitionFilters,
+                        null);
+
+        if (!streaming) {
+            // Batch mode - binlog virtual tables read from log, not data lake
+            return new SourceProvider() {
+                @Override
+                public boolean isBounded() {
+                    return true;
+                }
+
+                @Override
+                public Source<RowData, ?, ?> createSource() {
+                    return source;
+                }
+            };
+        } else {
+            return SourceProvider.of(source);
+        }

Review Comment:
   Simply return `SourceProvider.of(source)` should be fine. It returns 
`isBounded` as `true` when `streaming` is `false`.
   We should also update the `ChangelogFlinkTableSource`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java:
##########
@@ -245,7 +245,16 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "The serialized base64 bytes of refresh handler of 
materialized table.");
 
-    // 
------------------------------------------------------------------------------------------
+    /**
+     * Internal option used to pass the base table's partition key column 
names to the binlog table
+     * source. The value is a comma-separated list of column names.
+     */
+    public static final ConfigOption<String> BINLOG_PARTITION_KEYS =
+            ConfigOptions.key("binlog.partition-keys")

Review Comment:
   we don't need this



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java:
##########
@@ -960,4 +959,112 @@ private Schema buildChangelogSchema(Schema 
originalSchema) {
 
         return builder.build();
     }
+
+    /**
+     * Creates a virtual $binlog table by modifying the base table's schema to 
include metadata
+     * columns and nested before/after ROW fields.
+     */
+    private CatalogBaseTable getVirtualBinlogTable(ObjectPath objectPath)
+            throws TableNotExistException, CatalogException {
+        // Extract the base table name (remove $binlog suffix)
+        String virtualTableName = objectPath.getObjectName();
+        String baseTableName =
+                virtualTableName.substring(
+                        0, virtualTableName.length() - 
BINLOG_TABLE_SUFFIX.length());
+
+        // Get the base table
+        ObjectPath baseObjectPath = new 
ObjectPath(objectPath.getDatabaseName(), baseTableName);
+        TablePath baseTablePath = toTablePath(baseObjectPath);
+
+        try {
+            // Retrieve base table info
+            TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();
+
+            // $binlog is only supported for primary key tables
+            if (!tableInfo.hasPrimaryKey()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "$binlog virtual tables are only supported for 
primary key tables. "
+                                        + "Table %s does not have a primary 
key.",
+                                baseTablePath));
+            }
+
+            // Convert to Flink table
+            CatalogBaseTable catalogBaseTable = 
FlinkConversions.toFlinkTable(tableInfo);
+
+            if (!(catalogBaseTable instanceof CatalogTable)) {
+                throw new UnsupportedOperationException(
+                        "Virtual $binlog tables are only supported for regular 
tables");
+            }
+
+            CatalogTable baseTable = (CatalogTable) catalogBaseTable;
+
+            // Build the binlog schema with nested before/after ROW columns
+            Schema originalSchema = baseTable.getUnresolvedSchema();
+            Schema binlogSchema = buildBinlogSchema(originalSchema);
+
+            // Copy options from base table
+            Map<String, String> newOptions = new 
HashMap<>(baseTable.getOptions());
+            newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
+            newOptions.putAll(securityConfigs);
+
+            // Store partition key names for the table source to use.
+            // Since binlog schema has nested columns, we can't use Flink's 
partition key mechanism.
+            List<String> partitionKeys = baseTable.getPartitionKeys();
+            if (!partitionKeys.isEmpty()) {
+                newOptions.put(
+                        FlinkConnectorOptions.BINLOG_PARTITION_KEYS.key(),
+                        String.join(",", partitionKeys));
+            }

Review Comment:
   We don't need extract the partition key indexes, we only need to know 
whether it is a partitioned table.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit test for {@link BinlogRowConverter}. */
+class BinlogRowConverterTest {
+
+    private RowType testRowType;
+    private BinlogRowConverter converter;
+
+    @BeforeEach
+    void setUp() {
+        // Create a simple test table schema: (id INT, name STRING, amount 
BIGINT)
+        testRowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("name", DataTypes.STRING())
+                        .field("amount", DataTypes.BIGINT())
+                        .build();
+
+        converter = new BinlogRowConverter(testRowType);
+    }
+
+    @Test
+    void testConvertInsertRecord() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.INSERT, 100L, 1000L, 1, 
"Alice", 5000L);
+
+        RowData result = converter.convert(record);
+
+        // Verify row kind is always INSERT for virtual tables
+        assertThat(result).isNotNull();
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify metadata columns
+        assertThat(result.getString(0)).isEqualTo(StringData.fromString("I"));
+        assertThat(result.getLong(1)).isEqualTo(100L); // log offset
+        assertThat(result.getTimestamp(2, 3)).isNotNull(); // commit timestamp

Review Comment:
   We can assert the value of commit timestamp



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -327,4 +349,67 @@ private DynamicTableSource createChangelogTableSource(
                 partitionDiscoveryIntervalMs,
                 catalogTableOptions);
     }
+
+    /** Creates a BinlogFlinkTableSource for $binlog virtual tables. */
+    private DynamicTableSource createBinlogTableSource(
+            Context context, ObjectIdentifier tableIdentifier, String 
tableName) {
+        // Extract the base table name by removing the $binlog suffix
+        String baseTableName =
+                tableName.substring(
+                        0, tableName.length() - 
FlinkCatalog.BINLOG_TABLE_SUFFIX.length());
+
+        boolean isStreamingMode =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+
+        // tableOutputType: [_change_type, _log_offset, _commit_timestamp, 
before ROW<...>, after
+        // ROW<...>]
+        RowType tableOutputType = (RowType) 
context.getPhysicalRowDataType().getLogicalType();
+
+        // Extract data columns type from the 'before' nested ROW field (index 
3)
+        RowType dataColumnsType = (RowType) tableOutputType.getTypeAt(3);
+
+        Map<String, String> catalogTableOptions = 
context.getCatalogTable().getOptions();
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig tableOptions = helper.getOptions();
+        validateSourceOptions(helper, tableOptions);
+
+        ZoneId timeZone =
+                FlinkConnectorOptionsUtils.getLocalTimeZone(
+                        
context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE));
+        final FlinkConnectorOptionsUtils.StartupOptions startupOptions =
+                FlinkConnectorOptionsUtils.getStartupOptions(tableOptions, 
timeZone);
+
+        // Get partition keys from the internal option (since binlog schema 
has nested columns,
+        // the catalog table's partition keys list is empty to avoid Flink 
validation errors).
+        String partitionKeysOption =
+                context.getCatalogTable()
+                        .getOptions()
+                        
.get(FlinkConnectorOptions.BINLOG_PARTITION_KEYS.key());
+        int[] partitionKeyIndexes;
+        if (partitionKeysOption != null && !partitionKeysOption.isEmpty()) {
+            String[] partitionKeyNames = partitionKeysOption.split(",");
+            partitionKeyIndexes = new int[partitionKeyNames.length];
+            for (int i = 0; i < partitionKeyNames.length; i++) {
+                partitionKeyIndexes[i] = 
dataColumnsType.getFieldIndex(partitionKeyNames[i]);
+            }
+        } else {
+            partitionKeyIndexes = new int[0];

Review Comment:
   A boolean flag `isPartitionedTable` is enough. 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -261,11 +276,18 @@ private LakeTableFactory mayInitLakeTableFactory() {
     }
 
     /**
-     * Validates table source options explicitly recognized by Flink.
+     * Validates table source options using the standard validation pattern.
      *
+     * @param helper the factory helper for option validation
      * @param tableOptions the table options to validate
      */
-    private static void validateSourceOptions(ReadableConfig tableOptions) {
+    private static void validateSourceOptions(
+            FactoryUtil.TableFactoryHelper helper, ReadableConfig 
tableOptions) {
+        Optional<DataLakeFormat> datalakeFormat = 
getDatalakeFormat(tableOptions);
+        List<String> prefixesToSkip =
+                new ArrayList<>(Arrays.asList("table.", "client.", "fields.", 
"binlog."));

Review Comment:
   Please rebase to latest `main` branch, we have removed the 
`helper.validateExcept` check in connector. See 
https://github.com/apache/fluss/commit/336216cc91903c4852976340d95dae8d300befa4



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit test for {@link BinlogRowConverter}. */
+class BinlogRowConverterTest {
+
+    private RowType testRowType;
+    private BinlogRowConverter converter;
+
+    @BeforeEach
+    void setUp() {
+        // Create a simple test table schema: (id INT, name STRING, amount 
BIGINT)
+        testRowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("name", DataTypes.STRING())
+                        .field("amount", DataTypes.BIGINT())
+                        .build();
+
+        converter = new BinlogRowConverter(testRowType);
+    }
+
+    @Test
+    void testConvertInsertRecord() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.INSERT, 100L, 1000L, 1, 
"Alice", 5000L);
+
+        RowData result = converter.convert(record);
+
+        // Verify row kind is always INSERT for virtual tables
+        assertThat(result).isNotNull();
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify metadata columns
+        assertThat(result.getString(0)).isEqualTo(StringData.fromString("I"));
+        assertThat(result.getLong(1)).isEqualTo(100L); // log offset
+        assertThat(result.getTimestamp(2, 3)).isNotNull(); // commit timestamp
+
+        // Verify before is null for INSERT
+        assertThat(result.isNullAt(3)).isTrue();
+
+        // Verify after contains the row data
+        RowData afterRow = result.getRow(4, 3);
+        assertThat(afterRow).isNotNull();
+        assertThat(afterRow.getInt(0)).isEqualTo(1); // id
+        assertThat(afterRow.getString(1).toString()).isEqualTo("Alice"); // 
name
+        assertThat(afterRow.getLong(2)).isEqualTo(5000L); // amount
+    }
+
+    @Test
+    void testConvertUpdateMerge() throws Exception {
+        // Send -U (UPDATE_BEFORE) - should return null (buffered)
+        LogRecord beforeRecord =
+                createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2000L, 2, 
"Bob", 3000L);
+        RowData beforeResult = converter.convert(beforeRecord);
+        assertThat(beforeResult).isNull();
+
+        // Send +U (UPDATE_AFTER) - should return merged row
+        LogRecord afterRecord =
+                createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2000L, 2, 
"Bob-Updated", 4000L);
+        RowData result = converter.convert(afterRecord);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify metadata columns
+        assertThat(result.getString(0)).isEqualTo(StringData.fromString("U"));
+        // Offset should be from the -U record (first entry of update pair)
+        assertThat(result.getLong(1)).isEqualTo(200L);
+
+        // Verify before contains old data
+        RowData beforeRow = result.getRow(3, 3);
+        assertThat(beforeRow).isNotNull();
+        assertThat(beforeRow.getInt(0)).isEqualTo(2);
+        assertThat(beforeRow.getString(1).toString()).isEqualTo("Bob");
+        assertThat(beforeRow.getLong(2)).isEqualTo(3000L);
+
+        // Verify after contains new data
+        RowData afterRow = result.getRow(4, 3);
+        assertThat(afterRow).isNotNull();
+        assertThat(afterRow.getInt(0)).isEqualTo(2);
+        assertThat(afterRow.getString(1).toString()).isEqualTo("Bob-Updated");
+        assertThat(afterRow.getLong(2)).isEqualTo(4000L);
+    }
+
+    @Test
+    void testConvertDeleteRecord() throws Exception {
+        LogRecord record = createLogRecord(ChangeType.DELETE, 300L, 3000L, 3, 
"Charlie", 1000L);
+
+        RowData result = converter.convert(record);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // Verify metadata columns
+        assertThat(result.getString(0)).isEqualTo(StringData.fromString("D"));
+        assertThat(result.getLong(1)).isEqualTo(300L);

Review Comment:
   assert timestamp column as well?



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Integration test for $binlog virtual table functionality. */
+abstract class BinlogVirtualTableITCase extends AbstractTestBase {
+
+    protected static final ManualClock CLOCK = new ManualClock();
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(new Configuration())
+                    .setNumOfTabletServers(1)
+                    .setClock(CLOCK)
+                    .build();
+
+    static final String CATALOG_NAME = "testcatalog";
+    static final String DEFAULT_DB = "test_binlog_db";
+    protected StreamExecutionEnvironment execEnv;
+    protected StreamTableEnvironment tEnv;
+    protected static Connection conn;
+    protected static Admin admin;
+
+    protected static Configuration clientConf;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @BeforeEach
+    void before() {
+        // Initialize Flink environment
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+
+        // Initialize catalog and database
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        tEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        tEnv.executeSql("use catalog " + CATALOG_NAME);
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.useDatabase(DEFAULT_DB);
+        // reset clock before each test
+        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    @AfterEach
+    void after() {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    }
+
+    /** Deletes rows from a primary key table using the proper delete API. */
+    protected static void deleteRows(
+            Connection connection, TablePath tablePath, List<InternalRow> 
rows) throws Exception {
+        try (Table table = connection.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            for (InternalRow row : rows) {
+                writer.delete(row);
+            }
+            writer.flush();
+        }
+    }
+
+    @Test
+    public void testDescribeBinlogTable() throws Exception {
+        // Create a table with various data types to test complex schema
+        tEnv.executeSql(
+                "CREATE TABLE describe_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        // Test DESCRIBE on binlog virtual table
+        CloseableIterator<Row> describeResult =
+                tEnv.executeSql("DESCRIBE describe_test$binlog").collect();
+
+        List<String> schemaRows = new ArrayList<>();
+        while (describeResult.hasNext()) {
+            schemaRows.add(describeResult.next().toString());
+        }
+
+        // Should have 5 columns: _change_type, _log_offset, 
_commit_timestamp, before, after
+        assertThat(schemaRows).hasSize(5);
+
+        // Verify metadata columns are listed first
+        assertThat(schemaRows.get(0))
+                .isEqualTo("+I[_change_type, STRING, false, null, null, 
null]");
+        assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, 
false, null, null, null]");
+        assertThat(schemaRows.get(2))
+                .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, 
null, null, null]");
+
+        // Verify before and after are ROW types with original columns
+        assertThat(schemaRows.get(3))
+                .isEqualTo(
+                        "+I[before, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+        assertThat(schemaRows.get(4))
+                .isEqualTo(
+                        "+I[after, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+    }
+
+    @Test
+    public void testBinlogUnsupportedForLogTable() throws Exception {
+        // Create a log table (no primary key)
+        tEnv.executeSql(
+                "CREATE TABLE log_table ("
+                        + "  event_id INT,"
+                        + "  event_type STRING"
+                        + ") WITH ('bucket.num' = '1')");
+
+        // $binlog should fail for log tables
+        assertThatThrownBy(() -> tEnv.executeSql("DESCRIBE 
log_table$binlog").collect())
+                .hasMessageContaining("only supported for primary key tables");
+    }
+
+    @Test
+    public void testBinlogWithAllChangeTypes() throws Exception {
+        // Create a primary key table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "binlog_test");
+
+        // Start binlog scan
+        String query =
+                "SELECT _change_type, _log_offset, "
+                        + "before.id, before.name, before.amount, "
+                        + "after.id, after.name, after.amount "
+                        + "FROM binlog_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Test INSERT
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(
+                conn,
+                tablePath,
+                Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)),
+                false);
+
+        // Collect inserts - each INSERT produces one binlog row
+        List<String> insertResults = collectRowsWithTimeout(rowIter, 2, false);
+        assertThat(insertResults).hasSize(2);
+
+        // INSERT: before=null, after=row data
+        // Format: +I[_change_type, _log_offset, before.id, before.name, 
before.amount,
+        //            after.id, after.name, after.amount]
+        assertThat(insertResults.get(0)).isEqualTo("+I[I, 0, null, null, null, 
1, Item-1, 100]");
+        assertThat(insertResults.get(1)).isEqualTo("+I[I, 1, null, null, null, 
2, Item-2, 200]");
+
+        // Test UPDATE - should merge -U and +U into single binlog row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 
150L)), false);
+
+        // UPDATE produces ONE binlog row (not two like changelog)
+        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, false);
+        assertThat(updateResults).hasSize(1);
+
+        // UPDATE: before=old row, after=new row, offset=from -U record
+        assertThat(updateResults.get(0))
+                .isEqualTo("+I[U, 2, 1, Item-1, 100, 1, Item-1-Updated, 150]");
+
+        // Test DELETE
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L)));
+
+        // DELETE produces one binlog row
+        List<String> deleteResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(deleteResults).hasSize(1);
+
+        // DELETE: before=row data, after=null
+        assertThat(deleteResults.get(0)).isEqualTo("+I[D, 4, 2, Item-2, 200, 
null, null, null]");
+    }
+
+    @Test
+    public void testBinlogSelectStar() throws Exception {
+        // Test SELECT * which returns the full binlog structure
+        tEnv.executeSql(
+                "CREATE TABLE star_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "star_test");
+
+        String query = "SELECT * FROM star_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Insert a row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Alice")), false);
+
+        List<String> results = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(results).hasSize(1);
+
+        // SELECT * returns: _change_type, _log_offset, _commit_timestamp, 
before, after
+        // before is null for INSERT, after contains the row
+        assertThat(results.get(0)).startsWith("+I[I, 0, ");
+        assertThat(results.get(0)).contains("null, +I[1, Alice]");

Review Comment:
   Can't we assert a full row string?



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
+import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
+import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** A Flink table source for the $binlog virtual table. */
+public class BinlogFlinkTableSource implements ScanTableSource {
+
+    private final TablePath tablePath;
+    private final Configuration flussConfig;
+    // The binlog output type (includes metadata + nested before/after ROW 
columns)
+    private final org.apache.flink.table.types.logical.RowType 
binlogOutputType;
+    // The data columns type extracted from the 'before' nested ROW
+    private final org.apache.flink.table.types.logical.RowType dataColumnsType;
+    private final int[] partitionKeyIndexes;

Review Comment:
   We don't need this indexes, we only need a `boolean` flag to indicate it is 
partitioned or not. This can simplify the code. 



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Integration test for $binlog virtual table functionality. */
+abstract class BinlogVirtualTableITCase extends AbstractTestBase {
+
+    protected static final ManualClock CLOCK = new ManualClock();
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(new Configuration())
+                    .setNumOfTabletServers(1)
+                    .setClock(CLOCK)
+                    .build();
+
+    static final String CATALOG_NAME = "testcatalog";
+    static final String DEFAULT_DB = "test_binlog_db";
+    protected StreamExecutionEnvironment execEnv;
+    protected StreamTableEnvironment tEnv;
+    protected static Connection conn;
+    protected static Admin admin;
+
+    protected static Configuration clientConf;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @BeforeEach
+    void before() {
+        // Initialize Flink environment
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+
+        // Initialize catalog and database
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        tEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        tEnv.executeSql("use catalog " + CATALOG_NAME);
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.useDatabase(DEFAULT_DB);
+        // reset clock before each test
+        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    @AfterEach
+    void after() {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    }
+
+    /** Deletes rows from a primary key table using the proper delete API. */
+    protected static void deleteRows(
+            Connection connection, TablePath tablePath, List<InternalRow> 
rows) throws Exception {
+        try (Table table = connection.getTable(tablePath)) {
+            UpsertWriter writer = table.newUpsert().createWriter();
+            for (InternalRow row : rows) {
+                writer.delete(row);
+            }
+            writer.flush();
+        }
+    }
+
+    @Test
+    public void testDescribeBinlogTable() throws Exception {
+        // Create a table with various data types to test complex schema
+        tEnv.executeSql(
+                "CREATE TABLE describe_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        // Test DESCRIBE on binlog virtual table
+        CloseableIterator<Row> describeResult =
+                tEnv.executeSql("DESCRIBE describe_test$binlog").collect();
+
+        List<String> schemaRows = new ArrayList<>();
+        while (describeResult.hasNext()) {
+            schemaRows.add(describeResult.next().toString());
+        }
+
+        // Should have 5 columns: _change_type, _log_offset, 
_commit_timestamp, before, after
+        assertThat(schemaRows).hasSize(5);
+
+        // Verify metadata columns are listed first
+        assertThat(schemaRows.get(0))
+                .isEqualTo("+I[_change_type, STRING, false, null, null, 
null]");
+        assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, 
false, null, null, null]");
+        assertThat(schemaRows.get(2))
+                .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, 
null, null, null]");
+
+        // Verify before and after are ROW types with original columns
+        assertThat(schemaRows.get(3))
+                .isEqualTo(
+                        "+I[before, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+        assertThat(schemaRows.get(4))
+                .isEqualTo(
+                        "+I[after, ROW<`id` INT NOT NULL, `name` STRING, 
`amount` BIGINT>, true, null, null, null]");
+    }
+
+    @Test
+    public void testBinlogUnsupportedForLogTable() throws Exception {
+        // Create a log table (no primary key)
+        tEnv.executeSql(
+                "CREATE TABLE log_table ("
+                        + "  event_id INT,"
+                        + "  event_type STRING"
+                        + ") WITH ('bucket.num' = '1')");
+
+        // $binlog should fail for log tables
+        assertThatThrownBy(() -> tEnv.executeSql("DESCRIBE 
log_table$binlog").collect())
+                .hasMessageContaining("only supported for primary key tables");
+    }
+
+    @Test
+    public void testBinlogWithAllChangeTypes() throws Exception {
+        // Create a primary key table with 1 bucket for consistent log_offset 
numbers
+        tEnv.executeSql(
+                "CREATE TABLE binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  amount BIGINT,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "binlog_test");
+
+        // Start binlog scan
+        String query =
+                "SELECT _change_type, _log_offset, "
+                        + "before.id, before.name, before.amount, "
+                        + "after.id, after.name, after.amount "
+                        + "FROM binlog_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Test INSERT
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(
+                conn,
+                tablePath,
+                Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)),
+                false);
+
+        // Collect inserts - each INSERT produces one binlog row
+        List<String> insertResults = collectRowsWithTimeout(rowIter, 2, false);
+        assertThat(insertResults).hasSize(2);
+
+        // INSERT: before=null, after=row data
+        // Format: +I[_change_type, _log_offset, before.id, before.name, 
before.amount,
+        //            after.id, after.name, after.amount]
+        assertThat(insertResults.get(0)).isEqualTo("+I[I, 0, null, null, null, 
1, Item-1, 100]");
+        assertThat(insertResults.get(1)).isEqualTo("+I[I, 1, null, null, null, 
2, Item-2, 200]");
+
+        // Test UPDATE - should merge -U and +U into single binlog row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 
150L)), false);
+
+        // UPDATE produces ONE binlog row (not two like changelog)
+        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, false);
+        assertThat(updateResults).hasSize(1);
+
+        // UPDATE: before=old row, after=new row, offset=from -U record
+        assertThat(updateResults.get(0))
+                .isEqualTo("+I[U, 2, 1, Item-1, 100, 1, Item-1-Updated, 150]");
+
+        // Test DELETE
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L)));
+
+        // DELETE produces one binlog row
+        List<String> deleteResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(deleteResults).hasSize(1);
+
+        // DELETE: before=row data, after=null
+        assertThat(deleteResults.get(0)).isEqualTo("+I[D, 4, 2, Item-2, 200, 
null, null, null]");
+    }
+
+    @Test
+    public void testBinlogSelectStar() throws Exception {
+        // Test SELECT * which returns the full binlog structure
+        tEnv.executeSql(
+                "CREATE TABLE star_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "star_test");
+
+        String query = "SELECT * FROM star_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        // Insert a row
+        CLOCK.advanceTime(Duration.ofMillis(1000));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "Alice")), false);
+
+        List<String> results = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(results).hasSize(1);
+
+        // SELECT * returns: _change_type, _log_offset, _commit_timestamp, 
before, after
+        // before is null for INSERT, after contains the row
+        assertThat(results.get(0)).startsWith("+I[I, 0, ");
+        assertThat(results.get(0)).contains("null, +I[1, Alice]");
+    }
+
+    @Test
+    public void testBinlogWithPartitionedTable() throws Exception {
+        // Create a partitioned primary key table
+        tEnv.executeSql(
+                "CREATE TABLE partitioned_binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  region STRING NOT NULL,"
+                        + "  PRIMARY KEY (id, region) NOT ENFORCED"
+                        + ") PARTITIONED BY (region) WITH ('bucket.num' = 
'1')");
+
+        // Insert data into different partitions using Flink SQL
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        tEnv.executeSql(
+                        "INSERT INTO partitioned_binlog_test VALUES "
+                                + "(1, 'Item-1', 'us'), "
+                                + "(2, 'Item-2', 'eu')")
+                .await();
+
+        // Query binlog with nested field access
+        String query =
+                "SELECT _change_type, after.id, after.name, after.region "
+                        + "FROM partitioned_binlog_test$binlog";
+        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+
+        List<String> results = collectRowsWithTimeout(rowIter, 2, false);
+        // Sort results for deterministic assertion (partitions may return in 
any order)
+        Collections.sort(results);
+        assertThat(results)
+                .isEqualTo(Arrays.asList("+I[I, 1, Item-1, us]", "+I[I, 2, 
Item-2, eu]"));
+
+        // Update a record in a specific partition
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        tEnv.executeSql("INSERT INTO partitioned_binlog_test VALUES (1, 
'Item-1-Updated', 'us')")
+                .await();
+
+        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, true);
+        assertThat(updateResults).hasSize(1);
+        assertThat(updateResults.get(0)).isEqualTo("+I[U, 1, Item-1-Updated, 
us]");
+    }
+
+    @Test
+    public void testBinlogScanStartupMode() throws Exception {
+        // Create a primary key table with 1 bucket
+        tEnv.executeSql(
+                "CREATE TABLE startup_binlog_test ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "startup_binlog_test");
+
+        // Write first batch
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3")), false);
+
+        // Write second batch
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, tablePath, Arrays.asList(row(4, "v4"), row(5, "v5")), 
false);
+
+        // Test scan.startup.mode='earliest' - should read all records from 
beginning
+        String optionsEarliest = " /*+ OPTIONS('scan.startup.mode' = 
'earliest') */";
+        String queryEarliest =
+                "SELECT _change_type, after.id, after.name FROM 
startup_binlog_test$binlog"
+                        + optionsEarliest;
+        CloseableIterator<Row> rowIterEarliest = 
tEnv.executeSql(queryEarliest).collect();
+        List<String> earliestResults = collectRowsWithTimeout(rowIterEarliest, 
5, true);
+        assertThat(earliestResults).hasSize(5);
+        // All should be INSERT change types
+        for (String result : earliestResults) {
+            assertThat(result).startsWith("+I[I,");
+        }

Review Comment:
   Please assert the full row. If you want to make the result deterministic and 
ordered. You can change the `TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM` config to 
`1` in the `before()` method. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to