By looking at the docs for older versions of Flink, e.g.,

https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

it seems that it's possible to rewrite this query

```
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
```

as

```
SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);
```

This would be a way to accomplish this task in SQL without using a temporal
table function.

Would this rewrite be equivalent in terms of the final generated job?
Obviously I very much prefer the LATERAL TABLE query but this requires
using a temporal table function which can only be registered using the
Table API (apparently).

Regards,

Salva

On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <salcantara...@gmail.com>
wrote:

> It doesn't seem the case with processing time unless I'm mistaken:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>
> This case seems to require a different syntax based on LATERAL TABLE and a
> temporal table function (FOR SYSTEM_TIME is not supported). From the docs
> too, it seems that temporal table functions can only be registered via the
> table API. Am I missing/misunderstanding something?
>
> Salva
>
> On Tue, Oct 4, 2022, 19:26 Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Salva,
>>
>> The examples for temporal table joins can be found at
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>> Your example is definitely possible with just using SQL.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Based on this:
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>
>>> It seems that the only way of registering temporal table functions is
>>> via the Table API.
>>>
>>> If that is the case, is there a way to make this example work
>>>
>>> ```
>>> SELECT
>>>   SUM(amount * rate) AS amount
>>> FROM
>>>   orders,
>>>   LATERAL TABLE (rates(order_time))
>>> WHERE
>>>   rates.currency = orders.currency
>>> ```
>>>
>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>> the temporal table function to the cluster (by packaging it in a jar file)
>>> and then run the above query from the Flink SQL CLI?
>>>
>>> Thanks in advance,
>>>
>>> Salva
>>>
>>>

Reply via email to