Posting this to dev as well as it potentially has some implications on
development effort.

What seems to be the problem here is that we cannot control/override
Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
cannot create a PRIMARY KEY on the view but I think the temporal join also
should not require the PK, should we remove this limitation?

The general problem is the inflexibility of the timestamp/watermark
handling on query outputs, which makes this again impossible.

The workaround here can be to write the rolling aggregate to Kafka, read it
back again and join with that. The fact that this workaround is possible
actually highlights the need for more flexibility on the query/view side in
my opinion.

Has anyone else run into this issue and considered the proper solution to
the problem? Feels like it must be pretty common :)


On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley <>

> Hi,
> I have been trying to write a temporal join in SQL done on a rolling
> aggregate view. However it does not work and throws :
> org.apache.flink.table.api.ValidationException: Event-Time Temporal Table
> Join requires both primary key and row time attribute in versioned table,
> but no row time attribute can be found.
> It seems that after the aggregation, the table looses the watermark and
> it's not possible to add one with the SQL API as it's a view.
> CREATE TABLE orders (
>     order_id INT,
>     price DECIMAL(6, 2),
>     currency_id INT,
>     order_time AS NOW(),
>     WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> )
>     WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '10',
>         'fields.order_id.kind' = 'sequence',
>         'fields.order_id.start' = '1',
>         'fields.order_id.end' = '100000',
>         'fields.currency_id.min' = '1',
>         'fields.currency_id.max' = '20'
>     );
> CREATE TABLE currency_rates (
>     currency_id INT,
>     conversion_rate DECIMAL(4, 3),
>     PRIMARY KEY (currency_id) NOT ENFORCED
> )
>     WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '10',
>         'fields.currency_id.min' = '1',
>         'fields.currency_id.max' = '20'
>     );
>     SELECT
>         currency_id,
>         MAX(conversion_rate) AS max_rate
>     FROM currency_rates
>     GROUP BY currency_id
> );
> CREATE TEMPORARY VIEW temporal_join AS (
>     SELECT
>         order_id,
>         max_rates.max_rate
>     FROM orders
>          LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
>          ON orders.currency_id = max_rates.currency_id
> );
> SELECT * FROM temporal_join;
> Am I missing something? What would be a good starting point to address
> this?
> Thanks in advance,
> Sébastien Chevalley

Reply via email to