I'm trying to understand how watermarks work in Flink SQL. I've created the
following tables:
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate STRING,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'currency_rates',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'currency_rates1',
    'key.format' = 'raw',
    'value.format' = 'avro',
    'properties.auto.offset.reset' = 'earliest',
    'value.fields-include' = 'ALL',
    'scan.watermark.idle-timeout' = '1000'
);
CREATE TABLE orders (
    order_id STRING,
    price DECIMAL(32,2),
    currency STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'orders1',
    'value.format' = 'avro',
    'properties.auto.offset.reset' = 'latest',
    'value.fields-include' = 'ALL'
);
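Read as arithmetic, the two WATERMARK clauses say that currency_rates trusts update_time exactly, while orders allows 5 seconds of out-of-orderness. The following Python sketch is a simplified model, not Flink's implementation: it assumes one Kafka partition per topic, ignores the periodic emission interval, and uses the timestamps of the first run shown below. Each source remembers the largest event time it has seen and reports that value minus the declared delay, so its watermark never moves backwards.

```python
from datetime import datetime, timedelta
from typing import Optional


def ts(text: str) -> datetime:
    """Parse the timestamp format printed by the SQL client."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


class BoundedDelayWatermark:
    """Simplified model of `WATERMARK FOR t AS t - delay` (not Flink's code)."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.max_event_time: Optional[datetime] = None

    def on_event(self, event_time: datetime) -> None:
        # Only a newer event time can move the watermark forward.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    def current(self) -> Optional[datetime]:
        # No watermark until the first row has been seen.
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay


rates_wm = BoundedDelayWatermark(timedelta(0))           # AS update_time
orders_wm = BoundedDelayWatermark(timedelta(seconds=5))  # AS order_time - 5 s

rates_wm.on_event(ts("2024-10-29 21:41:59.279"))   # 'value 1'
orders_wm.on_event(ts("2024-10-29 21:42:41.630"))  # ORD001
orders_wm.on_event(ts("2024-10-29 21:42:46.936"))  # ORD002

print(rates_wm.current())   # 2024-10-29 21:41:59.279000
print(orders_wm.current())  # 2024-10-29 21:42:41.936000
```

In this model, a row whose event time is at or below its source's watermark (what Flink calls a late row) simply leaves the watermark where it is; nothing at the source discards it.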
To test this, I run the following query:
SELECT
    orders.order_id,
    orders.price,
    orders.currency,
    currency_rates.conversion_rate,
    orders.order_time,
    currency_rates.update_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
    ON orders.currency = currency_rates.currency;
These are the inserts I’m making:
INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);
The following result is displayed:
order_id  price    currency  conversion_rate  order_time               update_time
ORD001    100.00   USD       value 1          2024-10-29 21:42:41.630  2024-10-29 21:41:59.279
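The join has two inputs, and its event-time clock is the minimum of their watermarks, with inputs marked idle left out. The temporal join releases an order only once that combined watermark has reached the order's order_time. The Python sketch below applies this rule to the first result. It is a simplified model under stated assumptions: one partition per topic, and the 'scan.watermark.idle-timeout' = '1000' option taking effect on currency_rates, with '1000' read as 1000 ms. Under those assumptions it gives one reading consistent with the output: if currency_rates were not idle, the combined watermark would stay at its update_time (21:41:59.279) and no order would be released; once it is idle, the combined watermark is ORD002's order_time minus 5 seconds, which has passed ORD001 but not ORD002.

```python
from datetime import datetime
from typing import Dict, List, Optional, Set


def ts(text: str) -> datetime:
    """Parse the timestamp format printed by the SQL client."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


def combined_watermark(inputs: Dict[str, datetime], idle: Set[str]) -> Optional[datetime]:
    """Minimum watermark over the inputs that are not idle (simplified model)."""
    active = [wm for name, wm in inputs.items() if name not in idle]
    return min(active) if active else None


def emitted(orders: Dict[str, datetime], watermark: Optional[datetime]) -> List[str]:
    """Orders whose order_time the combined watermark has reached."""
    if watermark is None:
        return []
    return [oid for oid, t in orders.items() if t <= watermark]


inputs = {
    "currency_rates": ts("2024-10-29 21:41:59.279"),  # max(update_time)
    "orders": ts("2024-10-29 21:42:41.936"),          # ORD002.order_time - 5 s
}
orders = {
    "ORD001": ts("2024-10-29 21:42:41.630"),
    "ORD002": ts("2024-10-29 21:42:46.936"),
}

print(emitted(orders, combined_watermark(inputs, idle=set())))               # []
print(emitted(orders, combined_watermark(inputs, idle={"currency_rates"})))  # ['ORD001']
```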
Then, I run:
INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP - interval '5' minutes);
INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);
And the result becomes:
order_id  price    currency  conversion_rate  order_time               update_time
ORD001    100.00   USD       value 1          2024-10-29 21:42:41.630  2024-10-29 21:41:59.279
ORD002    100.00   USD       value 1          2024-10-29 21:42:46.936  2024-10-29 21:41:59.279
ORD007    2000.00  USD       <NULL>           2024-10-29 21:39:32.560  <NULL>   <<<<<-----
I don't understand why ORD007 appears in the result. Its order_time was set 5 minutes
in the past, well behind the orders watermark (order_time - 5 seconds), so I expected
it to be dropped as a late record.
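The same kind of model can be extended to the temporal join itself. This Python sketch is again a simplification, not Flink's code. It assumes that orders are buffered rather than checked for lateness on arrival, and that each time the combined watermark advances, every buffered order with order_time <= watermark is emitted together with the latest rate version whose update_time <= order_time. When no such version exists, the LEFT JOIN fills the rate columns with NULL (None here). Under that assumption nothing discards late orders, which matches ORD007 appearing with <NULL> rate columns: 21:39:32.560 is earlier than every update_time seen. Two values are not visible in the output and are hypothetical. ORD007 was inserted at 21:44:32.560 (its order_time plus 5 minutes), so ORD003 is given that time as a lower bound, and the combined watermark is taken as ORD003's order_time minus 5 seconds. The update_time of 'value 2' is an invented placeholder.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Version = Tuple[datetime, str]  # (update_time, conversion_rate)
Order = Tuple[str, datetime]    # (order_id, order_time)


def ts(text: str) -> datetime:
    """Parse the timestamp format printed by the SQL client."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


def rate_as_of(versions: List[Version], t: datetime) -> Optional[str]:
    # Latest version with update_time <= t; None stands in for SQL NULL.
    valid = [v for v in versions if v[0] <= t]
    if not valid:
        return None
    return max(valid, key=lambda v: v[0])[1]


def on_watermark(buffer: List[Order], versions: List[Version], wm: datetime):
    """Emit every buffered order the watermark has reached; keep the rest.

    Late orders are treated like any other order whose time is <= wm.
    """
    ready = [o for o in buffer if o[1] <= wm]
    pending = [o for o in buffer if o[1] > wm]
    results = [(oid, rate_as_of(versions, t)) for oid, t in ready]
    return results, pending


versions = [
    (ts("2024-10-29 21:41:59.279"), "value 1"),
    (ts("2024-10-29 21:44:30.000"), "value 2"),  # hypothetical update_time
]
buffer = [
    ("ORD002", ts("2024-10-29 21:42:46.936")),
    ("ORD007", ts("2024-10-29 21:39:32.560")),
    ("ORD003", ts("2024-10-29 21:44:32.560")),  # hypothetical lower bound
]
wm = ts("2024-10-29 21:44:32.560") - timedelta(seconds=5)  # ORD003 - 5 s

results, pending = on_watermark(buffer, versions, wm)
print(results)  # [('ORD002', 'value 1'), ('ORD007', None)]
print(pending)  # ORD003 is still waiting for the watermark
```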