I've found more examples here: https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
where a fact table is enriched using several dimension tables, but again the temporal table functions are registered using the Table API, like so:

```java
tEnv.registerFunction(
    "dimension_table1",
    tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
```

It's not exactly the same application, since this example covers a lookup join, but the SQL query also relies on LATERAL TABLE plus temporal table functions:

```
SELECT
  D1.col1 AS A,
  D1.col2 AS B
FROM
  fact_table,
  LATERAL TABLE (dimension_table1(f_proctime)) AS D1
WHERE
  fact_table.dim1 = D1.id
```

In particular, this produces a job which is equivalent to

```java
private abstract static class AbstractFactDimTableJoin<IN1, OUT>
    extends CoProcessFunction<IN1, Dimension, OUT> {
  private static final long serialVersionUID = 1L;

  // Latest dimension row seen for the current key.
  protected transient ValueState<Dimension> dimState;

  // Fact side: join against the latest dimension row, or drop the fact if none is known yet.
  @Override
  public void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception {
    Dimension dim = dimState.value();
    if (dim == null) {
      return;
    }
    out.collect(join(value, dim));
  }

  abstract OUT join(IN1 value, Dimension dim);

  // Dimension side: overwrite the previously stored row.
  @Override
  public void processElement2(Dimension value, Context ctx, Collector<OUT> out) throws Exception {
    dimState.update(value);
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    ValueStateDescriptor<Dimension> dimStateDesc =
        new ValueStateDescriptor<>("dimstate", Dimension.class);
    this.dimState = getRuntimeContext().getState(dimStateDesc);
  }
}
```

I'm basically interested in rewriting these types of DIY joins (based on CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if possible; otherwise, I would like to know what the limitations are.
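To make the semantics I want to preserve concrete, here is a plain-Java model of the function above with no Flink dependency. It is only a sketch under my own assumptions: the class name `LatestValueJoinSketch`, the `String` join key, and the `HashMap` standing in for keyed `ValueState` are illustrative and not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Plain-Java model (no Flink) of the per-key "latest dimension value" join. */
final class LatestValueJoinSketch<F, D, O> {
    // Stands in for the keyed ValueState<Dimension>: one latest row per join key.
    private final Map<String, D> latestDimByKey = new HashMap<>();
    private final BiFunction<F, D, O> joinFn;
    private final List<O> output = new ArrayList<>();

    LatestValueJoinSketch(BiFunction<F, D, O> joinFn) {
        this.joinFn = joinFn;
    }

    /** Mirrors processElement2: a dimension update overwrites the previous row. */
    void onDimension(String key, D dim) {
        latestDimByKey.put(key, dim);
    }

    /** Mirrors processElement1: emit only if a dimension row is already known. */
    void onFact(String key, F fact) {
        D dim = latestDimByKey.get(key);
        if (dim == null) {
            return; // the fact is dropped, it is not buffered for a later match
        }
        output.add(joinFn.apply(fact, dim));
    }

    List<O> output() {
        return output;
    }

    public static void main(String[] args) {
        LatestValueJoinSketch<Integer, Integer, Integer> join =
            new LatestValueJoinSketch<>((amount, rate) -> amount * rate);
        join.onFact("EUR", 10);      // dropped: no rate for EUR yet
        join.onDimension("EUR", 2);
        join.onFact("EUR", 10);      // joined with rate 2
        join.onDimension("EUR", 3);  // replaces rate 2
        join.onFact("EUR", 10);      // joined with rate 3
        System.out.println(join.output()); // prints [20, 30]
    }
}
```

Two properties matter for any SQL rewrite: facts arriving before the first dimension row are discarded, and each fact sees only the dimension row that is current at the moment it is processed.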
Regards,

Salva

On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <salcantara...@gmail.com> wrote:

> By looking at the docs for older versions of Flink, e.g.,
>
> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>
> it seems that it's possible to rewrite this query
>
> ```
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency
> ```
>
> as
>
> ```
> SELECT
>   SUM(o.amount * r.rate) AS amount
> FROM Orders AS o,
>   RatesHistory AS r
> WHERE r.currency = o.currency
> AND r.rowtime = (
>   SELECT MAX(rowtime)
>   FROM RatesHistory AS r2
>   WHERE r2.currency = o.currency
>   AND r2.rowtime <= o.rowtime);
> ```
>
> This would be a way to accomplish this task in SQL without using a
> temporal table function.
>
> Would this rewrite be equivalent in terms of the final generated job?
> Obviously I very much prefer the LATERAL TABLE query but this requires
> using a temporal table function which can only be registered using the
> Table API (apparently).
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> It doesn't seem the case with processing time unless I'm mistaken:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>
>> This case seems to require a different syntax based on LATERAL TABLE and
>> a temporal table function (FOR SYSTEM_TIME is not supported). From the docs
>> too, it seems that temporal table functions can only be registered via the
>> table API. Am I missing/misunderstanding something?
>>
>> Salva
>>
>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi Salva,
>>>
>>> The examples for temporal table joins can be found at
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>> Your example is definitely possible with just using SQL.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>>> Based on this:
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>>
>>>> It seems that the only way of registering temporal table functions is
>>>> via the Table API.
>>>>
>>>> If that is the case, is there a way to make this example work
>>>>
>>>> ```
>>>> SELECT
>>>>   SUM(amount * rate) AS amount
>>>> FROM
>>>>   orders,
>>>>   LATERAL TABLE (rates(order_time))
>>>> WHERE
>>>>   rates.currency = orders.currency
>>>> ```
>>>>
>>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>>> the temporal table function to the cluster (by packaging it in a jar file)
>>>> and then run the above query from the Flink SQL CLI?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Salva