Re: Temporal join on rolling aggregate

2024-03-26 Thread Matthias Broecheler
Hey Sebastien et al,

have you tried rewriting the rolling aggregate as a window-over query? A
window-over aggregation creates an append-only stream which should preserve
the timestamp/watermark of the source. You can then add a deduplication to
create a versioned state that you can use in a temporal join. The partition
key of the deduplication becomes the primary key to join on.

I think what you are running into is that Flink creates a change-stream for
a group-by aggregation that has retractions. Temporal joins only work on
"versioned state" or lookup tables.

However, I think you have a valid point in a more foundational limitation
of FlinkSQL: It is currently not possible to create an append-only stream
from a change stream without going through the DataStream API. It would be
incredibly useful to support this natively in FlinkSQL. Calcite has support
for the STREAM keyword to do this, and Materialize (i.e. differential
dataflow) has a somewhat related construct in their SQL called a
subscription.

We added support for this in DataSQRL (which is a streaming database
compiler that generates Flink jobs) using a syntax that looks like this:

STREAM ON UPDATE AS (*YOUR BASE QUERY*)

to create an append-only stream from a change-stream by essentially dropping
all retractions and deletes (you can also do STREAM ON DELETE to get only
deletes, etc.). However, I think this might be a feature that should live in
FlinkSQL instead, and we'd be happy to create a FLIP and donate our
implementation if there is interest.
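
As an illustration, applying that STREAM ON UPDATE syntax to the aggregate from the original question might read as follows. Only the STREAM ON UPDATE clause itself comes from the message above; the surrounding DDL embedding is invented for illustration and is not valid FlinkSQL today.

```sql
-- Hypothetical: turn the retracting group-by aggregate into an
-- append-only stream of its updates (invented DDL embedding)
CREATE TEMPORARY VIEW max_rate_updates AS
STREAM ON UPDATE AS (
    SELECT currency_id, MAX(conversion_rate) AS max_rate
    FROM currency_rates
    GROUP BY currency_id
);
```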

Cheers,
Matthias

On Mon, Mar 18, 2024 at 3:01 AM Sebastien  wrote:

> Hi everyone,
>
> Before digging into what it would take to implement a general solution,
> I narrowed down the scope to write a fix which makes the query mentioned in
> the thread work. Here are some findings:
>
> - For the temporal join logic, it's not the watermark that matters but
> having a TimeIndicatorRelDataType column in the input relation. To address
> that, in the PR below, we introduced a call to the LAST_VALUE aggregate
> function to bring a timestamp column to the view. That makes the query
> work, but we think it is not enough. It would probably require a distinct
> aggregate function or a new syntax to be able to support more general use
> cases.
> - There is a relationship between the way the logical operators are
> reordered, the way Flink's special timestamp type is materialized, and the
> watermark assigner.
> - I also looked into the flink-sql-parser and I found out that Flink has
> customized the parsing of the CREATE and DROP statements (
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd#L627-L638)
> (and Calcite likewise supports customizations for ALTER statements), but
> Calcite does not seem to support changes to the SELECT syntax (see
> https://issues.apache.org/jira/browse/CALCITE-4979). I mention it because
> I think it will inform what could be done syntax-wise.
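>
> A sketch of the LAST_VALUE idea from the first bullet, for the thread's
> max_rates view (the exact shape in the PR may differ, and rate_time is a
> hypothetical event-time column that currency_rates would need to carry):

```sql
-- Assumption: surface a timestamp column through the group-by aggregate
-- via LAST_VALUE, so the resulting view keeps a time column that the
-- temporal join logic can pick up.
CREATE TEMPORARY VIEW max_rates AS (
    SELECT
        currency_id,
        MAX(conversion_rate) AS max_rate,
        LAST_VALUE(rate_time) AS rate_time
    FROM currency_rates
    GROUP BY currency_id
);
```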
>
> and a PR that highlights the changes with the complete investigation
> https://github.com/apache/flink/pull/24506
>
> This is more of a demonstration and I am looking to get feedback from
> someone who has more experience with the codebase.
>
> Thanks,
> Seb
>
> On Tue, Mar 5, 2024, at 10:07, Gyula Fóra wrote:
>
> Hi Everyone!
>
> I have discussed this with Sébastien Chevalley, he is going to prepare and
> drive the FLIP while I will assist him along the way.
>
> Thanks
> Gyula
>
> On Tue, Mar 5, 2024 at 9:57 AM  wrote:
>
> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
> On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
>
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in queries, which could be declared for any type of
> source, such as a table or a view. Similar to what Databricks provides [1],
> this needs a FLIP.
>
> [1]
>
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron
>
>
>


Re: Temporal join on rolling aggregate

2024-03-18 Thread Sebastien
Hi everyone,

Before digging into what it would take to implement a general solution, I
narrowed down the scope to write a fix which makes the query mentioned in the
thread work. Here are some findings:

- For the temporal join logic, it's not the watermark that matters but having a
TimeIndicatorRelDataType column in the input relation. To address that, in the
PR below, we introduced a call to the LAST_VALUE aggregate function to bring a
timestamp column to the view. That makes the query work, but we think it is
not enough. It would probably require a distinct aggregate function or a new
syntax to be able to support more general use cases.
- There is a relationship between the way the logical operators are reordered,
the way Flink's special timestamp type is materialized, and the watermark
assigner.
- I also looked into the flink-sql-parser and I found out that Flink has
customized the parsing of the CREATE and DROP statements
(https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd#L627-L638)
(and Calcite likewise supports customizations for ALTER statements), but
Calcite does not seem to support changes to the SELECT syntax (see
https://issues.apache.org/jira/browse/CALCITE-4979). I mention it because I
think it will inform what could be done syntax-wise.

and a PR that highlights the changes with the complete investigation 
https://github.com/apache/flink/pull/24506

This is more of a demonstration and I am looking to get feedback from someone 
who has more experience with the codebase.

Thanks,
Seb

On Tue, Mar 5, 2024, at 10:07, Gyula Fóra wrote:
> Hi Everyone!
> 
> I have discussed this with Sébastien Chevalley, he is going to prepare and 
> drive the FLIP while I will assist him along the way.
> 
> Thanks
> Gyula
> 
> On Tue, Mar 5, 2024 at 9:57 AM  wrote:
>> I do agree with Ron Liu.
>> This would definitely need a FLIP as it would impact SQL and extend it with 
>> the equivalent of TimestampAssigners in the Java API.
>> 
>> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
>> On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
>>> +1,
>>> But I think this should be a more general requirement, that is, support for
>>> declaring watermarks in queries, which could be declared for any type of
>>> source, such as a table or a view. Similar to what Databricks provides [1],
>>> this needs a FLIP.
>>> 
>>> [1]
>>> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>>> 
>>> Best,
>>> Ron


Re: Temporal join on rolling aggregate

2024-03-05 Thread Gyula Fóra
Hi Everyone!

I have discussed this with Sébastien Chevalley, he is going to prepare and
drive the FLIP while I will assist him along the way.

Thanks
Gyula

On Tue, Mar 5, 2024 at 9:57 AM  wrote:

> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
> On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
>
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in queries, which could be declared for any type of
> source, such as a table or a view. Similar to what Databricks provides [1],
> this needs a FLIP.
>
> [1]
>
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron
>
>


Re: Temporal join on rolling aggregate

2024-03-05 Thread lorenzo.affetti.ververica.com via user
I do agree with Ron Liu.
This would definitely need a FLIP as it would impact SQL and extend it with the 
equivalent of TimestampAssigners in the Java API.

Is there any existing JIRA here, or is anybody willing to drive a FLIP?
On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in queries, which could be declared for any type of
> source, such as a table or a view. Similar to what Databricks provides [1],
> this needs a FLIP.
>
> [1]
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron


Re: Temporal join on rolling aggregate

2024-02-25 Thread Ron liu
+1,
But I think this should be a more general requirement, that is, support for
declaring watermarks in queries, which could be declared for any type of
source, such as a table or a view. Similar to what Databricks provides [1],
this needs a FLIP.

[1]
https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html

Best,
Ron
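
The Databricks document linked above attaches a WATERMARK clause to the query itself. A hypothetical Flink adaptation, purely to illustrate the kind of syntax such a FLIP might propose (this is invented and not valid in any current Flink release; rate_time is an assumed column), could read:

```sql
-- Invented syntax: declare a watermark directly on a view over a query,
-- so downstream temporal joins see a row-time attribute
CREATE TEMPORARY VIEW max_rates AS (
    SELECT
        currency_id,
        MAX(conversion_rate) AS max_rate,
        LAST_VALUE(rate_time) AS rate_time
    FROM currency_rates
    GROUP BY currency_id
) WATERMARK FOR rate_time AS rate_time - INTERVAL '2' SECOND;
```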


Re: Temporal join on rolling aggregate

2024-02-23 Thread Feng Jin
+1 for supporting this feature. There are many limitations to using time window
aggregation currently, and if we could declare watermarks and time attributes
on a view, it would make it easier to use time windows. Similarly, it would be
very useful if a primary key could be declared on a view.

Therefore, I believe we need a FLIP to detail the design of this feature.


Best,
Feng

On Fri, Feb 23, 2024 at 2:39 PM  wrote:

> +1 for supporting defining time attributes on views.
>
> I once encountered the same problem as yours. I did some regular joins and
> lost the time attribute, and hence I could no longer do window operations in
> subsequent logic. I had to output the joined view to Kafka, read from it
> again, and define a watermark on the new source - a cumbersome workaround.
>
> It would be more flexible if we could control time attribute / watermark
> on views, just as if it's some kind of special source.
>
> Thanks,
> Yaming
> On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra wrote:
> > Posting this to dev as well as it potentially has some implications on
> development effort.
> >
> > What seems to be the problem here is that we cannot control/override
> Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
> cannot create a PRIMARY KEY on the view, but I think the temporal join also
> should not require the PK; should we remove this limitation?
> >
> > The general problem is the inflexibility of the timestamp/watermark
> handling on query outputs, which makes this again impossible.
> >
> > The workaround here can be to write the rolling aggregate to Kafka, read
> it back again and join with that. The fact that this workaround is possible
> actually highlights the need for more flexibility on the query/view side in
> my opinion.
> >
> > Has anyone else run into this issue and considered the proper solution
> to the problem? Feels like it must be pretty common :)
> >
> > Cheers,
> > Gyula
> >
> >
> >
> >
> > > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley 
> wrote:
> > > > Hi,
> > > >
> > > > I have been trying to write a temporal join in SQL done on a rolling
> aggregate view. However, it does not work and throws:
> > > >
> > > > org.apache.flink.table.api.ValidationException: Event-Time Temporal
> Table Join requires both primary key and row time attribute in versioned
> table, but no row time attribute can be found.
> > > >
> > > > It seems that after the aggregation, the table loses the watermark
> and it's not possible to add one with the SQL API as it's a view.
> > > >
> > > > CREATE TABLE orders (
> > > > order_id INT,
> > > > price DECIMAL(6, 2),
> > > > currency_id INT,
> > > > order_time AS NOW(),
> > > > WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > > )
> > > > WITH (
> > > > 'connector' = 'datagen',
> > > > 'rows-per-second' = '10',
> > > > 'fields.order_id.kind' = 'sequence',
> > > > 'fields.order_id.start' = '1',
> > > > 'fields.order_id.end' = '10',
> > > > 'fields.currency_id.min' = '1',
> > > > 'fields.currency_id.max' = '20'
> > > > );
> > > >
> > > > CREATE TABLE currency_rates (
> > > > currency_id INT,
> > > > conversion_rate DECIMAL(4, 3),
> > > > PRIMARY KEY (currency_id) NOT ENFORCED
> > > > )
> > > > WITH (
> > > > 'connector' = 'datagen',
> > > > 'rows-per-second' = '10',
> > > > 'fields.currency_id.min' = '1',
> > > > 'fields.currency_id.max' = '20'
> > > > );
> > > >
> > > > CREATE TEMPORARY VIEW max_rates AS (
> > > > SELECT
> > > > currency_id,
> > > > MAX(conversion_rate) AS max_rate
> > > > FROM currency_rates
> > > > GROUP BY currency_id
> > > > );
> > > >
> > > > CREATE TEMPORARY VIEW temporal_join AS (
> > > > SELECT
> > > > order_id,
> > > > max_rates.max_rate
> > > > FROM orders
> > > >  LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > > >  ON orders.currency_id = max_rates.currency_id
> > > > );
> > > >
> > > > SELECT * FROM temporal_join;
> > > >
> > > > Am I missing something? What would be a good starting point to
> address this?
> > > >
> > > > Thanks in advance,
> > > > Sébastien Chevalley
>


Re: Temporal join on rolling aggregate

2024-02-22 Thread mayaming1983
+1 for supporting defining time attributes on views.

I once encountered the same problem as yours. I did some regular joins and lost
the time attribute, and hence I could no longer do window operations in
subsequent logic. I had to output the joined view to Kafka, read from it again,
and define a watermark on the new source - a cumbersome workaround.

It would be more flexible if we could control time attribute / watermark on 
views, just as if it's some kind of special source.

Thanks,
Yaming
On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra wrote:
> Posting this to dev as well as it potentially has some implications on 
> development effort.
>
> What seems to be the problem here is that we cannot control/override 
> Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you 
> cannot create a PRIMARY KEY on the view, but I think the temporal join also
> should not require the PK; should we remove this limitation?
>
> The general problem is the inflexibility of the timestamp/watermark handling 
> on query outputs, which makes this again impossible.
>
> The workaround here can be to write the rolling aggregate to Kafka, read it 
> back again and join with that. The fact that this workaround is possible 
> actually highlights the need for more flexibility on the query/view side in 
> my opinion.
>
> Has anyone else run into this issue and considered the proper solution to the 
> problem? Feels like it must be pretty common :)
>
> Cheers,
> Gyula
>
>
>
>
> > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley  
> > wrote:
> > > Hi,
> > >
> > > I have been trying to write a temporal join in SQL done on a rolling 
> > > aggregate view. However, it does not work and throws:
> > >
> > > org.apache.flink.table.api.ValidationException: Event-Time Temporal Table 
> > > Join requires both primary key and row time attribute in versioned table, 
> > > but no row time attribute can be found.
> > >
> > > It seems that after the aggregation, the table loses the watermark and
> > > it's not possible to add one with the SQL API as it's a view.
> > >
> > > CREATE TABLE orders (
> > >     order_id INT,
> > >     price DECIMAL(6, 2),
> > >     currency_id INT,
> > >     order_time AS NOW(),
> > >     WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.order_id.kind' = 'sequence',
> > >         'fields.order_id.start' = '1',
> > >         'fields.order_id.end' = '10',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TABLE currency_rates (
> > >     currency_id INT,
> > >     conversion_rate DECIMAL(4, 3),
> > >     PRIMARY KEY (currency_id) NOT ENFORCED
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TEMPORARY VIEW max_rates AS (
> > >     SELECT
> > >         currency_id,
> > >         MAX(conversion_rate) AS max_rate
> > >     FROM currency_rates
> > >     GROUP BY currency_id
> > > );
> > >
> > > CREATE TEMPORARY VIEW temporal_join AS (
> > >     SELECT
> > >         order_id,
> > >         max_rates.max_rate
> > >     FROM orders
> > >          LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > >          ON orders.currency_id = max_rates.currency_id
> > > );
> > >
> > > SELECT * FROM temporal_join;
> > >
> > > Am I missing something? What would be a good starting point to address 
> > > this?
> > >
> > > Thanks in advance,
> > > Sébastien Chevalley


Re: Temporal join on rolling aggregate

2024-02-22 Thread Gyula Fóra
Posting this to dev as well as it potentially has some implications on
development effort.

What seems to be the problem here is that we cannot control/override
Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
cannot create a PRIMARY KEY on the view, but I think the temporal join also
should not require the PK; should we remove this limitation?

The general problem is the inflexibility of the timestamp/watermark
handling on query outputs, which makes this again impossible.

The workaround here can be to write the rolling aggregate to Kafka, read it
back again and join with that. The fact that this workaround is possible
actually highlights the need for more flexibility on the query/view side in
my opinion.
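
For concreteness, that Kafka round-trip workaround could be sketched roughly as below. This is untested; the topic, broker address, formats, and the use of the Kafka record timestamp as the row-time column are all assumptions. upsert-kafka is used so the aggregate's changelog can be re-read as a versioned table.

```sql
-- Write the rolling aggregate out as an upsert changelog
CREATE TABLE max_rates_sink (
    currency_id INT,
    max_rate DECIMAL(4, 3),
    PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'max_rates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

INSERT INTO max_rates_sink
SELECT currency_id, MAX(conversion_rate)
FROM currency_rates
GROUP BY currency_id;

-- Read it back with an explicit watermark, restoring the row-time
-- attribute that the group-by aggregation dropped
CREATE TABLE max_rates_versioned (
    currency_id INT,
    max_rate DECIMAL(4, 3),
    rate_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
    WATERMARK FOR rate_time AS rate_time - INTERVAL '2' SECOND,
    PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'max_rates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
```

With primary key and row time both present, max_rates_versioned then qualifies as a versioned table for the temporal join.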

Has anyone else run into this issue and considered the proper solution to
the problem? Feels like it must be pretty common :)

Cheers,
Gyula




On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley 
wrote:

> Hi,
>
> I have been trying to write a temporal join in SQL done on a rolling
> aggregate view. However, it does not work and throws:
>
> org.apache.flink.table.api.ValidationException: Event-Time Temporal Table
> Join requires both primary key and row time attribute in versioned table,
> but no row time attribute can be found.
>
> It seems that after the aggregation, the table loses the watermark and
> it's not possible to add one with the SQL API as it's a view.
>
> CREATE TABLE orders (
> order_id INT,
> price DECIMAL(6, 2),
> currency_id INT,
> order_time AS NOW(),
> WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.order_id.kind' = 'sequence',
> 'fields.order_id.start' = '1',
> 'fields.order_id.end' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TABLE currency_rates (
> currency_id INT,
> conversion_rate DECIMAL(4, 3),
> PRIMARY KEY (currency_id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TEMPORARY VIEW max_rates AS (
> SELECT
> currency_id,
> MAX(conversion_rate) AS max_rate
> FROM currency_rates
> GROUP BY currency_id
> );
>
> CREATE TEMPORARY VIEW temporal_join AS (
> SELECT
> order_id,
> max_rates.max_rate
> FROM orders
>  LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
>  ON orders.currency_id = max_rates.currency_id
> );
>
> SELECT * FROM temporal_join;
>
> Am I missing something? What would be a good starting point to address
> this?
>
> Thanks in advance,
> Sébastien Chevalley


Temporal join on rolling aggregate

2024-02-21 Thread Sébastien Chevalley
Hi,

I have been trying to write a temporal join in SQL done on a rolling aggregate 
view. However, it does not work and throws:

org.apache.flink.table.api.ValidationException: Event-Time Temporal Table Join 
requires both primary key and row time attribute in versioned table, but no row 
time attribute can be found.

It seems that after the aggregation, the table loses the watermark and it's
not possible to add one with the SQL API as it's a view.

CREATE TABLE orders (
order_id INT,
price DECIMAL(6, 2),
currency_id INT,
order_time AS NOW(),
WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.order_id.kind' = 'sequence',
'fields.order_id.start' = '1',
'fields.order_id.end' = '10',
'fields.currency_id.min' = '1',
'fields.currency_id.max' = '20'
);

CREATE TABLE currency_rates (
currency_id INT,
conversion_rate DECIMAL(4, 3),
PRIMARY KEY (currency_id) NOT ENFORCED
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.currency_id.min' = '1',
'fields.currency_id.max' = '20'
);

CREATE TEMPORARY VIEW max_rates AS (
SELECT
currency_id,
MAX(conversion_rate) AS max_rate
FROM currency_rates
GROUP BY currency_id
);

CREATE TEMPORARY VIEW temporal_join AS (
SELECT
order_id,
max_rates.max_rate
FROM orders
 LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
 ON orders.currency_id = max_rates.currency_id
);

SELECT * FROM temporal_join;

Am I missing something? What would be a good starting point to address this?

Thanks in advance,
Sébastien Chevalley