RE: Re: Can temporal table functions only be registered using the table API?
Hi David,

Many thanks for your reply. I understand, then, that there is no easy way to do a simple processing-time join purely in SQL (without using the Table API) where you:

- Save the elements seen on the right side in the current state (in general, this state can be regarded as a materialised view; it could be a single value, but also a list)
- Perform a join for each element arriving on the left side based on the current state (only new events are considered, and an element is simply discarded without producing a join result if there is no match in the current state)

along the lines of the TemporalProcessTimeJoinOperator.java link that you provided or, if you like, the typical DIY joins that one normally and easily writes with the DataStream API.

Regards,

Salva

On 2022/10/06 17:23:30 David Anderson wrote:
> I was wrong about this. The AS OF style processing join has been disabled
> at a higher level,
> in org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator
>
> David
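As a rough illustration of the two steps described above (store the latest right-side element, join each left-side element against the current state or drop it), here is a minimal plain-Java sketch with no Flink dependency. The class name `ProcTimeJoinSketch` and the methods `onRight`, `onLeft` and `output` are hypothetical names made up for this example, and a `HashMap` stands in for Flink's keyed state; this is not how the planner or the `TemporalProcessTimeJoinOperator` is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed processing-time join (hypothetical, no Flink dependency). */
class ProcTimeJoinSketch {
    // Latest right-side value per key; stands in for keyed ValueState.
    private final Map<String, Double> latestRate = new HashMap<>();
    // Collected join results; stands in for the downstream Collector.
    private final List<String> output = new ArrayList<>();

    /** Right input: only updates the state, never emits. */
    void onRight(String currency, double rate) {
        latestRate.put(currency, rate);
    }

    /** Left input: joins against the current state, or drops the element if there is no match. */
    void onLeft(String currency, double amount) {
        Double rate = latestRate.get(currency);
        if (rate == null) {
            return; // no buffering and no retry when a rate arrives later
        }
        output.add(currency + ":" + (amount * rate));
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcTimeJoinSketch join = new ProcTimeJoinSketch();
        join.onLeft("EUR", 10.0);  // dropped: no EUR rate seen yet
        join.onRight("EUR", 2.0);
        join.onLeft("EUR", 10.0);  // joined with rate 2.0
        join.onRight("EUR", 3.0);
        join.onLeft("EUR", 10.0);  // joined with rate 3.0
        System.out.println(join.output());
    }
}
```

The state here holds a single value per key; replacing `Double` with a list would model the "materialised view as a list" variant mentioned in the message.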
Re: Can temporal table functions only be registered using the table API?
I was wrong about this. The AS OF style processing-time join has been disabled at a higher level, in org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson wrote:
> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works.
> [...]
Re: Can temporal table functions only be registered using the table API?
As for your original question, the documentation states that a temporal table function can only be registered via the Table API, and I believe this is true.

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson wrote:
> Salva,
>
> Have you tried doing an AS OF style processing time temporal join?
> [...]
Re: Can temporal table functions only be registered using the table API?
Salva,

Have you tried doing an AS OF style processing time temporal join? I know the documentation leads one to believe this isn't supported, but I think it actually works. I'm basing this on this comment [1] in the code for the TemporalProcessTimeJoinOperator:

> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.

[1] https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38

Regards,
David

On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara wrote:
> [...]
> I'm basically interested in rewriting these types of DIY joins (based on
> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
> possible, otherwise I would like to know which limitations there are.
Re: Can temporal table functions only be registered using the table API?
I've found more examples here:

https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance

where a fact table is enriched using several dimension tables, but again the temporal table functions are registered using the Table API like so:

```java
tEnv.registerFunction(
    "dimension_table1",
    tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
```

It's not exactly the same application, since this example covers a lookup join, but the SQL query also relies on LATERAL TABLE plus temporal table functions:

```
SELECT
  D1.col1 AS A,
  D1.col2 AS B
FROM
  fact_table,
  LATERAL TABLE (dimension_table1(f_proctime)) AS D1
WHERE
  fact_table.dim1 = D1.id
```

In particular, this produces a job which is equivalent to

```java
private abstract static class AbstractFactDimTableJoin<IN1, OUT>
    extends CoProcessFunction<IN1, Dimension, OUT> {
  private static final long serialVersionUID = 1L;

  // Latest dimension row seen for the current key.
  protected transient ValueState<Dimension> dimState;

  // Fact side: join with the stored dimension row, or drop the fact if none exists yet.
  @Override
  public void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception {
    Dimension dim = dimState.value();
    if (dim == null) {
      return;
    }
    out.collect(join(value, dim));
  }

  abstract OUT join(IN1 value, Dimension dim);

  // Dimension side: only update the state, never emit.
  @Override
  public void processElement2(Dimension value, Context ctx, Collector<OUT> out) throws Exception {
    dimState.update(value);
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    ValueStateDescriptor<Dimension> dimStateDesc =
        new ValueStateDescriptor<>("dimstate", Dimension.class);
    this.dimState = getRuntimeContext().getState(dimStateDesc);
  }
}
```

I'm basically interested in rewriting these types of DIY joins (based on CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if possible; otherwise, I would like to know what the limitations are.

Regards,

Salva

On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara wrote:
> [...]
> Would this rewrite be equivalent in terms of the final generated job?
> Obviously I very much prefer the LATERAL TABLE query but this requires
> using a temporal table function which can only be registered using the
> Table API (apparently).
Re: Can temporal table functions only be registered using the table API?
By looking at the docs for older versions of Flink, e.g.,

https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

it seems that it's possible to rewrite this query

```
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
```

as

```
SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);
```

This would be a way to accomplish this task in SQL without using a temporal table function.

Would this rewrite be equivalent in terms of the final generated job? Obviously I very much prefer the LATERAL TABLE query but this requires using a temporal table function which can only be registered using the Table API (apparently).

Regards,

Salva

On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara wrote:
> It doesn't seem the case with processing time unless I'm mistaken:
> [...]
Re: Can temporal table functions only be registered using the table API?
It doesn't seem to be the case with processing time, unless I'm mistaken:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

This case seems to require a different syntax based on LATERAL TABLE and a temporal table function (FOR SYSTEM_TIME is not supported). From the docs too, it seems that temporal table functions can only be registered via the Table API. Am I missing/misunderstanding something?

Salva

On Tue, Oct 4, 2022, 19:26 Martijn Visser wrote:
> Hi Salva,
>
> The examples for temporal table joins can be found at
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
> Your example is definitely possible with just using SQL.
>
> Best regards,
>
> Martijn
>
> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara wrote:
>> [...]
>> It seems that the only way of registering temporal table functions is via
>> the Table API.
>> [...]
Can temporal table functions only be registered using the table API?
Based on this:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/

It seems that the only way of registering temporal table functions is via the Table API.

If that is the case, is there a way to make this example work

```
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
```

without the Table API, just using SQL? E.g., is it possible to deploy the temporal table function to the cluster (by packaging it in a jar file) and then run the above query from the Flink SQL CLI?

Thanks in advance,

Salva
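For readers unfamiliar with what `rates(order_time)` looks up in the query above: per the temporal table function docs linked in this message, the function returns the rates as they were at the given point in time. A minimal plain-Java sketch of that "as of" lookup for a single currency follows. It has no Flink dependency; the class `RatesAsOfSketch` and its methods `addRate` and `rateAsOf` are hypothetical names invented for this illustration, and the in-memory `TreeMap` is an assumption that only models the lookup, not how Flink executes the join.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of an "as of" lookup for one currency (hypothetical, no Flink dependency). */
class RatesAsOfSketch {
    // Rate history ordered by the time each rate became valid.
    private final TreeMap<Long, Double> history = new TreeMap<>();

    void addRate(long validFrom, double rate) {
        history.put(validFrom, rate);
    }

    /** Returns the rate with the greatest validFrom <= orderTime, or null if none exists yet. */
    Double rateAsOf(long orderTime) {
        Map.Entry<Long, Double> entry = history.floorEntry(orderTime);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        RatesAsOfSketch rates = new RatesAsOfSketch();
        rates.addRate(100L, 1.10);
        rates.addRate(200L, 1.20);
        System.out.println(rates.rateAsOf(50L));   // no rate valid yet
        System.out.println(rates.rateAsOf(150L));  // rate valid from time 100
        System.out.println(rates.rateAsOf(200L));  // rate valid from time 200
    }
}
```

`TreeMap.floorEntry` picks the latest version at or before the order's timestamp, which is the per-order lookup the temporal table function is documented to perform.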