+1 for supporting time attribute definitions on views. I once ran into the same problem as you. I did some regular joins and lost the time attribute, so I could no longer do window operations in the subsequent logic. I had to output the joined view to Kafka, read from it again, and define a watermark on the new source, which is a cumbersome workaround.
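A minimal sketch of that Kafka round trip, using the table names from the quoted example below. The table name `max_rates_kafka`, the topic, the broker address, and the JSON formats are assumptions made for illustration, as is the choice to use the Kafka record timestamp as the event time:

```sql
-- Hypothetical Kafka-backed table; name, topic, broker, and formats are assumptions.
-- The same table serves as the sink for the aggregate and as the versioned source.
CREATE TABLE max_rates_kafka (
  currency_id INT,
  max_rate DECIMAL(4, 3),
  -- Read-only metadata column: the Kafka record timestamp, used here as event time.
  update_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- The watermark can be declared here because this is a table, not a view.
  WATERMARK FOR update_time AS update_time - INTERVAL '2' SECOND,
  PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'max-rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Write the rolling aggregate out instead of defining it as a view.
INSERT INTO max_rates_kafka
SELECT
  currency_id,
  MAX(conversion_rate) AS max_rate
FROM currency_rates
GROUP BY currency_id;

-- Join against the re-read table, which now has both a primary key and a rowtime attribute.
SELECT
  o.order_id,
  r.max_rate
FROM orders AS o
LEFT JOIN max_rates_kafka FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency_id = r.currency_id;
```

The extra topic and the second job are the cost of this approach. Being able to declare the watermark directly on the view would make both unnecessary.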
It would be more flexible if we could control the time attribute / watermark on views, just as if a view were some kind of special source.

Thanks,
Yaming

On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra <gyula.f...@gmail.com> wrote:
> Posting this to dev as well as it potentially has some implications on
> development effort.
>
> What seems to be the problem here is that we cannot control/override
> Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
> cannot create a PRIMARY KEY on the view but I think the temporal join also
> should not require the PK, should we remove this limitation?
>
> The general problem is the inflexibility of the timestamp/watermark handling
> on query outputs, which makes this again impossible.
>
> The workaround here can be to write the rolling aggregate to Kafka, read it
> back again and join with that. The fact that this workaround is possible
> actually highlights the need for more flexibility on the query/view side in
> my opinion.
>
> Has anyone else run into this issue and considered the proper solution to the
> problem? Feels like it must be pretty common :)
>
> Cheers,
> Gyula
>
> > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley <s...@erreur404.ch>
> > wrote:
> > > Hi,
> > >
> > > I have been trying to write a temporal join in SQL done on a rolling
> > > aggregate view. However it does not work and throws:
> > >
> > > org.apache.flink.table.api.ValidationException: Event-Time Temporal Table
> > > Join requires both primary key and row time attribute in versioned table,
> > > but no row time attribute can be found.
> > >
> > > It seems that after the aggregation, the table loses the watermark and
> > > it's not possible to add one with the SQL API as it's a view.
> > >
> > > CREATE TABLE orders (
> > >   order_id INT,
> > >   price DECIMAL(6, 2),
> > >   currency_id INT,
> > >   order_time AS NOW(),
> > >   WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > )
> > > WITH (
> > >   'connector' = 'datagen',
> > >   'rows-per-second' = '10',
> > >   'fields.order_id.kind' = 'sequence',
> > >   'fields.order_id.start' = '1',
> > >   'fields.order_id.end' = '100000',
> > >   'fields.currency_id.min' = '1',
> > >   'fields.currency_id.max' = '20'
> > > );
> > >
> > > CREATE TABLE currency_rates (
> > >   currency_id INT,
> > >   conversion_rate DECIMAL(4, 3),
> > >   PRIMARY KEY (currency_id) NOT ENFORCED
> > > )
> > > WITH (
> > >   'connector' = 'datagen',
> > >   'rows-per-second' = '10',
> > >   'fields.currency_id.min' = '1',
> > >   'fields.currency_id.max' = '20'
> > > );
> > >
> > > CREATE TEMPORARY VIEW max_rates AS (
> > >   SELECT
> > >     currency_id,
> > >     MAX(conversion_rate) AS max_rate
> > >   FROM currency_rates
> > >   GROUP BY currency_id
> > > );
> > >
> > > CREATE TEMPORARY VIEW temporal_join AS (
> > >   SELECT
> > >     order_id,
> > >     max_rates.max_rate
> > >   FROM orders
> > >   LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > >   ON orders.currency_id = max_rates.currency_id
> > > );
> > >
> > > SELECT * FROM temporal_join;
> > >
> > > Am I missing something? What would be a good starting point to address
> > > this?
> > >
> > > Thanks in advance,
> > > Sébastien Chevalley