I've found more examples here:

https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance

where a fact table is enriched using several dimension tables, but again
the temporal table functions are registered using the Table API, like so:

```java
    tEnv.registerFunction(
        "dimension_table1",
        tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
```
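For reference, the function created by `createTemporalTableFunction(timeAttribute, primaryKey)` takes a point in time and returns, for each primary key, the version of the table that was valid at that time. Below is a minimal plain-Java sketch of that lookup. It is not Flink code: the class `VersionedTable` and its methods `upsert`/`asOf` are hypothetical names, and everything is kept in memory. The versions of each key sit in a `TreeMap` ordered by time, and `floorEntry` picks the latest version whose timestamp is `<=` the requested time. That is the same rule the `MAX(rowtime) ... r2.rowtime <= o.rowtime` subquery from the older docs (quoted further down) expresses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory illustration of temporal table function semantics.
// Not a Flink API: Flink keeps this information in managed state instead.
class VersionedTable<K, V> {
    // For each primary key, the versions of that row ordered by their time attribute.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    // Record a new version of the row identified by `key`, valid from `time` on.
    void upsert(K key, long time, V value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(time, value);
    }

    // Return the version of `key` valid at `time`: the one with the greatest
    // timestamp <= time (the "MAX(rowtime) WHERE rowtime <= t" rule).
    Optional<V> asOf(K key, long time) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return Optional.empty();
        }
        Map.Entry<Long, V> entry = history.floorEntry(time);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        VersionedTable<String, Double> rates = new VersionedTable<>();
        rates.upsert("EUR", 10L, 1.10);
        rates.upsert("EUR", 20L, 1.15);
        System.out.println(rates.asOf("EUR", 15L)); // Optional[1.1]
        System.out.println(rates.asOf("EUR", 25L)); // Optional[1.15]
        System.out.println(rates.asOf("EUR", 5L));  // Optional.empty
    }
}
```

With a processing-time attribute such as `r_proctime`, the requested time is always "now", so the lookup reduces to the latest version per key, which is all the `CoProcessFunction` below keeps in its `ValueState`.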

It's not exactly the same application, since this example covers a lookup
join, but the SQL query also relies on LATERAL TABLE and temporal table
functions:

```sql
SELECT
    D1.col1 AS A,
    D1.col2 AS B
FROM
    fact_table,
    LATERAL TABLE (dimension_table1(f_proctime)) AS D1
WHERE
    fact_table.dim1 = D1.id
```

In particular, this produces a job that is equivalent to:

```java
  // Joins a fact stream (input 1) with a dimension stream (input 2). Both inputs
  // must be keyed by the join key, so dimState holds one Dimension per key.
  private abstract static class AbstractFactDimTableJoin<IN1, OUT>
      extends CoProcessFunction<IN1, Dimension, OUT> {
    private static final long serialVersionUID = 1L;

    // Latest dimension row seen for the current key.
    protected transient ValueState<Dimension> dimState;

    @Override
    public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
        throws Exception {
      Dimension dim = dimState.value();
      if (dim == null) {
        // No dimension row yet for this key: the fact is dropped, not buffered.
        return;
      }
      out.collect(join(value, dim));
    }

    abstract OUT join(IN1 value, Dimension dim);

    @Override
    public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
        throws Exception {
      // Each dimension update overwrites the previous version for this key.
      dimState.update(value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ValueStateDescriptor<Dimension> dimStateDesc =
          new ValueStateDescriptor<>("dimstate", Dimension.class);
      this.dimState = getRuntimeContext().getState(dimStateDesc);
    }
  }
```
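To spell out what that function does, here is a hypothetical single-threaded model in plain Java (Java 16+ for records). `LatestDimensionJoin`, `Fact` and `Dimension` are illustrative names, not the types from the post, and a `HashMap` stands in for the keyed `ValueState<Dimension>`. It shows three properties: a fact that arrives before any dimension row for its key is dropped rather than buffered, each dimension update overwrites the previous one, and results already emitted are not revised when the dimension changes later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the CoProcessFunction above; the HashMap plays the
// role of keyed ValueState<Dimension> (one entry per join key).
class LatestDimensionJoin {
    record Fact(String dimId, int amount) {}
    record Dimension(String id, String name) {}

    private final Map<String, Dimension> dimState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Mirrors processElement1: join with the latest dimension, or drop the fact.
    void onFact(Fact fact) {
        Dimension dim = dimState.get(fact.dimId());
        if (dim == null) {
            return; // no dimension seen yet for this key: fact is discarded
        }
        output.add(fact.amount() + ":" + dim.name());
    }

    // Mirrors processElement2: overwrite the stored dimension for this key.
    void onDimension(Dimension dim) {
        dimState.put(dim.id(), dim);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        LatestDimensionJoin join = new LatestDimensionJoin();
        join.onFact(new Fact("d1", 5));                // dropped: no dimension yet
        join.onDimension(new Dimension("d1", "red"));
        join.onFact(new Fact("d1", 7));                // joined with "red"
        join.onDimension(new Dimension("d1", "blue")); // "7:red" is not revised
        join.onFact(new Fact("d1", 9));                // joined with "blue"
        System.out.println(join.output());             // [7:red, 9:blue]
    }
}
```

A pure-SQL rewrite would need to preserve these three properties to replace the DIY join one-to-one.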

I'm basically interested in rewriting these kinds of DIY joins (based on
CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL, if
possible; otherwise, I would like to know what the limitations are.

Regards,

Salva

On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <salcantara...@gmail.com>
wrote:

> Looking at the docs for older versions of Flink, e.g.,
>
> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>
> it seems that it's possible to rewrite this query
>
> ```
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency
> ```
>
> as
>
> ```
> SELECT
>   o.amount * r.rate AS amount
> FROM Orders AS o,
>   RatesHistory AS r
> WHERE r.currency = o.currency
> AND r.rowtime = (
>   SELECT MAX(rowtime)
>   FROM RatesHistory AS r2
>   WHERE r2.currency = o.currency
>   AND r2.rowtime <= o.rowtime);
> ```
>
> This would be a way to accomplish this task in SQL without using a
> temporal table function.
>
> Would this rewrite be equivalent in terms of the final generated job?
> Obviously, I very much prefer the LATERAL TABLE query, but it requires
> a temporal table function, which (apparently) can only be registered
> using the Table API.
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> That doesn't seem to be the case with processing time, unless I'm mistaken:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>
>> This case seems to require a different syntax, based on LATERAL TABLE and
>> a temporal table function (FOR SYSTEM_TIME AS OF is not supported). The docs
>> also suggest that temporal table functions can only be registered via the
>> Table API. Am I missing or misunderstanding something?
>>
>> Salva
>>
>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi Salva,
>>>
>>> The examples for temporal table joins can be found at
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>> Your example is definitely possible using just SQL.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>>> Based on this:
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>>
>>>> It seems that the only way of registering temporal table functions is
>>>> via the Table API.
>>>>
>>>> If that is the case, is there a way to make this example work
>>>>
>>>> ```
>>>> SELECT
>>>>   SUM(amount * rate) AS amount
>>>> FROM
>>>>   orders,
>>>>   LATERAL TABLE (rates(order_time))
>>>> WHERE
>>>>   rates.currency = orders.currency
>>>> ```
>>>>
>>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>>> the temporal table function to the cluster (by packaging it in a jar file)
>>>> and then run the above query from the Flink SQL CLI?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Salva
>>>>
>>>>
