MehulBatra commented on code in PR #2565:
URL: https://github.com/apache/fluss/pull/2565#discussion_r2764255698


##########
website/docs/table-design/virtual-tables.md:
##########
@@ -99,70 +91,128 @@ SELECT * FROM orders$changelog;
 Output:
 
 ```
-+----+--------------+-------------+---------------------+----------+---------------+---------+
-| op | _change_type | _log_offset | _commit_timestamp   | order_id | customer_name | amount  |
-+----+--------------+-------------+---------------------+----------+---------------+---------+
-| +I | +I           |           0 | 2024-01-15 10:30:00 |        1 | Rhea          |  100.00 |
-| +I | -U           |           1 | 2024-01-15 10:35:00 |        1 | Rhea          |  100.00 |
-| +I | +U           |           2 | 2024-01-15 10:35:00 |        1 | Rhea          |  150.00 |
-| +I | -D           |           3 | 2024-01-15 10:40:00 |        1 | Rhea          |  150.00 |
-+----+--------------+-------------+---------------------+----------+---------------+---------+
++----+---------------+-------------+---------------------+----------+---------------+---------+
+| op | _change_type  | _log_offset | _commit_timestamp   | order_id | customer_name | amount  |
++----+---------------+-------------+---------------------+----------+---------------+---------+
+| +I | insert        |           0 | 2024-01-15 10:30:00 |        1 | Rhea          |  100.00 |
+| +I | update_before |           1 | 2024-01-15 10:35:00 |        1 | Rhea          |  100.00 |
+| +I | update_after  |           2 | 2024-01-15 10:35:00 |        1 | Rhea          |  150.00 |
+| +I | delete        |           3 | 2024-01-15 10:40:00 |        1 | Rhea          |  150.00 |
++----+---------------+-------------+---------------------+----------+---------------+---------+
 ```
 
 :::note
 The `op` column is Flink's row kind indicator. For changelog virtual tables, all rows are emitted as `+I` (insert) to the downstream, while the actual change type is captured in the `_change_type` column.
 :::
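+
+For example, a query along these lines (using the `orders` table from above) keeps only delete events from the changelog; the filter on `_change_type` is evaluated on the Flink side, since predicate pushdown is not yet supported (see Limitations):
+
+```sql title="Flink SQL"
+-- Keep only delete events from the changelog
+SELECT order_id, amount, _commit_timestamp
+FROM orders$changelog
+WHERE _change_type = 'delete';
+```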
 
-#### Log Table Changelog
 
-Consider a Log Table storing click events:
+## Binlog Table
+
+The `$binlog` virtual table provides access to change data where each record contains both the before and after images of the row. This is useful when downstream consumers need to see the old and new values of a change in a single record.
+
+:::note
+The `$binlog` virtual table is only available for **Primary Key Tables**.
+:::
+
+### Accessing the Binlog
+
+To access the binlog of a Primary Key Table, append `$binlog` to the table name:
+
+```sql title="Flink SQL"
+SELECT * FROM my_pk_table$binlog;
+```
+
+### Schema
+
+The binlog virtual table includes three metadata columns followed by nested `before` and `after` row structures:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| `_change_type` | STRING NOT NULL | The type of change operation: `insert`, `update`, or `delete` |
+| `_log_offset` | BIGINT NOT NULL | The offset position in the log |
+| `_commit_timestamp` | TIMESTAMP_LTZ(3) NOT NULL | The timestamp when the change was committed |
+| `before` | ROW<...> | The row values before the change (NULL for inserts) |
+| `after` | ROW<...> | The row values after the change (NULL for deletes) |
+
+The `before` and `after` columns are nested ROW types containing all columns from the base table.
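+
+For instance, for the `users` table defined in the examples below (`user_id INT`, `name STRING`, `email STRING`), the binlog schema would look roughly like this:
+
+```
+_change_type       STRING NOT NULL
+_log_offset        BIGINT NOT NULL
+_commit_timestamp  TIMESTAMP_LTZ(3) NOT NULL
+before             ROW<user_id INT, name STRING, email STRING>
+after              ROW<user_id INT, name STRING, email STRING>
+```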
+
+### Change Types
+
+| Change Type | Description | `before` | `after` |
+|-------------|-------------|----------|---------|
+| `insert` | A new row was inserted | NULL | Contains new row values |
+| `update` | A row was updated | Contains old row values | Contains new row values |
+| `delete` | A row was deleted | Contains deleted row values | NULL |
+
+### Examples
 
 ```sql title="Flink SQL"
--- Create a log table (no primary key)
-CREATE TABLE click_events (
-    event_id INT,
-    user_id INT,
-    event_type STRING
+-- Create a primary key table
+CREATE TABLE users (
+    user_id INT NOT NULL,
+    name STRING,
+    email STRING,
+    PRIMARY KEY (user_id) NOT ENFORCED
 ) WITH ('bucket.num' = '1');
 
--- Append events
-INSERT INTO click_events VALUES (1, 101, 'click'), (2, 102, 'view');
+-- Insert, update, then delete a record
+INSERT INTO users VALUES (1, 'Alice', '[email protected]');
+INSERT INTO users VALUES (1, 'Alice Smith', '[email protected]');
+DELETE FROM users WHERE user_id = 1;
 
--- Query the changelog
-SELECT * FROM click_events$changelog;
+-- Query the binlog
+SELECT * FROM users$binlog;
 ```
 
 Output:
 
 ```
-+----+--------------+-------------+---------------------+----------+---------+------------+
-| op | _change_type | _log_offset | _commit_timestamp   | event_id | user_id | event_type |
-+----+--------------+-------------+---------------------+----------+---------+------------+
-| +I | +A           |           0 | 2024-01-15 11:00:00 |        1 |     101 | click      |
-| +I | +A           |           1 | 2024-01-15 11:00:00 |        2 |     102 | view       |
-+----+--------------+-------------+---------------------+----------+---------+------------+
++----+--------------+-------------+---------------------+-------------------------------------+-------------------------------------+
+| op | _change_type | _log_offset | _commit_timestamp   | before                              | after                               |
++----+--------------+-------------+---------------------+-------------------------------------+-------------------------------------+
+| +I | insert       |           0 | 2024-01-15 10:30:00 | NULL                                | (1, Alice, [email protected])        |
+| +I | update       |           2 | 2024-01-15 10:35:00 | (1, Alice, [email protected])        | (1, Alice Smith, [email protected]) |
+| +I | delete       |           3 | 2024-01-15 10:40:00 | (1, Alice Smith, [email protected]) | NULL                                |
++----+--------------+-------------+---------------------+-------------------------------------+-------------------------------------+
+```
+
+#### Accessing Nested Fields
+
+You can access individual fields from the `before` and `after` structures:
+
+```sql title="Flink SQL"
+SELECT
+    _change_type,
+    _commit_timestamp,
+    `before`.name AS old_name,
+    `after`.name AS new_name
+FROM users$binlog
+WHERE _change_type = 'update';
 ```
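+
+As a further illustration (again using the `users` table from the examples above), the key of the affected row can be derived for every change type by falling back from `after` to `before`:
+
+```sql title="Flink SQL"
+-- The affected primary key for each change:
+-- taken from `after` for inserts and updates, from `before` for deletes
+SELECT
+    _change_type,
+    _commit_timestamp,
+    COALESCE(`after`.user_id, `before`.user_id) AS user_id
+FROM users$binlog;
+```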
 
 ### Startup Modes
 
-The changelog virtual table supports different startup modes to control where reading begins:
+Both the `$changelog` and `$binlog` virtual tables support different startup modes to control where reading begins:
+
+| Mode | Description |
+|------|-------------|
+| `earliest` | Start reading from the beginning of the log |
+| `latest` | Start reading from the current end of the log (only new changes) |
+| `timestamp` | Start reading from a specific timestamp (milliseconds since epoch) |
 
 ```sql title="Flink SQL"
 -- Read from the beginning (default)
 SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
 
 -- Read only new changes from now
-SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
+SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
 
 -- Read from a specific timestamp
 SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
 ```
 
-| Mode | Description |
-|------|-------------|
-| `earliest` | Start reading from the beginning of the log |
-| `latest` | Start reading from the current end of the log (only new changes) |
-| `timestamp` | Start reading from a specific timestamp (milliseconds since epoch) |
 
 ### Limitations
-- Projection & partition & predicate pushdowns are not supported yet. This will be addressed in future releases.
+- Projection, partition, and predicate pushdowns are not supported yet.

Review Comment:
   Yes, they aren't. We plan to work on this in 0.10. Currently you can select specific columns at the Flink SQL level, but that will not be reflected at the storage level, so there is no projection pushdown either.
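
   For example, a query along these lines selects specific columns at the Flink SQL level, while the full row is still read at the storage level:

   ```sql
   -- Column pruning happens only in Flink; storage still reads the full row
   SELECT order_id, amount
   FROM orders$changelog;
   ```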


