Salva,

Have you tried doing an AS OF-style processing-time temporal join? I know
the documentation leads one to believe this isn't supported, but I think it
actually works. I'm basing this on this comment [1] in the code for
the TemporalProcessTimeJoinOperator:

> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.


[1]
https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
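
Something along these lines might be worth a try (an untested sketch; I'm
reusing the orders/rates names from your example and assuming rates is
declared as a versioned table with a primary key, and that orders has a
processing-time attribute, e.g. proctime AS PROCTIME()):

```
SELECT
  o.amount * r.rate AS amount
FROM orders AS o
  JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON o.currency = r.currency
```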

Regards,
David

On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara <salcantara...@gmail.com>
wrote:

> I've found more examples here:
>
> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>
> where a fact table is enriched using several dimension tables, but again
> the temporal table functions are registered using Table API like so:
>
> ```java
>     tEnv.registerFunction(
>         "dimension_table1",
>         tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
> ```
>
> It's not exactly the same application, since this example covers a lookup
> join, but the SQL query also relies on LATERAL TABLE + temporal table
> functions:
>
> ```
> SELECT
>     D1.col1 AS A,
>     D1.col2 AS B
> FROM
>     fact_table,
>     LATERAL TABLE (dimension_table1(f_proctime)) AS D1
> WHERE
>     fact_table.dim1     = D1.id
> ```
>
> In particular, this produces a job which is equivalent to
>
> ```
>   private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>       extends CoProcessFunction<IN1, Dimension, OUT> {
>     private static final long serialVersionUID = 1L;
>
>     // Latest version of the dimension row seen for the current key.
>     protected transient ValueState<Dimension> dimState;
>
>     @Override
>     public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
>         throws Exception {
>       // Fact side: enrich with the stored dimension row, dropping the
>       // record if no dimension data has arrived yet.
>       Dimension dim = dimState.value();
>       if (dim == null) {
>         return;
>       }
>       out.collect(join(value, dim));
>     }
>
>     abstract OUT join(IN1 value, Dimension dim);
>
>     @Override
>     public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
>         throws Exception {
>       // Dimension side: simply remember the latest version.
>       dimState.update(value);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>       super.open(parameters);
>       ValueStateDescriptor<Dimension> dimStateDesc =
>           new ValueStateDescriptor<>("dimstate", Dimension.class);
>       this.dimState = getRuntimeContext().getState(dimStateDesc);
>     }
>   }
> ```
>
> I'm basically interested in rewriting these types of DIY joins (based on
> CoProcessFunction or CoFlatMapFunction) from DataStream into pure SQL if
> possible; otherwise, I'd like to know what the limitations are.
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> By looking at the docs for older versions of Flink, e.g.,
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>
>> it seems that it's possible to rewrite this query
>>
>> ```
>> SELECT
>>   o.amount * r.rate AS amount
>> FROM
>>   Orders AS o,
>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>> WHERE r.currency = o.currency
>> ```
>>
>> as
>>
>> ```
>> SELECT
>>   SUM(o.amount * r.rate) AS amount
>> FROM Orders AS o,
>>   RatesHistory AS r
>> WHERE r.currency = o.currency
>> AND r.rowtime = (
>>   SELECT MAX(rowtime)
>>   FROM RatesHistory AS r2
>>   WHERE r2.currency = o.currency
>>   AND r2.rowtime <= o.rowtime);
>> ```
>>
>> This would be a way to accomplish this task in SQL without using a
>> temporal table function.
>>
>> Would this rewrite be equivalent in terms of the final generated job?
>> Obviously I very much prefer the LATERAL TABLE query, but it requires a
>> temporal table function, which (apparently) can only be registered via the
>> Table API.
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> It doesn't seem to be the case with processing time, unless I'm mistaken:
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>
>>> This case seems to require a different syntax based on LATERAL TABLE and
>>> a temporal table function (FOR SYSTEM_TIME is not supported). The docs also
>>> suggest that temporal table functions can only be registered via the
>>> Table API. Am I missing/misunderstanding something?
>>>
>>> Salva
>>>
>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <martijnvis...@apache.org>
>>> wrote:
>>>
>>>> Hi Salva,
>>>>
>>>> The examples for temporal table joins can be found at
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>>> Your example is definitely possible with just using SQL.
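>>>>
>>>> For instance, the event-time variant from that page would look roughly
>>>> like this (just a sketch, assuming rates is declared as a versioned table
>>>> with a primary key and a watermark, and that orders has an event-time
>>>> attribute order_time):
>>>>
>>>> ```
>>>> SELECT
>>>>   o.amount * r.rate AS amount
>>>> FROM orders AS o
>>>>   JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
>>>>   ON o.currency = r.currency
>>>> ```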
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <
>>>> salcantara...@gmail.com> wrote:
>>>>
>>>>> Based on this:
>>>>>
>>>>>
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>>>
>>>>> It seems that the only way of registering temporal table functions is
>>>>> via the Table API.
>>>>>
>>>>> If that is the case, is there a way to make this example work
>>>>>
>>>>> ```
>>>>> SELECT
>>>>>   SUM(amount * rate) AS amount
>>>>> FROM
>>>>>   orders,
>>>>>   LATERAL TABLE (rates(order_time))
>>>>> WHERE
>>>>>   rates.currency = orders.currency
>>>>> ```
>>>>>
>>>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>>>> the temporal table function to the cluster (by packaging it in a jar file)
>>>>> and then run the above query from the Flink SQL CLI?
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Salva
>>>>>
>>>>>
