RE: Re: Can temporal table functions only be registered using the table API?

2022-10-17 Thread Salva Alcántara
Hi David,

Many thanks for your reply. I understand, then, that there is no easy way to
do a simple processing-time join purely in SQL (without using the Table API)
where you:

- save the elements seen on the right side in the current state (in general,
this state can be regarded as a materialised view; e.g., it could be a single
value but also a list)
- perform a join for each element arriving on the left side, based on the
current state (only new events are considered here, and elements for which
there is no match in the current state are simply discarded without
producing a join result)

along the lines of the TemporalProcessTimeJoinOperator.java link that you
provided, or, if you like, the typical DIY joins that one normally (and
easily) writes with the DataStream API.
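As an illustration of the two bullet points above, here is a minimal plain-Java model that does not use Flink at all. It assumes string keys and values, and the class and method names are hypothetical. The right side appends to a per-key list (the "materialised view"), and each left element is joined only against what is in that list when it arrives; unmatched left elements produce nothing. In a real DataStream job the map would be keyed state (for example `ListState` in a `KeyedCoProcessFunction`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of a processing-time DIY join (no Flink involved). */
class ProcTimeJoinModel {
    // Right-side state: every right element seen so far, grouped by join key.
    private final Map<String, List<String>> rightState = new HashMap<>();

    /** Right input: append the element to the current state for its key. */
    void onRight(String key, String value) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Left input: join against the current state only; no match means no output. */
    List<String> onLeft(String key, String value) {
        List<String> out = new ArrayList<>();
        for (String right : rightState.getOrDefault(key, List.of())) {
            out.add(value + "|" + right);
        }
        return out;
    }

    public static void main(String[] args) {
        ProcTimeJoinModel join = new ProcTimeJoinModel();
        System.out.println(join.onLeft("k1", "L1")); // [] : nothing on the right yet, L1 is dropped
        join.onRight("k1", "R1");
        join.onRight("k1", "R2");
        System.out.println(join.onLeft("k1", "L2")); // [L2|R1, L2|R2]
    }
}
```

Because left elements are never buffered, the result depends on arrival order, which is exactly the processing-time behaviour described above.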

Regards,

Salva

Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
I was wrong about this. The AS OF style processing-time join has been
disabled at a higher level, in
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator

David

Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
As for your original question, the documentation states that a temporal
table function can only be registered via the Table API, and I believe this
is true.

David

Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
Salva,

Have you tried doing an AS OF style processing time temporal join? I know
the documentation leads one to believe this isn't supported, but I think it
actually works. I'm basing this on this comment [1] in the code for
the TemporalProcessTimeJoinOperator:

> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.


[1]
https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
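To make the two differences listed in that comment concrete, here is a rough plain-Java model. It is not the operator's code, and all names are hypothetical. The versioned side keeps only the latest row per primary key, where the key is a `List` so it can hold one column or several. An unmatched probe row is either dropped (inner join) or emitted padded with null (left outer join).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a processing-time temporal join; not Flink's implementation. */
class ProcTimeTemporalJoinModel {
    // Versioned side: only the latest row per primary key is kept.
    // The key is a List so it can represent a single column or a composite key.
    private final Map<List<String>, String> latest = new HashMap<>();
    private final boolean leftOuter;

    ProcTimeTemporalJoinModel(boolean leftOuter) {
        this.leftOuter = leftOuter;
    }

    /** Versioned-side update: a newer row for the same key replaces the old one. */
    void upsert(List<String> key, String row) {
        latest.put(key, row);
    }

    /** Probe side: look up the version that is current at (processing) arrival time. */
    List<String> probe(List<String> key, String row) {
        List<String> out = new ArrayList<>();
        String match = latest.get(key);
        if (match != null) {
            out.add(row + "|" + match);
        } else if (leftOuter) {
            out.add(row + "|null"); // left outer join: keep the probe row, pad with null
        }
        // inner join without a match: emit nothing
        return out;
    }

    public static void main(String[] args) {
        ProcTimeTemporalJoinModel inner = new ProcTimeTemporalJoinModel(false);
        ProcTimeTemporalJoinModel outer = new ProcTimeTemporalJoinModel(true);
        List<String> key = List.of("EUR", "ECB"); // hypothetical composite key (currency, source)
        System.out.println(inner.probe(key, "o1")); // []
        System.out.println(outer.probe(key, "o1")); // [o1|null]
        inner.upsert(key, "r1");
        inner.upsert(key, "r2");
        System.out.println(inner.probe(key, "o2")); // [o2|r2]
    }
}
```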

Regards,
David

Re: Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
I've found more examples here:

https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance

where a fact table is enriched using several dimension tables, but again
the temporal table functions are registered using the Table API, like so:

```java
tEnv.registerFunction(
    "dimension_table1",
    tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
```

It's not exactly the same application, since this example covers a lookup
join, but the SQL query also relies on LATERAL TABLE + temporal table
functions:

```
SELECT
  D1.col1 AS A,
  D1.col2 AS B
FROM
  fact_table,
  LATERAL TABLE (dimension_table1(f_proctime)) AS D1
WHERE
  fact_table.dim1 = D1.id
```

In particular, this produces a job which is equivalent to

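```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Nested inside the job's main class. IN1 is the fact type, OUT the joined type,
// and Dimension the application's own dimension type. The keyed ValueState only
// works if both inputs are keyed by the join key (e.g. facts.connect(dims).keyBy(...)).
private abstract static class AbstractFactDimTableJoin<IN1, OUT>
    extends CoProcessFunction<IN1, Dimension, OUT> {
  private static final long serialVersionUID = 1L;

  // Latest dimension row seen for the current key.
  protected transient ValueState<Dimension> dimState;

  @Override
  public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
      throws Exception {
    Dimension dim = dimState.value();
    if (dim == null) {
      return; // no dimension row seen yet for this key: the fact is dropped
    }
    out.collect(join(value, dim));
  }

  abstract OUT join(IN1 value, Dimension dim);

  @Override
  public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
      throws Exception {
    dimState.update(value); // overwrite with the newest dimension row
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    ValueStateDescriptor<Dimension> dimStateDesc =
        new ValueStateDescriptor<>("dimstate", Dimension.class);
    this.dimState = getRuntimeContext().getState(dimStateDesc);
  }
}
```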

I'm basically interested in rewriting these types of DIY joins (based on
CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
possible; otherwise, I would like to know what the limitations are.

Regards,

Salva

Re: Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
By looking at the docs for older versions of Flink, e.g.,

https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

it seems that it's possible to rewrite this query

```
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
```

as

```
SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);
```

This would be a way to accomplish this task in SQL without using a temporal
table function.

Would this rewrite be equivalent in terms of the final generated job?
Obviously I very much prefer the LATERAL TABLE query, but it requires
using a temporal table function, which can only be registered using the
Table API (apparently).
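The correlated subquery in the rewrite selects, for each order, the rate version with the greatest `rowtime` that is not later than the order's `rowtime`, for the same currency. Here is a plain-Java sketch of just that lookup, assuming `long` timestamps and `double` rates; it is not Flink code, and the class name is hypothetical. It uses one `TreeMap` per currency, so the lookup becomes `floorEntry`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the "latest rate at or before the order time" lookup. */
class VersionedRates {
    // currency -> (rowtime -> rate); TreeMap keeps the versions ordered by rowtime.
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    void addRate(String currency, long rowtime, double rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(rowtime, rate);
    }

    /**
     * Mirrors MAX(rowtime) ... WHERE r2.currency = o.currency AND r2.rowtime <= o.rowtime.
     * Returns null when no version exists yet, i.e. the order finds no join partner.
     */
    Double rateAt(String currency, long orderTime) {
        TreeMap<Long, Double> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Double> version = versions.floorEntry(orderTime);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        VersionedRates rates = new VersionedRates();
        rates.addRate("EUR", 10L, 1.10);
        rates.addRate("EUR", 20L, 1.20);
        System.out.println(rates.rateAt("EUR", 15L)); // 1.1  (version from rowtime 10)
        System.out.println(rates.rateAt("EUR", 20L)); // 1.2  (<= is inclusive)
        System.out.println(rates.rateAt("EUR", 5L));  // null (no version yet)
        System.out.println(rates.rateAt("USD", 15L)); // null (no history for USD)
    }
}
```

Note that, as written, the rewritten query wraps the product in `SUM(...)` and therefore returns a single total, whereas the LATERAL TABLE query returns one row per order. The sketch only models the per-order rate lookup that both queries share.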

Regards,

Salva

Re: Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
That doesn't seem to be the case for processing time, unless I'm mistaken:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

This case seems to require a different syntax based on LATERAL TABLE and a
temporal table function (FOR SYSTEM_TIME is not supported). The docs also
suggest that temporal table functions can only be registered via the
Table API. Am I missing or misunderstanding something?

Salva

On Tue, Oct 4, 2022, 19:26 Martijn Visser  wrote:

> Hi Salva,
>
> The examples for temporal table joins can be found at
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
> Your example is definitely possible with just using SQL.
>
> Best regards,
>
> Martijn

Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
Based on this:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/

It seems that the only way of registering temporal table functions is via
the Table API.

If that is the case, is there a way to make this example work

```
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
```

without the Table API, just using SQL? E.g., is it possible to deploy the
temporal table function to the cluster (by packaging it in a jar file) and
then run the above query from the Flink SQL CLI?

Thanks in advance,

Salva