+1 for supporting defining time attributes on views.

I once ran into the same problem. I did some regular joins and lost the 
time attribute, so I could no longer do window operations in subsequent 
logic. I had to output the joined view to Kafka, read from it again, and 
define a watermark on the new source - a cumbersome workaround.
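
For illustration, here is a rough, untested sketch of that round trip, applied 
to the max_rates aggregate from the quoted example below. The table name 
max_rates_kafka, the topic name, the broker address, and the choice of the 
upsert-kafka connector with the Kafka record timestamp as row time are all my 
assumptions, not something taken from the original job:

```sql
-- Hypothetical Kafka-backed table that replaces the max_rates view.
-- Topic and broker address are placeholders.
CREATE TABLE max_rates_kafka (
    currency_id INT,
    max_rate DECIMAL(4, 3),
    -- Expose the Kafka record timestamp as the row time attribute.
    update_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
    WATERMARK FOR update_time AS update_time - INTERVAL '2' SECOND,
    PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'max-rates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Job 1: write the rolling aggregate out to Kafka.
-- The VIRTUAL metadata column is not part of the sink schema.
INSERT INTO max_rates_kafka
SELECT currency_id, MAX(conversion_rate) AS max_rate
FROM currency_rates
GROUP BY currency_id;

-- Job 2: read it back; the table now has both a primary key and a
-- row time attribute, which the event-time temporal join asks for.
SELECT o.order_id, r.max_rate
FROM orders AS o
LEFT JOIN max_rates_kafka FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency_id = r.currency_id;
```

The extra topic, the second job, and the serialization hop exist only to 
re-declare a watermark, which is why it feels like a workaround.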

It would be more flexible if we could control the time attribute / watermark 
on views, just as if a view were some kind of special source.

Thanks,
Yaming
On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra <gyula.f...@gmail.com> wrote:
> Posting this to dev as well as it potentially has some implications on 
> development effort.
>
> What seems to be the problem here is that we cannot control/override 
> Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you 
> cannot create a PRIMARY KEY on the view but I think the temporal join also 
> should not require the PK, should we remove this limitation?
>
> The general problem is the inflexibility of the timestamp/watermark handling 
> on query outputs, which makes this again impossible.
>
> The workaround here can be to write the rolling aggregate to Kafka, read it 
> back again and join with that. The fact that this workaround is possible 
> actually highlights the need for more flexibility on the query/view side in 
> my opinion.
>
> Has anyone else run into this issue and considered the proper solution to the 
> problem? Feels like it must be pretty common :)
>
> Cheers,
> Gyula
>
>
>
>
> > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley <s...@erreur404.ch> 
> > wrote:
> > > Hi,
> > >
> > > I have been trying to write a temporal join in SQL on a rolling 
> > > aggregate view. However, it does not work and throws:
> > >
> > > org.apache.flink.table.api.ValidationException: Event-Time Temporal Table 
> > > Join requires both primary key and row time attribute in versioned table, 
> > > but no row time attribute can be found.
> > >
> > > It seems that after the aggregation, the table loses the watermark and 
> > > it's not possible to add one with the SQL API as it's a view.
> > >
> > > CREATE TABLE orders (
> > >     order_id INT,
> > >     price DECIMAL(6, 2),
> > >     currency_id INT,
> > >     order_time AS NOW(),
> > >     WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.order_id.kind' = 'sequence',
> > >         'fields.order_id.start' = '1',
> > >         'fields.order_id.end' = '100000',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TABLE currency_rates (
> > >     currency_id INT,
> > >     conversion_rate DECIMAL(4, 3),
> > >     PRIMARY KEY (currency_id) NOT ENFORCED
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TEMPORARY VIEW max_rates AS (
> > >     SELECT
> > >         currency_id,
> > >         MAX(conversion_rate) AS max_rate
> > >     FROM currency_rates
> > >     GROUP BY currency_id
> > > );
> > >
> > > CREATE TEMPORARY VIEW temporal_join AS (
> > >     SELECT
> > >         order_id,
> > >         max_rates.max_rate
> > >     FROM orders
> > >          LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > >          ON orders.currency_id = max_rates.currency_id
> > > );
> > >
> > > SELECT * FROM temporal_join;
> > >
> > > Am I missing something? What would be a good starting point to address 
> > > this?
> > >
> > > Thanks in advance,
> > > Sébastien Chevalley
